Introduce CLEANUP_UNKNOWN_AND_EXCLUDED step (#149931)

In the context of migrations,
https://github.com/elastic/kibana/pull/147371 avoids reindexing during
an upgrade, provided that `diffMappings === false`.

This _alternative path_ skips some key steps that are performed before
reindexing:
* `CHECK_UNKNOWN_DOCUMENTS`
* `CALCULATE_EXCLUDE_FILTERS`

These steps enrich the search query used during reindexing, effectively
filtering out undesired documents.
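
Concretely, those two steps extend the `bool` query's `must_not` clauses so
that removed saved object types and anything matched by the plugins'
exclude-on-upgrade hooks never make it into the target index. A minimal sketch
of the resulting query shape (the `removedTypes` and `hookClauses` values are
illustrative placeholders, not the actual registry contents):

```ts
import type { QueryDslQueryContainer } from '@elastic/elasticsearch/lib/api/types';

// Illustrative inputs: types that are no longer registered, plus a clause
// contributed by a plugin's exclude-on-upgrade hook (e.g. dropping
// non-persisted search sessions).
const removedTypes: string[] = ['csp_rule', 'fleet-agent-events'];
const hookClauses: QueryDslQueryContainer[] = [
  {
    bool: {
      must: [
        { term: { type: 'search-session' } },
        { match: { 'search-session.persisted': false } },
      ],
    },
  },
];

// Reindexing only copies documents that match NONE of these clauses:
const excludeOnUpgradeQuery: QueryDslQueryContainer = {
  bool: {
    must_not: [...removedTypes.map((type) => ({ term: { type } })), ...hookClauses],
  },
};
```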

If the mappings [match](https://github.com/elastic/kibana/pull/147371)
(or they are
[compatible](https://github.com/elastic/kibana/pull/149326)) and we _no
longer reindex_, this cleanup operation does not happen, leaving
undesired documents in our system indices.

The goal of this PR is to add an extra step in the state machine
(`CLEANUP_UNKNOWN_AND_EXCLUDED`) that actively cleans up a system
index when we take the _skip reindexing_ path.
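
Internally, the new step relies on a `cleanupUnknownAndExcluded` action (added
in this PR) that checks for unknown documents, runs the plugins' exclude hooks,
and then launches a `deleteByQuery` task combining everything that must go. A
sketch of the delete query it builds (input values are illustrative):

```ts
import type { QueryDslQueryContainer } from '@elastic/elasticsearch/lib/api/types';

// Illustrative inputs gathered by the previous sub-steps:
const filterClauses: QueryDslQueryContainer[] = []; // from the exclude-on-upgrade hooks
const removedTypes: string[] = ['server', 'deprecated'];
const unknownDocTypes: string[] = ['dashboard']; // only when discardUnknownDocs === true

// A document matching ANY of these clauses gets deleted:
const deleteQuery: QueryDslQueryContainer = {
  bool: {
    should: [
      ...filterClauses,
      ...removedTypes.map((type) => ({ term: { type } })),
      ...unknownDocTypes.map((type) => ({ term: { type } })),
    ],
  },
};

// The deletion runs as an asynchronous ES task (conflicts: 'proceed',
// refresh: false, wait_for_completion: false) that the state machine
// then polls via waitForDeleteByQueryTask.
```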


![image](https://user-images.githubusercontent.com/25349407/216979691-fef40638-f990-4850-bac8-ee3e58330a7f.png)

Fixes https://github.com/elastic/kibana/issues/150299

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>

@@ -6,4 +6,4 @@
* Side Public License, v 1.
*/
export { Server, Root, bootstrap } from './src';
export { Server, registerServiceConfig, Root, bootstrap } from './src';

@@ -7,5 +7,6 @@
*/
export { Server } from './server';
export { registerServiceConfig } from './register_service_config';
export { bootstrap } from './bootstrap';
export { Root } from './root';

@@ -0,0 +1,63 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { config as pathConfig } from '@kbn/utils';
import { ConfigService } from '@kbn/config';
import type { ServiceConfigDescriptor } from '@kbn/core-base-server-internal';
import { config as loggingConfig } from '@kbn/core-logging-server-internal';
import { coreDeprecationProvider } from '@kbn/core-config-server-internal';
import { nodeConfig } from '@kbn/core-node-server-internal';
import { pidConfig } from '@kbn/core-environment-server-internal';
import { executionContextConfig } from '@kbn/core-execution-context-server-internal';
import { config as httpConfig, cspConfig, externalUrlConfig } from '@kbn/core-http-server-internal';
import { config as elasticsearchConfig } from '@kbn/core-elasticsearch-server-internal';
import { opsConfig } from '@kbn/core-metrics-server-internal';
import {
savedObjectsConfig,
savedObjectsMigrationConfig,
} from '@kbn/core-saved-objects-base-server-internal';
import { config as i18nConfig } from '@kbn/core-i18n-server-internal';
import { config as deprecationConfig } from '@kbn/core-deprecations-server-internal';
import { statusConfig } from '@kbn/core-status-server-internal';
import { uiSettingsConfig } from '@kbn/core-ui-settings-server-internal';
import { config as pluginsConfig } from '@kbn/core-plugins-server-internal';
import { elasticApmConfig } from './root/elastic_config';
const rootConfigPath = '';
export function registerServiceConfig(configService: ConfigService) {
const configDescriptors: Array<ServiceConfigDescriptor<unknown>> = [
cspConfig,
deprecationConfig,
elasticsearchConfig,
elasticApmConfig,
executionContextConfig,
externalUrlConfig,
httpConfig,
i18nConfig,
loggingConfig,
nodeConfig,
opsConfig,
pathConfig,
pidConfig,
pluginsConfig,
savedObjectsConfig,
savedObjectsMigrationConfig,
statusConfig,
uiSettingsConfig,
];
configService.addDeprecationProvider(rootConfigPath, coreDeprecationProvider);
for (const descriptor of configDescriptors) {
if (descriptor.deprecations) {
configService.addDeprecationProvider(descriptor.path, descriptor.deprecations);
}
configService.setSchema(descriptor.path, descriptor.schema);
}
}

@@ -7,57 +7,30 @@
*/
import apm from 'elastic-apm-node';
import { config as pathConfig } from '@kbn/utils';
import { reportPerformanceMetricEvent } from '@kbn/ebt-tools';
import type { Logger, LoggerFactory } from '@kbn/logging';
import { ConfigService, Env, RawConfigurationProvider } from '@kbn/config';
import type { ServiceConfigDescriptor } from '@kbn/core-base-server-internal';
import { DocLinksService } from '@kbn/core-doc-links-server-internal';
import {
LoggingService,
ILoggingSystem,
config as loggingConfig,
} from '@kbn/core-logging-server-internal';
import {
coreDeprecationProvider,
ensureValidConfiguration,
} from '@kbn/core-config-server-internal';
import { NodeService, nodeConfig } from '@kbn/core-node-server-internal';
import { LoggingService, ILoggingSystem } from '@kbn/core-logging-server-internal';
import { ensureValidConfiguration } from '@kbn/core-config-server-internal';
import { NodeService } from '@kbn/core-node-server-internal';
import { AnalyticsService } from '@kbn/core-analytics-server-internal';
import type { AnalyticsServiceSetup, AnalyticsServiceStart } from '@kbn/core-analytics-server';
import { EnvironmentService, pidConfig } from '@kbn/core-environment-server-internal';
import {
ExecutionContextService,
executionContextConfig,
} from '@kbn/core-execution-context-server-internal';
import { EnvironmentService } from '@kbn/core-environment-server-internal';
import { ExecutionContextService } from '@kbn/core-execution-context-server-internal';
import { PrebootService } from '@kbn/core-preboot-server-internal';
import { ContextService } from '@kbn/core-http-context-server-internal';
import {
HttpService,
config as httpConfig,
cspConfig,
externalUrlConfig,
} from '@kbn/core-http-server-internal';
import {
ElasticsearchService,
config as elasticsearchConfig,
} from '@kbn/core-elasticsearch-server-internal';
import { MetricsService, opsConfig } from '@kbn/core-metrics-server-internal';
import { HttpService } from '@kbn/core-http-server-internal';
import { ElasticsearchService } from '@kbn/core-elasticsearch-server-internal';
import { MetricsService } from '@kbn/core-metrics-server-internal';
import { CapabilitiesService } from '@kbn/core-capabilities-server-internal';
import type { SavedObjectsServiceStart } from '@kbn/core-saved-objects-server';
import {
savedObjectsConfig,
savedObjectsMigrationConfig,
} from '@kbn/core-saved-objects-base-server-internal';
import { SavedObjectsService } from '@kbn/core-saved-objects-server-internal';
import { I18nService, config as i18nConfig } from '@kbn/core-i18n-server-internal';
import {
DeprecationsService,
config as deprecationConfig,
} from '@kbn/core-deprecations-server-internal';
import { I18nService } from '@kbn/core-i18n-server-internal';
import { DeprecationsService } from '@kbn/core-deprecations-server-internal';
import { CoreUsageDataService } from '@kbn/core-usage-data-server-internal';
import { StatusService, statusConfig } from '@kbn/core-status-server-internal';
import { UiSettingsService, uiSettingsConfig } from '@kbn/core-ui-settings-server-internal';
import { StatusService } from '@kbn/core-status-server-internal';
import { UiSettingsService } from '@kbn/core-ui-settings-server-internal';
import { CustomBrandingService } from '@kbn/core-custom-branding-server-internal';
import {
CoreRouteHandlerContext,
@@ -75,16 +48,11 @@ import type {
InternalCoreSetup,
InternalCoreStart,
} from '@kbn/core-lifecycle-server-internal';
import {
DiscoveredPlugins,
PluginsService,
config as pluginsConfig,
} from '@kbn/core-plugins-server-internal';
import { DiscoveredPlugins, PluginsService } from '@kbn/core-plugins-server-internal';
import { CoreAppsService } from '@kbn/core-apps-server-internal';
import { elasticApmConfig } from './root/elastic_config';
import { registerServiceConfig } from './register_service_config';
const coreId = Symbol('core');
const rootConfigPath = '';
const KIBANA_STARTED_EVENT = 'kibana_started';
/** @internal */
@@ -465,34 +433,7 @@ export class Server {
}
public setupCoreConfig() {
const configDescriptors: Array<ServiceConfigDescriptor<unknown>> = [
cspConfig,
deprecationConfig,
elasticsearchConfig,
elasticApmConfig,
executionContextConfig,
externalUrlConfig,
httpConfig,
i18nConfig,
loggingConfig,
nodeConfig,
opsConfig,
pathConfig,
pidConfig,
pluginsConfig,
savedObjectsConfig,
savedObjectsMigrationConfig,
statusConfig,
uiSettingsConfig,
];
this.configService.addDeprecationProvider(rootConfigPath, coreDeprecationProvider);
for (const descriptor of configDescriptors) {
if (descriptor.deprecations) {
this.configService.addDeprecationProvider(descriptor.path, descriptor.deprecations);
}
this.configService.setSchema(descriptor.path, descriptor.schema);
}
registerServiceConfig(this.configService);
}
/**

@@ -9,7 +9,11 @@
export { DocumentMigrator, KibanaMigrator, buildActiveMappings, mergeTypes } from './src';
export type { KibanaMigratorOptions } from './src';
export { getAggregatedTypesDocuments } from './src/actions/check_for_unknown_docs';
export { addExcludedTypesToBoolQuery } from './src/model/helpers';
export {
addExcludedTypesToBoolQuery,
createBulkIndexOperationTuple,
createBulkDeleteOperationBody,
} from './src/model/helpers';
// these are only used for integration tests
export {

@@ -369,9 +369,9 @@ completed this step:
- temp index has a write block
- temp index is not found
### New control state
1. If `currentBatch` is the last batch in `transformedDocBatches`
1. If `currentBatch` is the last batch in `bulkOperationBatches`
`REINDEX_SOURCE_TO_TEMP_READ`
2. If there are more batches left in `transformedDocBatches`
2. If there are more batches left in `bulkOperationBatches`
`REINDEX_SOURCE_TO_TEMP_INDEX_BULK`
## REINDEX_SOURCE_TO_TEMP_CLOSE_PIT

@@ -18,6 +18,9 @@ Object {
"duration": 0,
"state": Object {
"batchSize": 1000,
"bulkOperationBatches": Array [
Array [],
],
"controlState": "LEGACY_REINDEX",
"currentAlias": ".my-so-index",
"discardCorruptObjects": false,
@@ -126,22 +129,6 @@
"type": "ui-counter",
},
},
Object {
"bool": Object {
"must": Array [
Object {
"match": Object {
"type": "search-session",
},
},
Object {
"match": Object {
"search-session.persisted": false,
},
},
],
},
},
],
},
},
@@ -190,7 +177,6 @@
},
},
},
"transformedDocBatches": Array [],
"versionAlias": ".my-so-index_7.11.0",
"versionIndex": ".my-so-index_7.11.0_001",
"waitForMigrationCompletion": false,
@@ -214,6 +200,9 @@
"duration": 0,
"state": Object {
"batchSize": 1000,
"bulkOperationBatches": Array [
Array [],
],
"controlState": "LEGACY_DELETE",
"currentAlias": ".my-so-index",
"discardCorruptObjects": false,
@@ -322,22 +311,6 @@
"type": "ui-counter",
},
},
Object {
"bool": Object {
"must": Array [
Object {
"match": Object {
"type": "search-session",
},
},
Object {
"match": Object {
"search-session.persisted": false,
},
},
],
},
},
],
},
},
@@ -390,7 +363,6 @@
},
},
},
"transformedDocBatches": Array [],
"versionAlias": ".my-so-index_7.11.0",
"versionIndex": ".my-so-index_7.11.0_001",
"waitForMigrationCompletion": false,
@@ -414,6 +386,9 @@
"duration": 0,
"state": Object {
"batchSize": 1000,
"bulkOperationBatches": Array [
Array [],
],
"controlState": "LEGACY_DELETE",
"currentAlias": ".my-so-index",
"discardCorruptObjects": false,
@@ -522,22 +497,6 @@
"type": "ui-counter",
},
},
Object {
"bool": Object {
"must": Array [
Object {
"match": Object {
"type": "search-session",
},
},
Object {
"match": Object {
"search-session.persisted": false,
},
},
],
},
},
],
},
},
@@ -594,7 +553,6 @@
},
},
},
"transformedDocBatches": Array [],
"versionAlias": ".my-so-index_7.11.0",
"versionIndex": ".my-so-index_7.11.0_001",
"waitForMigrationCompletion": false,
@@ -618,6 +576,9 @@
"duration": 0,
"state": Object {
"batchSize": 1000,
"bulkOperationBatches": Array [
Array [],
],
"controlState": "DONE",
"currentAlias": ".my-so-index",
"discardCorruptObjects": false,
@@ -726,22 +687,6 @@
"type": "ui-counter",
},
},
Object {
"bool": Object {
"must": Array [
Object {
"match": Object {
"type": "search-session",
},
},
Object {
"match": Object {
"search-session.persisted": false,
},
},
],
},
},
],
},
},
@@ -802,7 +747,6 @@
},
},
},
"transformedDocBatches": Array [],
"versionAlias": ".my-so-index_7.11.0",
"versionIndex": ".my-so-index_7.11.0_001",
"waitForMigrationCompletion": false,
@@ -864,6 +808,15 @@
"duration": 0,
"state": Object {
"batchSize": 1000,
"bulkOperationBatches": Array [
Array [
Object {
"index": Object {
"_id": "1234",
},
},
],
],
"controlState": "LEGACY_DELETE",
"currentAlias": ".my-so-index",
"discardCorruptObjects": false,
@@ -972,22 +925,6 @@
"type": "ui-counter",
},
},
Object {
"bool": Object {
"must": Array [
Object {
"match": Object {
"type": "search-session",
},
},
Object {
"match": Object {
"search-session.persisted": false,
},
},
],
},
},
],
},
},
@@ -1041,13 +978,6 @@
},
},
},
"transformedDocBatches": Array [
Array [
Object {
"_id": "1234",
},
],
],
"versionAlias": ".my-so-index_7.11.0",
"versionIndex": ".my-so-index_7.11.0_001",
"waitForMigrationCompletion": false,
@@ -1071,6 +1001,15 @@
"duration": 0,
"state": Object {
"batchSize": 1000,
"bulkOperationBatches": Array [
Array [
Object {
"index": Object {
"_id": "1234",
},
},
],
],
"controlState": "FATAL",
"currentAlias": ".my-so-index",
"discardCorruptObjects": false,
@@ -1179,22 +1118,6 @@
"type": "ui-counter",
},
},
Object {
"bool": Object {
"must": Array [
Object {
"match": Object {
"type": "search-session",
},
},
Object {
"match": Object {
"search-session.persisted": false,
},
},
],
},
},
],
},
},
@@ -1252,13 +1175,6 @@
},
},
},
"transformedDocBatches": Array [
Array [
Object {
"_id": "1234",
},
],
],
"versionAlias": ".my-so-index_7.11.0",
"versionIndex": ".my-so-index_7.11.0_001",
"waitForMigrationCompletion": false,

@@ -40,7 +40,7 @@ describe('bulkOverwriteTransformedDocuments', () => {
const task = bulkOverwriteTransformedDocuments({
client,
index: 'new_index',
transformedDocs: [],
operations: [],
refresh: 'wait_for',
});
@@ -74,7 +74,7 @@ describe('bulkOverwriteTransformedDocuments', () => {
const task = bulkOverwriteTransformedDocuments({
client,
index: 'new_index',
transformedDocs: [],
operations: [],
refresh: 'wait_for',
});
@@ -99,7 +99,7 @@ describe('bulkOverwriteTransformedDocuments', () => {
const task = bulkOverwriteTransformedDocuments({
client,
index: 'new_index',
transformedDocs: [],
operations: [],
refresh: 'wait_for',
});
try {
@@ -140,7 +140,7 @@ describe('bulkOverwriteTransformedDocuments', () => {
const task = bulkOverwriteTransformedDocuments({
client,
index: 'new_index',
transformedDocs: [],
operations: [],
refresh: 'wait_for',
});
@@ -193,7 +193,7 @@ describe('bulkOverwriteTransformedDocuments', () => {
const task = bulkOverwriteTransformedDocuments({
client,
index: 'new_index',
transformedDocs: [],
operations: [],
refresh: 'wait_for',
});

@@ -11,7 +11,6 @@ import * as TaskEither from 'fp-ts/lib/TaskEither';
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { errors as esErrors } from '@elastic/elasticsearch';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type { SavedObjectsRawDoc } from '@kbn/core-saved-objects-server';
import {
catchRetryableEsClientErrors,
type RetryableEsClientError,
@@ -19,33 +18,13 @@ import {
import { isWriteBlockException, isIndexNotFoundException } from './es_errors';
import { WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE } from './constants';
import type { TargetIndexHadWriteBlock, RequestEntityTooLargeException, IndexNotFound } from '.';
/**
* Given a document and index, creates a valid body for the Bulk API.
*/
export const createBulkOperationBody = (doc: SavedObjectsRawDoc, index: string) => {
return [
{
index: {
_index: index,
_id: doc._id,
// overwrite existing documents
op_type: 'index',
// use optimistic concurrency control to ensure that outdated
// documents are only overwritten once with the latest version
if_seq_no: doc._seq_no,
if_primary_term: doc._primary_term,
},
},
doc._source,
];
};
import type { BulkOperation } from '../model/create_batches';
/** @internal */
export interface BulkOverwriteTransformedDocumentsParams {
client: ElasticsearchClient;
index: string;
transformedDocs: SavedObjectsRawDoc[];
operations: BulkOperation[];
refresh?: estypes.Refresh;
}
@@ -57,7 +36,7 @@ export const bulkOverwriteTransformedDocuments =
({
client,
index,
transformedDocs,
operations,
refresh = false,
}: BulkOverwriteTransformedDocumentsParams): TaskEither.TaskEither<
| RetryableEsClientError
@@ -67,10 +46,6 @@
'bulk_index_succeeded'
> =>
() => {
const body = transformedDocs.flatMap((doc) => {
return createBulkOperationBody(doc, index);
});
return client
.bulk({
// Because we only add aliases in the MARK_VERSION_INDEX_READY step we
@@ -80,11 +55,13 @@
// mappings. Such tampering could lead to many other problems and is
// probably unlikely so for now we'll accept this risk and wait till
// system indices puts in place a hard control.
index,
require_alias: false,
wait_for_active_shards: WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE,
refresh,
filter_path: ['items.*.error'],
body,
// we need to unwrap the existing BulkIndexOperationTuple's
operations: operations.flat(),
})
.then((res) => {
// Filter out version_conflict_engine_exception since these just mean

@@ -28,7 +28,7 @@ describe('calculateExcludeFilters', () => {
expect(hook2).toHaveBeenCalledWith({ readonlyEsClient: { search: expect.any(Function) } });
expect(Either.isRight(result)).toBe(true);
expect((result as Either.Right<any>).right).toEqual({
mustNotClauses: [
filterClauses: [
{ bool: { must: { term: { fieldA: '123' } } } },
{ bool: { must: { term: { fieldB: 'abc' } } } },
],
@@ -49,7 +49,7 @@ describe('calculateExcludeFilters', () => {
expect(Either.isRight(result)).toBe(true);
expect((result as Either.Right<any>).right).toEqual({
mustNotClauses: [{ bool: { must: { term: { fieldB: 'abc' } } } }],
filterClauses: [{ bool: { must: { term: { fieldB: 'abc' } } } }],
errorsByType: { type1: error },
});
});
@@ -91,7 +91,7 @@ describe('calculateExcludeFilters', () => {
expect(Either.isRight(result)).toBe(true);
expect((result as Either.Right<any>).right).toEqual({
mustNotClauses: [{ bool: { must: { term: { fieldB: 'abc' } } } }],
filterClauses: [{ bool: { must: { term: { fieldB: 'abc' } } } }],
errorsByType: expect.any(Object),
});
expect((result as Either.Right<any>).right.errorsByType.type1.toString()).toMatchInlineSnapshot(

@@ -23,7 +23,7 @@ export interface CalculateExcludeFiltersParams {
export interface CalculatedExcludeFilter {
/** Array with all the clauses that must be bool.must_not'ed */
mustNotClauses: QueryDslQueryContainer[];
filterClauses: QueryDslQueryContainer[];
/** Any errors that were encountered during filter calculation, keyed by the type name */
errorsByType: Record<string, Error>;
}
@@ -91,17 +91,17 @@ export const calculateExcludeFilters =
}
const errorsByType: Array<[string, Error]> = [];
const mustNotClauses: QueryDslQueryContainer[] = [];
const filterClauses: QueryDslQueryContainer[] = [];
// Loop through all results and collect successes and errors
results.forEach((r) =>
Either.isRight(r)
? mustNotClauses.push(r.right)
? filterClauses.push(r.right)
: Either.isLeft(r) && errorsByType.push([r.left.soType, r.left.error as Error])
);
return Either.right({
mustNotClauses,
filterClauses,
errorsByType: Object.fromEntries(errorsByType),
});
});

@@ -116,7 +116,7 @@ export const checkForUnknownDocs =
RetryableEsClientError,
UnknownDocsFound | {}
> =>
async () => {
() => {
const excludeQuery = addExcludedTypesToBoolQuery(knownTypes, excludeOnUpgradeQuery.bool);
return getAggregatedTypesDocuments(client, indexName, excludeQuery)
.then((unknownDocs) => {

@@ -0,0 +1,137 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
export const emptyResponseClientMock = elasticsearchClientMock.createInternalClient(
Promise.resolve({
took: 0,
timed_out: false,
_shards: {
total: 1,
successful: 1,
skipped: 0,
failed: 0,
},
hits: {
total: {
value: 0,
relation: 'eq',
},
max_score: null,
hits: [],
},
})
);
export const initialExcludeOnUpgradeQueryMock = {
bool: {
must_not: [
{
term: {
type: 'apm-services-telemetry',
},
},
{
term: {
type: 'application_usage_transactional',
},
},
{
term: {
type: 'background-session',
},
},
{
term: {
type: 'cases-sub-case',
},
},
{
term: {
type: 'csp_rule',
},
},
{
term: {
type: 'file-upload-telemetry',
},
},
{
term: {
type: 'fleet-agent-actions',
},
},
{
term: {
type: 'fleet-agent-events',
},
},
{
term: {
type: 'fleet-agents',
},
},
{
term: {
type: 'fleet-enrollment-api-keys',
},
},
{
term: {
type: 'guided-setup-state',
},
},
{
term: {
type: 'maps-telemetry',
},
},
{
term: {
type: 'ml-telemetry',
},
},
{
term: {
type: 'osquery-usage-metric',
},
},
{
term: {
type: 'server',
},
},
{
term: {
type: 'siem-detection-engine-rule-execution-info',
},
},
{
term: {
type: 'siem-detection-engine-rule-status',
},
},
{
term: {
type: 'timelion-sheet',
},
},
{
term: {
type: 'tsvb-validation-telemetry',
},
},
{
term: {
type: 'ui-counter',
},
},
],
},
};

@@ -0,0 +1,276 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import * as Either from 'fp-ts/lib/Either';
import type { QueryDslQueryContainer } from '@elastic/elasticsearch/lib/api/types';
import { checkForUnknownDocs, type DocumentIdAndType } from './check_for_unknown_docs';
import { cleanupUnknownAndExcluded } from './cleanup_unknown_and_excluded';
import { calculateExcludeFilters } from './calculate_exclude_filters';
import { deleteByQuery } from './delete_by_query';
import {
emptyResponseClientMock,
initialExcludeOnUpgradeQueryMock,
} from './cleanup_unknown_and_excluded.mocks';
jest.mock('./check_for_unknown_docs');
jest.mock('./calculate_exclude_filters');
jest.mock('./delete_by_query');
const mockCheckForUnknownDocs = checkForUnknownDocs as jest.MockedFunction<
typeof checkForUnknownDocs
>;
const mockCalculateExcludeFilters = calculateExcludeFilters as jest.MockedFunction<
typeof calculateExcludeFilters
>;
const mockDeleteByQuery = deleteByQuery as jest.MockedFunction<typeof deleteByQuery>;
describe('cleanupUnknownAndExcluded', () => {
const unknownDocs: DocumentIdAndType[] = [
{ id: 'dashboard:12345', type: 'dashboard' },
{ id: 'dashboard:67890', type: 'dashboard' },
];
const excludeFromUpgradeFilterHooks = {
'search-session': async () => {
return {
bool: {
must: [
{ term: { type: 'search-session' } },
{ match: { 'search-session.persisted': false } },
],
},
};
},
};
beforeEach(() => {
jest.clearAllMocks();
});
it('calls `Actions.checkForUnknownDocs()` with the correct params', async () => {
mockCheckForUnknownDocs.mockReturnValueOnce(async () => Either.right({}));
mockCalculateExcludeFilters.mockReturnValueOnce(async () =>
Either.right({
filterClauses: [],
errorsByType: {},
})
);
mockDeleteByQuery.mockReturnValueOnce(async () =>
Either.right({
taskId: '1234',
})
);
const task = cleanupUnknownAndExcluded({
client: emptyResponseClientMock, // the client will not be called anyway
indexName: '.kibana_8.0.0',
discardUnknownDocs: false,
excludeOnUpgradeQuery: initialExcludeOnUpgradeQueryMock,
excludeFromUpgradeFilterHooks,
hookTimeoutMs: 50,
knownTypes: ['foo', 'bar'],
removedTypes: ['server', 'deprecated'],
});
await task();
expect(checkForUnknownDocs).toHaveBeenCalledTimes(1);
expect(checkForUnknownDocs).toHaveBeenCalledWith({
client: emptyResponseClientMock,
indexName: '.kibana_8.0.0',
excludeOnUpgradeQuery: initialExcludeOnUpgradeQueryMock,
knownTypes: ['foo', 'bar'],
});
});
it('fails if there are unknown docs and `discardUnknownDocs === false`', async () => {
mockCheckForUnknownDocs.mockReturnValueOnce(async () =>
Either.right({
type: 'unknown_docs_found',
unknownDocs,
})
);
const task = cleanupUnknownAndExcluded({
client: emptyResponseClientMock,
indexName: '.kibana_8.0.0',
discardUnknownDocs: false,
excludeOnUpgradeQuery: initialExcludeOnUpgradeQueryMock,
excludeFromUpgradeFilterHooks,
hookTimeoutMs: 50,
knownTypes: ['foo', 'bar'],
removedTypes: ['server', 'deprecated'],
});
const result = await task();
expect(Either.isLeft(result)).toBe(true);
expect((result as Either.Left<any>).left).toEqual({
type: 'unknown_docs_found',
unknownDocs,
});
expect(calculateExcludeFilters).not.toHaveBeenCalled();
expect(deleteByQuery).not.toHaveBeenCalled();
});
describe('if there are no unknown documents', () => {
it('calls `Actions.calculateExcludeFilters()` with the correct params', async () => {
mockCheckForUnknownDocs.mockReturnValueOnce(async () => Either.right({}));
mockCalculateExcludeFilters.mockReturnValueOnce(async () =>
Either.right({
filterClauses: [],
errorsByType: {},
})
);
mockDeleteByQuery.mockReturnValueOnce(async () =>
Either.right({
taskId: '1234',
})
);
const task = cleanupUnknownAndExcluded({
client: emptyResponseClientMock,
indexName: '.kibana_8.0.0',
discardUnknownDocs: false,
excludeOnUpgradeQuery: initialExcludeOnUpgradeQueryMock,
excludeFromUpgradeFilterHooks,
hookTimeoutMs: 50,
knownTypes: ['foo', 'bar'],
removedTypes: ['server', 'deprecated'],
});
await task();
expect(calculateExcludeFilters).toHaveBeenCalledTimes(1);
expect(calculateExcludeFilters).toHaveBeenCalledWith({
client: emptyResponseClientMock,
excludeFromUpgradeFilterHooks,
hookTimeoutMs: 50,
});
});
});
describe('if there are unknown documents and `discardUnknownDocuments === true`', () => {
it('calls `Actions.calculateExcludeFilters()` with the correct params', async () => {
mockCheckForUnknownDocs.mockReturnValueOnce(async () =>
Either.right({
type: 'unknown_docs_found',
unknownDocs,
})
);
mockCalculateExcludeFilters.mockReturnValueOnce(async () =>
Either.right({
filterClauses: [],
errorsByType: {},
})
);
mockDeleteByQuery.mockReturnValueOnce(async () =>
Either.right({
taskId: '1234',
})
);
const task = cleanupUnknownAndExcluded({
client: emptyResponseClientMock,
indexName: '.kibana_8.0.0',
discardUnknownDocs: true,
excludeOnUpgradeQuery: initialExcludeOnUpgradeQueryMock,
excludeFromUpgradeFilterHooks,
hookTimeoutMs: 28,
knownTypes: ['foo', 'bar'],
removedTypes: ['server', 'deprecated'],
});
await task();
expect(calculateExcludeFilters).toHaveBeenCalledTimes(1);
expect(calculateExcludeFilters).toHaveBeenCalledWith({
client: emptyResponseClientMock,
excludeFromUpgradeFilterHooks,
hookTimeoutMs: 28,
});
});
});
it('calls `deleteByQuery` with the correct params', async () => {
mockCheckForUnknownDocs.mockReturnValueOnce(async () =>
Either.right({
type: 'unknown_docs_found',
unknownDocs,
})
);
const filterClauses: QueryDslQueryContainer[] = [
{
bool: {
must: [
{ term: { type: 'search-session' } },
{ match: { 'search-session.persisted': false } },
],
},
},
];
const errorsByType = { type1: new Error('an error!') };
mockCalculateExcludeFilters.mockReturnValueOnce(async () =>
Either.right({ filterClauses, errorsByType })
);
mockDeleteByQuery.mockReturnValueOnce(async () =>
Either.right({
taskId: '1234',
})
);
const task = cleanupUnknownAndExcluded({
client: emptyResponseClientMock,
indexName: '.kibana_8.0.0',
discardUnknownDocs: true,
excludeOnUpgradeQuery: initialExcludeOnUpgradeQueryMock,
excludeFromUpgradeFilterHooks,
hookTimeoutMs: 28,
knownTypes: ['foo', 'bar'],
removedTypes: ['server', 'deprecated'],
});
const result = await task();
expect(deleteByQuery).toHaveBeenCalledTimes(1);
expect(deleteByQuery).toHaveBeenCalledWith({
client: emptyResponseClientMock,
indexName: '.kibana_8.0.0',
query: {
bool: {
should: [
// excluded from upgrade hook response
{
bool: {
must: [
{ term: { type: 'search-session' } },
{ match: { 'search-session.persisted': false } },
],
},
},
{ term: { type: 'server' } }, // removed type
{ term: { type: 'deprecated' } }, // removed type
{ term: { type: 'dashboard' } }, // unknown type
],
},
},
conflicts: 'proceed',
refresh: false,
});
expect(Either.isRight(result)).toBe(true);
expect((result as Either.Right<any>).right).toEqual({
type: 'cleanup_started' as const,
taskId: '1234',
unknownDocs,
errorsByType,
});
});
});

@@ -0,0 +1,133 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import * as Either from 'fp-ts/lib/Either';
import * as TaskEither from 'fp-ts/lib/TaskEither';
import { pipe } from 'fp-ts/lib/function';
import type { QueryDslQueryContainer } from '@elastic/elasticsearch/lib/api/types';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type { SavedObjectTypeExcludeFromUpgradeFilterHook } from '@kbn/core-saved-objects-server';
import type { RetryableEsClientError } from './catch_retryable_es_client_errors';
import {
checkForUnknownDocs,
type DocumentIdAndType,
type UnknownDocsFound,
} from './check_for_unknown_docs';
import { isTypeof } from '.';
import { CalculatedExcludeFilter, calculateExcludeFilters } from './calculate_exclude_filters';
import { deleteByQuery } from './delete_by_query';
/** @internal */
export interface CleanupUnknownAndExcludedParams {
client: ElasticsearchClient;
indexName: string;
discardUnknownDocs: boolean;
excludeOnUpgradeQuery: QueryDslQueryContainer;
excludeFromUpgradeFilterHooks: Record<string, SavedObjectTypeExcludeFromUpgradeFilterHook>;
hookTimeoutMs?: number;
knownTypes: string[];
removedTypes: string[];
}
/** @internal */
export interface CleanupStarted {
type: 'cleanup_started';
/** Sample (1000 types * 100 docs per type) of the unknown documents that have been found */
unknownDocs: DocumentIdAndType[];
/** Any errors that were encountered during filter calculation, keyed by the type name */
errorsByType: Record<string, Error>;
/** the id of the asynchronous delete task */
taskId: string;
}
/**
* Cleans up unknown and excluded types from the specified index.
*/
export const cleanupUnknownAndExcluded = ({
client,
indexName,
discardUnknownDocs,
excludeOnUpgradeQuery,
excludeFromUpgradeFilterHooks,
hookTimeoutMs,
knownTypes,
removedTypes,
}: CleanupUnknownAndExcludedParams): TaskEither.TaskEither<
RetryableEsClientError | UnknownDocsFound,
CleanupStarted
> => {
let unknownDocs: DocumentIdAndType[] = [];
let unknownDocTypes: string[] = [];
let errorsByType: Record<string, Error> = {};
return pipe(
// check if there are unknown docs
checkForUnknownDocs({ client, indexName, knownTypes, excludeOnUpgradeQuery }),
// make sure we are allowed to get rid of them (in case there are some)
TaskEither.chainEitherKW((unknownDocsRes: {} | UnknownDocsFound) => {
if (isTypeof(unknownDocsRes, 'unknown_docs_found')) {
unknownDocs = unknownDocsRes.unknownDocs;
unknownDocTypes = [...new Set(unknownDocs.map(({ type }) => type))];
if (!discardUnknownDocs) {
return Either.left({
type: 'unknown_docs_found' as const,
unknownDocs: unknownDocsRes.unknownDocs,
});
}
}
return Either.right(undefined);
}),
// calculate exclude filters (we use them to build the query for documents that must be deleted)
TaskEither.chainW(
(): TaskEither.TaskEither<RetryableEsClientError, CalculatedExcludeFilter> =>
calculateExcludeFilters({ client, excludeFromUpgradeFilterHooks, hookTimeoutMs })
),
// actively delete unwanted documents
TaskEither.chainW((excludeFiltersRes) => {
errorsByType = excludeFiltersRes.errorsByType;
// we must delete everything that matches:
// - any of the plugin-defined exclude filters
// - OR any of the unknown types
const deleteQuery: QueryDslQueryContainer = {
bool: {
should: [
...excludeFiltersRes.filterClauses,
...removedTypes.map((type) => ({ term: { type } })),
...unknownDocTypes.map((type) => ({ term: { type } })),
],
},
};
return deleteByQuery({
client,
indexName,
query: deleteQuery,
// we want to delete as many docs as we can in the current attempt
conflicts: 'proceed',
// instead of forcing refresh after each delete attempt,
// we opt for a delayRetry mechanism when conflicts appear,
// letting the periodic refresh kick in
refresh: false,
});
}),
// map response output
TaskEither.chainEitherKW((res) => {
return Either.right({
type: 'cleanup_started' as const,
taskId: res.taskId,
unknownDocs,
errorsByType,
});
})
);
};

@@ -0,0 +1,106 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import * as Either from 'fp-ts/lib/Either';
import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors';
import { errors as EsErrors } from '@elastic/elasticsearch';
import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
import { deleteByQuery } from './delete_by_query';
jest.mock('./catch_retryable_es_client_errors');
describe('deleteByQuery', () => {
const deleteQuery = {
bool: {
should: ['server', 'deprecated'].map((type) => ({
term: {
type,
},
})),
},
};
beforeEach(() => {
jest.clearAllMocks();
});
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
// Create a mock client that rejects all methods with a 503 status code response.
const retryableError = new EsErrors.ResponseError(
elasticsearchClientMock.createApiResponse({
statusCode: 503,
body: { error: { type: 'es_type', reason: 'es_reason' } },
})
);
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
);
const task = deleteByQuery({
client,
indexName: '.kibana_8.0.0',
query: deleteQuery,
conflicts: 'proceed',
refresh: true,
});
try {
await task();
} catch (e) {
/** ignore */
}
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});
it('calls `client.deleteByQuery` with the correct parameters', async () => {
const client = elasticsearchClientMock.createInternalClient(
Promise.resolve({ hits: { hits: [] } })
);
const task = deleteByQuery({
client,
indexName: '.kibana_8.0.0',
query: deleteQuery,
conflicts: 'proceed',
refresh: true,
});
await task();
expect(client.deleteByQuery).toHaveBeenCalledTimes(1);
expect(client.deleteByQuery).toHaveBeenCalledWith({
index: '.kibana_8.0.0',
query: deleteQuery,
refresh: true,
wait_for_completion: false,
conflicts: 'proceed',
});
});
it('resolves with `Either.right` if the delete task is successfully created', async () => {
const client = elasticsearchClientMock.createInternalClient(
Promise.resolve({
took: 147,
timed_out: false,
task: 1234,
})
);
const task = deleteByQuery({
client,
indexName: '.kibana_8.0.0',
query: deleteQuery,
conflicts: 'proceed',
refresh: true,
});
const result = await task();
expect(Either.isRight(result)).toBe(true);
expect((result as Either.Right<any>).right).toEqual({ taskId: '1234' });
});
});

@@ -0,0 +1,56 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import * as Either from 'fp-ts/lib/Either';
import * as TaskEither from 'fp-ts/lib/TaskEither';
import type { Conflicts, QueryDslQueryContainer } from '@elastic/elasticsearch/lib/api/types';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import {
catchRetryableEsClientErrors,
type RetryableEsClientError,
} from './catch_retryable_es_client_errors';
/** @internal */
export interface DeleteByQueryParams {
client: ElasticsearchClient;
indexName: string;
query: QueryDslQueryContainer;
conflicts: Conflicts;
refresh?: boolean;
}
/** @internal */
export interface DeleteByQueryResponse {
taskId: string;
}
/**
* Deletes documents matching the provided query
*/
export const deleteByQuery =
({
client,
indexName,
query,
conflicts,
refresh = false,
}: DeleteByQueryParams): TaskEither.TaskEither<RetryableEsClientError, DeleteByQueryResponse> =>
() => {
return client
.deleteByQuery({
index: indexName,
query,
refresh,
conflicts,
wait_for_completion: false,
})
.then(({ task: taskId }) => {
return Either.right({ taskId: String(taskId!) });
})
.catch(catchRetryableEsClientErrors);
};

@@ -76,6 +76,10 @@ import type { AliasNotFound, RemoveIndexNotAConcreteIndex } from './update_alias
export type { AliasAction, UpdateAliasesParams } from './update_aliases';
export { updateAliases } from './update_aliases';
export { cleanupUnknownAndExcluded } from './cleanup_unknown_and_excluded';
export { waitForDeleteByQueryTask } from './wait_for_delete_by_query_task';
export type { CreateIndexParams } from './create_index';
export { createIndex } from './create_index';

@@ -0,0 +1,199 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import * as Either from 'fp-ts/lib/Either';
import * as TaskEither from 'fp-ts/lib/TaskEither';
import * as Option from 'fp-ts/lib/Option';
import { errors as EsErrors, TransportResult } from '@elastic/elasticsearch';
import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
import { waitForDeleteByQueryTask } from './wait_for_delete_by_query_task';
import { waitForTask } from './wait_for_task';
jest.mock('./wait_for_task');
const mockWaitForTask = waitForTask as jest.MockedFunction<typeof waitForTask>;
describe('waitForDeleteByQueryTask', () => {
const client = elasticsearchClientMock.createInternalClient(
Promise.resolve(elasticsearchClientMock.createApiResponse({}))
);
beforeEach(() => {
jest.clearAllMocks();
});
it('calls waitForTask() with the appropriate params', async () => {
// Mock wait for delete finished successfully
mockWaitForTask.mockReturnValueOnce(
TaskEither.right({
completed: true,
error: Option.none,
failures: Option.none,
description: 'some description',
})
);
const task = waitForDeleteByQueryTask({
client,
taskId: 'some task id',
timeout: '60s',
});
await task();
expect(waitForTask).toHaveBeenCalledWith({
client,
taskId: 'some task id',
timeout: '60s',
});
});
describe('when waitForTask() method rejects with a task completion timeout error', () => {
it('catches the error and returns the appropriate Left response', async () => {
// Mock task completion error
const error = createError({
body: { error: { type: 'timeout_exception', reason: 'es_reason' } },
});
mockWaitForTask.mockReturnValueOnce(
TaskEither.left({
type: 'wait_for_task_completion_timeout' as const,
message: '[timeout_exception] es_reason',
error,
})
);
const task = waitForDeleteByQueryTask({
client,
taskId: 'my task id',
timeout: '60s',
});
const res = await task();
expect(res).toEqual(
Either.left({
type: 'wait_for_task_completion_timeout' as const,
message: '[timeout_exception] es_reason',
error,
})
);
});
});
describe('when waitForTask() method rejects with a retryable error', () => {
it('catches the error and returns the appropriate Left response', async () => {
// Mock retryable error
const error = createError({
statusCode: 503,
body: { error: { type: 'es_type', reason: 'es_reason' } },
});
mockWaitForTask.mockReturnValueOnce(
TaskEither.left({
type: 'retryable_es_client_error' as const,
message: 'es_type',
error,
})
);
const task = waitForDeleteByQueryTask({
client,
taskId: 'my task id',
timeout: '60s',
});
const res = await task();
expect(res).toEqual(
Either.left({
type: 'retryable_es_client_error' as const,
message: 'es_type',
error,
})
);
});
});
describe('when waitForTask() method finishes successfully, but there are failures', () => {
it('returns a Left response, with the list of failures', async () => {
// Mock successful with failures
const failures = ['dashboard:12345 - Failed to delete', 'dashboard:67890 - Failed to delete'];
mockWaitForTask.mockReturnValueOnce(
TaskEither.right({
completed: true,
failures: Option.some(failures),
error: Option.none,
})
);
const task = waitForDeleteByQueryTask({
client,
taskId: 'my task id',
timeout: '60s',
});
const res = await task();
expect(res).toEqual(
Either.left({
type: 'cleanup_failed' as const,
failures,
})
);
});
});
describe('when waitForTask() method throws an unexpected error', () => {
it('rethrows the error', async () => {
// Mock unexpected 500 Server Error
const error = createError({
statusCode: 500,
body: { error: { type: 'server_error', reason: 'Something really bad happened' } },
});
mockWaitForTask.mockReturnValueOnce(async () => {
throw error;
});
const task = waitForDeleteByQueryTask({
client,
taskId: 'my task id',
timeout: '60s',
});
expect(task()).rejects.toEqual(error);
});
});
describe('when waitForTask() method finishes successfully without failures', () => {
it('finishes with a cleanup_successful Right clause', async () => {
// Mock wait for delete finished successfully
mockWaitForTask.mockReturnValueOnce(
TaskEither.right({
completed: true,
error: Option.none,
failures: Option.none,
description: 'some description',
})
);
const task = waitForDeleteByQueryTask({
client,
taskId: 'my task id',
timeout: '60s',
});
const res = await task();
expect(res).toEqual(Either.right({ type: 'cleanup_successful' as const }));
});
});
});
const createError = (esResponse: Partial<TransportResult>) => {
return new EsErrors.ResponseError(elasticsearchClientMock.createApiResponse(esResponse));
};

@@ -0,0 +1,50 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import * as TaskEither from 'fp-ts/lib/TaskEither';
import * as Option from 'fp-ts/lib/Option';
import { flow } from 'fp-ts/lib/function';
import { waitForTask } from './wait_for_task';
/** @internal */
export interface CleanupErrorResponse {
type: 'cleanup_failed';
failures: string[];
versionConflicts?: number;
}
/** @internal */
export interface CleanupSuccessfulResponse {
type: 'cleanup_successful';
deleted?: number;
}
export const waitForDeleteByQueryTask = flow(
waitForTask,
TaskEither.chainW(
(res): TaskEither.TaskEither<CleanupErrorResponse, CleanupSuccessfulResponse> => {
if (Option.isSome(res.failures) || res.response?.version_conflicts) {
return TaskEither.left({
type: 'cleanup_failed' as const,
failures: Option.isSome(res.failures) ? res.failures.value : [],
versionConflicts: res.response?.version_conflicts,
});
} else if (Option.isSome(res.error)) {
throw new Error(
'waitForDeleteByQueryTask task failed with the following error:\n' +
JSON.stringify(res.error.value)
);
} else {
return TaskEither.right({
type: 'cleanup_successful' as const,
deleted: res.response?.deleted,
});
}
}
)
);

@@ -10,32 +10,33 @@ import { catchRetryableEsClientErrors } from './catch_retryable_es_client_error
import { errors as EsErrors } from '@elastic/elasticsearch';
import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
import { waitForPickupUpdatedMappingsTask } from './wait_for_pickup_updated_mappings_task';
import { setWriteBlock } from './set_write_block';
jest.mock('./catch_retryable_es_client_errors');
jest.mock('./catch_retryable_es_client_errors', () => {
const { catchRetryableEsClientErrors: actualImplementation } = jest.requireActual(
'./catch_retryable_es_client_errors'
);
return {
catchRetryableEsClientErrors: jest.fn(actualImplementation),
};
});
describe('waitForPickupUpdatedMappingsTask', () => {
beforeEach(() => {
jest.clearAllMocks();
});
// Create a mock client that rejects all methods with a 503 status code
// response.
const retryableError = new EsErrors.ResponseError(
elasticsearchClientMock.createApiResponse({
statusCode: 503,
body: { error: { type: 'es_type', reason: 'es_reason' } },
})
);
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
);
const nonRetryableError = new Error('crash');
const clientWithNonRetryableError = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createErrorTransportRequestPromise(nonRetryableError)
);
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
// Create a mock client that rejects all methods with a 503 status code
// response.
const retryableError = new EsErrors.ResponseError(
elasticsearchClientMock.createApiResponse({
statusCode: 503,
body: { error: { type: 'es_type', reason: 'es_reason' } },
})
);
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
);
const task = waitForPickupUpdatedMappingsTask({
client,
taskId: 'my task id',
@@ -50,11 +51,16 @@ describe('waitForPickupUpdatedMappingsTask', () => {
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});
it('re-throws non retry-able errors', async () => {
const task = setWriteBlock({
client: clientWithNonRetryableError,
index: 'my_index',
const nonRetryableError = new Error('crash');
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createErrorTransportRequestPromise(nonRetryableError)
);
const task = waitForPickupUpdatedMappingsTask({
client,
taskId: 'my task id',
timeout: '2m',
});
await task();
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(nonRetryableError);
expect(task()).rejects.toThrowError(nonRetryableError);
});
});

@@ -5,44 +5,113 @@
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { errors as EsErrors } from '@elastic/elasticsearch';
import { waitForTask } from './wait_for_task';
import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors';
jest.mock('./catch_retryable_es_client_errors');
import * as Either from 'fp-ts/lib/Either';
import { errors as EsErrors, TransportResult } from '@elastic/elasticsearch';
import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
import { waitForTask } from './wait_for_task';
describe('waitForTask', () => {
beforeEach(() => {
jest.clearAllMocks();
});
// Create a mock client that rejects all methods with a 503 status code
// response.
const retryableError = new EsErrors.ResponseError(
elasticsearchClientMock.createApiResponse({
it('calls tasks API get() with the correct parameters', async () => {
// Mock client that rejects with a retryable error
const { client } = createErrorClient({
statusCode: 503,
body: { error: { type: 'es_type', reason: 'es_reason' } },
})
);
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
);
});
const task = waitForTask({
client,
taskId: 'my task id',
timeout: '60s',
});
await task();
expect(client.tasks.get).toHaveBeenCalledTimes(1);
expect(client.tasks.get).toHaveBeenCalledWith({
task_id: 'my task id',
wait_for_completion: true,
timeout: '60s',
});
});
describe('when tasks API get() method rejects with a task completion timeout error', () => {
it('catches the error and returns the appropriate Left response', async () => {
// Mock client that rejects with a task completion timeout error
const { client, error } = createErrorClient({
body: { error: { type: 'timeout_exception', reason: 'es_reason' } },
});
describe('waitForPickupUpdatedMappingsTask', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = waitForTask({
client,
taskId: 'my task id',
timeout: '60s',
});
try {
await task();
} catch (e) {
/** ignore */
}
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
const res = await task();
expect(res).toEqual(
Either.left({
type: 'wait_for_task_completion_timeout' as const,
message: '[timeout_exception] es_reason',
error,
})
);
});
});
describe('when tasks API get() method rejects with a retryable error', () => {
it('catches the error and returns the appropriate Left response', async () => {
// Mock client that rejects with a 503 status code
const { client, error } = createErrorClient({
statusCode: 503,
body: { error: { type: 'es_type', reason: 'es_reason' } },
});
const task = waitForTask({
client,
taskId: 'my task id',
timeout: '60s',
});
const res = await task();
expect(res).toEqual(
Either.left({
type: 'retryable_es_client_error' as const,
message: 'es_type',
error,
})
);
});
});
describe('when tasks API get() method rejects with an unexpected error', () => {
it('rethrows the error', async () => {
// Mock client that rejects with a 500 Server Error
const { client, error } = createErrorClient({
statusCode: 500,
body: { error: { type: 'server_error', reason: 'Something really bad happened' } },
});
const task = waitForTask({
client,
taskId: 'my task id',
timeout: '60s',
});
expect(task()).rejects.toEqual(error);
});
});
});
const createErrorClient = (esResponse: Partial<TransportResult>) => {
const error = new EsErrors.ResponseError(elasticsearchClientMock.createApiResponse(esResponse));
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createErrorTransportRequestPromise(error)
);
return { client, error };
};

@@ -5,7 +5,7 @@
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type * as estypes from '@elastic/elasticsearch/lib/api/types';
import * as Either from 'fp-ts/lib/Either';
import * as TaskEither from 'fp-ts/lib/TaskEither';
import * as Option from 'fp-ts/lib/Option';
@@ -22,6 +22,7 @@ export interface WaitForTaskResponse {
completed: boolean;
failures: Option.Option<any[]>;
description?: string;
response?: estypes.TasksTaskStatus;
}
/**
@@ -90,6 +91,7 @@ export const waitForTask =
error: Option.fromNullable(body.error as estypes.ErrorCauseKeys),
failures: failures.length > 0 ? Option.some(failures) : Option.none,
description: body.task.description,
response: body.response,
});
})
.catch(catchWaitForTaskCompletionTimeout)

@@ -47,10 +47,6 @@ export const REMOVED_TYPES: string[] = [
'csp_rule',
].sort();
// When migrating from the outdated index we use a read query which excludes
// saved objects which are no longer used. These saved objects will still be
// kept in the outdated index for backup purposes, but won't be available in
// the upgraded index.
export const excludeUnusedTypesQuery: QueryDslQueryContainer = {
bool: {
must_not: [
@@ -59,23 +55,6 @@ export const excludeUnusedTypesQuery: QueryDslQueryContainer = {
type: typeName,
},
})),
// https://github.com/elastic/kibana/issues/96131
{
bool: {
must: [
{
match: {
type: 'search-session',
},
},
{
match: {
'search-session.persisted': false,
},
},
],
},
},
],
},
};

@@ -163,22 +163,6 @@ describe('createInitialState', () => {
"type": "ui-counter",
},
},
Object {
"bool": Object {
"must": Array [
Object {
"match": Object {
"type": "search-session",
},
},
Object {
"match": Object {
"search-session.persisted": false,
},
},
],
},
},
],
},
},

@@ -17,7 +17,7 @@ import * as Either from 'fp-ts/lib/Either';
import * as Option from 'fp-ts/lib/Option';
import { errors } from '@elastic/elasticsearch';
import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
import { AllControlStates, State } from './state';
import type { AllControlStates, State } from './state';
import { createInitialState } from './initial_state';
import { ByteSizeValue } from '@kbn/config-schema';
@@ -102,7 +102,9 @@ describe('migrationsStateActionMachine', () => {
...initialState,
reason: 'the fatal reason',
outdatedDocuments: [{ _id: '1234', password: 'sensitive password' }],
transformedDocBatches: [[{ _id: '1234', password: 'sensitive transformed password' }]],
bulkOperationBatches: [
[[{ index: { _id: '1234' } }, { password: 'sensitive transformed password' }]],
],
} as State,
logger: mockLogger.get(),
model: transitionModel(['LEGACY_DELETE', 'FATAL']),

@@ -15,9 +15,11 @@ import {
getRequestDebugMeta,
} from '@kbn/core-elasticsearch-client-server-internal';
import type { SavedObjectsRawDoc } from '@kbn/core-saved-objects-server';
import type { BulkOperationContainer } from '@elastic/elasticsearch/lib/api/types';
import { type Model, type Next, stateActionMachine } from './state_action_machine';
import { cleanup } from './migrations_state_machine_cleanup';
import type { ReindexSourceToTempTransform, ReindexSourceToTempIndexBulk, State } from './state';
import type { BulkOperation } from './model/create_batches';
interface StateTransitionLogMeta extends LogMeta {
kibana: {
@@ -128,9 +130,9 @@ export async function migrationStateActionMachine({
),
},
...{
transformedDocBatches: (
(newState as ReindexSourceToTempIndexBulk).transformedDocBatches ?? []
).map((batches) => batches.map((doc) => ({ _id: doc._id }))) as [SavedObjectsRawDoc[]],
bulkOperationBatches: redactBulkOperationBatches(
(newState as ReindexSourceToTempIndexBulk).bulkOperationBatches ?? [[]]
),
},
};
@@ -212,3 +214,11 @@ export async function migrationStateActionMachine({
}
}
}
const redactBulkOperationBatches = (
bulkOperationBatches: BulkOperation[][]
): BulkOperationContainer[][] => {
return bulkOperationBatches.map((batch) =>
batch.map((operation) => (Array.isArray(operation) ? operation[0] : operation))
);
};

@@ -10,8 +10,13 @@ import type { SavedObjectsRawDoc } from '@kbn/core-saved-objects-server';
import { createBatches } from './create_batches';
describe('createBatches', () => {
const DOCUMENT_SIZE_BYTES = 128;
const INDEX = '.kibana_version_index';
const documentToOperation = (document: SavedObjectsRawDoc) => [
{ index: { _id: document._id } },
document._source,
];
const DOCUMENT_SIZE_BYTES = 77; // 76 + \n
it('returns right one batch if all documents fit in maxBatchSizeBytes', () => {
const documents = [
{ _id: '', _source: { type: 'dashboard', title: 'my saved object title ¹' } },
@@ -19,8 +24,8 @@
{ _id: '', _source: { type: 'dashboard', title: 'my saved object title ®' } },
];
expect(createBatches(documents, INDEX, DOCUMENT_SIZE_BYTES * 3)).toEqual(
Either.right([documents])
expect(createBatches({ documents, maxBatchSizeBytes: DOCUMENT_SIZE_BYTES * 3 })).toEqual(
Either.right([documents.map(documentToOperation)])
);
});
it('creates multiple batches with each batch limited to maxBatchSizeBytes', () => {
@@ -31,32 +36,36 @@
{ _id: '', _source: { type: 'dashboard', title: 'my saved object title 44' } },
{ _id: '', _source: { type: 'dashboard', title: 'my saved object title 55' } },
];
expect(createBatches(documents, INDEX, DOCUMENT_SIZE_BYTES * 2)).toEqual(
Either.right([[documents[0], documents[1]], [documents[2], documents[3]], [documents[4]]])
expect(createBatches({ documents, maxBatchSizeBytes: DOCUMENT_SIZE_BYTES * 2 })).toEqual(
Either.right([
documents.slice(0, 2).map(documentToOperation),
documents.slice(2, 4).map(documentToOperation),
documents.slice(4).map(documentToOperation),
])
);
});
it('creates a single empty batch if there are no documents', () => {
const documents = [] as SavedObjectsRawDoc[];
expect(createBatches(documents, INDEX, 100)).toEqual(Either.right([[]]));
expect(createBatches({ documents, maxBatchSizeBytes: 100 })).toEqual(Either.right([[]]));
});
it('throws if any one document exceeds the maxBatchSizeBytes', () => {
const documents = [
{ _id: '', _source: { type: 'dashboard', title: 'my saved object title ¹' } },
{ _id: 'foo', _source: { type: 'dashboard', title: 'my saved object title ¹' } },
{
_id: '',
_id: 'bar',
_source: {
type: 'dashboard',
title: 'my saved object title ² with a very long title that exceeds max size bytes',
},
},
{ _id: '', _source: { type: 'dashboard', title: 'my saved object title ®' } },
{ _id: 'baz', _source: { type: 'dashboard', title: 'my saved object title ®' } },
];
expect(createBatches(documents, INDEX, 178)).toEqual(
expect(createBatches({ documents, maxBatchSizeBytes: 120 })).toEqual(
Either.left({
maxBatchSizeBytes: 178,
docSizeBytes: 179,
maxBatchSizeBytes: 120,
docSizeBytes: 130,
type: 'document_exceeds_batch_size_bytes',
document: documents[1],
documentId: documents[1]._id,
})
);
});
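// A hedged sanity-check sketch (not part of the original suite) for the 77-byte
// constant above, mirroring the BRACKETS_BYTES / NDJSON_NEW_LINE_BYTES accounting
// in createBatches:
it('matches the NDJSON size accounting used by createBatches', () => {
  const doc = {
    _id: '',
    _source: { type: 'dashboard', title: 'my saved object title ¹' },
  } as SavedObjectsRawDoc;
  // JSON.stringify of the [operation, source] tuple adds surrounding '[]' (2 bytes)
  // that the NDJSON body won't contain; the inner ',' stands in for one of the two
  // newlines, and 1 more byte is added for the trailing '\n'.
  const opSizeBytes =
    Buffer.byteLength(JSON.stringify(documentToOperation(doc)), 'utf8') - 2 + 1;
  expect(opSizeBytes).toEqual(DOCUMENT_SIZE_BYTES); // 78 - 2 + 1 = 77
});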

View file

@ -7,27 +7,50 @@
*/
import * as Either from 'fp-ts/lib/Either';
import type { SavedObjectsRawDoc } from '@kbn/core-saved-objects-server';
import { createBulkOperationBody } from '../actions/bulk_overwrite_transformed_documents';
import type { SavedObjectsRawDoc, SavedObjectsRawDocSource } from '@kbn/core-saved-objects-server';
import type { BulkOperationContainer } from '@elastic/elasticsearch/lib/api/types';
import { createBulkDeleteOperationBody, createBulkIndexOperationTuple } from './helpers';
import type { TransformErrorObjects } from '../core';
export type BulkIndexOperationTuple = [BulkOperationContainer, SavedObjectsRawDocSource];
export type BulkOperation = BulkIndexOperationTuple | BulkOperationContainer;
export interface CreateBatchesParams {
documents: SavedObjectsRawDoc[];
corruptDocumentIds?: string[];
transformErrors?: TransformErrorObjects[];
maxBatchSizeBytes: number;
}
export interface DocumentExceedsBatchSize {
documentId: string;
type: 'document_exceeds_batch_size_bytes';
docSizeBytes: number;
maxBatchSizeBytes: number;
}
/**
* Creates batches of documents to be used by the bulk API. Each batch will
* have a request body content length that's <= maxBatchSizeBytes
*/
export function createBatches(
docs: SavedObjectsRawDoc[],
index: string,
maxBatchSizeBytes: number
) {
export function createBatches({
documents,
corruptDocumentIds = [],
transformErrors = [],
maxBatchSizeBytes,
}: CreateBatchesParams): Either.Either<DocumentExceedsBatchSize, BulkOperation[][]> {
/* To build up the NDJSON request body we construct an array of objects like:
* [
* {"index": ...}
* {"title": "my saved object"}
* {"delete": ...}
* {"delete": ...}
* ...
* ]
* However, when we call JSON.stringify on this array the resulting string
* will be surrounded by `[]` which won't be present in the NDJSON so these
* two characters need to be removed from the size calculation.
* For indexing operations, createBulkIndexOperationTuple
* returns a tuple of the form [{operation, id}, {document}]
* Thus, for batch size calculations, we must take into account
* that this tuple's surrounding brackets `[]` won't be present in the NDJSON
*/
const BRACKETS_BYTES = 2;
/* Each document in the NDJSON (including the last one) needs to be
@ -36,29 +59,68 @@ export function createBatches(
*/
const NDJSON_NEW_LINE_BYTES = 1;
const batches = [[]] as [SavedObjectsRawDoc[]];
const BASE_DELETE_OPERATION_SIZE = Buffer.byteLength(
JSON.stringify(createBulkDeleteOperationBody('')),
'utf8'
);
const batches: BulkOperation[][] = [[]];
let currBatch = 0;
let currBatchSizeBytes = 0;
for (const doc of docs) {
const bulkOperationBody = createBulkOperationBody(doc, index);
const docSizeBytes =
Buffer.byteLength(JSON.stringify(bulkOperationBody), 'utf8') -
BRACKETS_BYTES +
NDJSON_NEW_LINE_BYTES;
if (docSizeBytes > maxBatchSizeBytes) {
return Either.left({
type: 'document_exceeds_batch_size_bytes',
docSizeBytes,
maxBatchSizeBytes,
document: doc,
});
} else if (currBatchSizeBytes + docSizeBytes <= maxBatchSizeBytes) {
batches[currBatch].push(doc);
currBatchSizeBytes = currBatchSizeBytes + docSizeBytes;
// group operations into batches of at most maxBatchSizeBytes each
const assignToBatch = (
operation: BulkOperationContainer | BulkIndexOperationTuple,
operationSizeBytes: number
): boolean => {
operationSizeBytes += NDJSON_NEW_LINE_BYTES;
if (operationSizeBytes > maxBatchSizeBytes) {
// the current operation (+ payload) does not even fit a single batch, fail!
return false;
} else if (currBatchSizeBytes + operationSizeBytes <= maxBatchSizeBytes) {
batches[currBatch].push(operation);
currBatchSizeBytes = currBatchSizeBytes + operationSizeBytes;
} else {
currBatch++;
batches[currBatch] = [doc];
currBatchSizeBytes = docSizeBytes;
batches[currBatch] = [operation];
currBatchSizeBytes = operationSizeBytes;
}
return true;
};
// create index (update) operations for all transformed documents
for (const document of documents) {
const bulkIndexOperationBody = createBulkIndexOperationTuple(document);
// take into account that this tuple's surrounding brackets `[]` won't be present in the NDJSON
const docSizeBytes =
Buffer.byteLength(JSON.stringify(bulkIndexOperationBody), 'utf8') - BRACKETS_BYTES;
if (!assignToBatch(bulkIndexOperationBody, docSizeBytes)) {
return Either.left({
documentId: document._id,
type: 'document_exceeds_batch_size_bytes' as const,
docSizeBytes,
maxBatchSizeBytes,
});
}
}
// create delete operations for all corrupt documents + transform errors
const unwantedDocumentIds = [
...corruptDocumentIds,
...transformErrors.map(({ rawId: documentId }) => documentId),
];
for (const documentId of unwantedDocumentIds) {
const bulkDeleteOperationBody = createBulkDeleteOperationBody(documentId);
const docSizeBytes = BASE_DELETE_OPERATION_SIZE + Buffer.byteLength(documentId, 'utf8');
if (!assignToBatch(bulkDeleteOperationBody, docSizeBytes)) {
return Either.left({
documentId,
type: 'document_exceeds_batch_size_bytes' as const,
docSizeBytes,
maxBatchSizeBytes,
});
}
}
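// A hedged usage sketch of the new object signature (variable names are hypothetical):
//   const batchesOrError = createBatches({
//     documents: transformedDocs,        // -> index operation tuples
//     corruptDocumentIds: ['corrupt:1'], // -> delete operations
//     transformErrors,                   // their rawIds also -> delete operations
//     maxBatchSizeBytes: 10 * 1024 * 1024,
//   });
//   if (Either.isLeft(batchesOrError)) {
//     // a single operation exceeded the budget; batchesOrError.left.documentId says which
//   }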

View file

@ -8,13 +8,16 @@
import { gt, valid } from 'semver';
import type {
BulkOperationContainer,
QueryDslBoolQuery,
QueryDslQueryContainer,
} from '@elastic/elasticsearch/lib/api/types';
import * as Either from 'fp-ts/lib/Either';
import type { SavedObjectsRawDoc } from '@kbn/core-saved-objects-server';
import type { IndexMapping } from '@kbn/core-saved-objects-base-server-internal';
import type { State } from '../state';
import type { AliasAction, FetchIndexResponse } from '../actions';
import type { BulkIndexOperationTuple } from './create_batches';
/**
* A helper function/type for ensuring that all control state's are handled.
@ -132,11 +135,11 @@ export function addMustClausesToBoolQuery(
/**
* Add the given clauses to the 'must_not' of the given query
* @param boolQuery the bool query to be enriched
* @param mustNotClauses the clauses to be added to a 'must_not'
* @param filterClauses the clauses to be added to a 'must_not'
* @returns a new query container with the enriched query
*/
export function addMustNotClausesToBoolQuery(
mustNotClauses: QueryDslQueryContainer[],
filterClauses: QueryDslQueryContainer[],
boolQuery?: QueryDslBoolQuery
): QueryDslQueryContainer {
let mustNot: QueryDslQueryContainer[] = [];
@ -145,7 +148,7 @@ export function addMustNotClausesToBoolQuery(
mustNot = mustNot.concat(boolQuery.must_not);
}
mustNot.push(...mustNotClauses);
mustNot.push(...filterClauses);
return {
bool: {
@ -205,3 +208,28 @@ export function buildRemoveAliasActions(
return [{ remove: { index, alias, must_exist: true } }];
});
}
/**
* Given a document, creates a valid body to index the document using the Bulk API.
*/
export const createBulkIndexOperationTuple = (doc: SavedObjectsRawDoc): BulkIndexOperationTuple => {
return [
{
index: {
_id: doc._id,
// use optimistic concurrency control to ensure that outdated
// documents are only overwritten once with the latest version
...(typeof doc._seq_no !== 'undefined' && { if_seq_no: doc._seq_no }),
...(typeof doc._primary_term !== 'undefined' && { if_primary_term: doc._primary_term }),
},
},
doc._source,
];
};
/**
* Given a document id, creates a valid body to delete the document using the Bulk API.
*/
export const createBulkDeleteOperationBody = (_id: string): BulkOperationContainer => ({
delete: { _id },
});
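// A hedged illustration of the resulting _bulk NDJSON (ids are hypothetical):
//   createBulkIndexOperationTuple({ _id: 'a:b', _source, _seq_no: 1, _primary_term: 2 })
//     -> [{ index: { _id: 'a:b', if_seq_no: 1, if_primary_term: 2 } }, _source]
//   createBulkDeleteOperationBody('c:d')
//     -> { delete: { _id: 'c:d' } }
// which the Elasticsearch client serializes line by line:
//   {"index":{"_id":"a:b","if_seq_no":1,"if_primary_term":2}}
//   {...the raw document _source...}
//   {"delete":{"_id":"c:d"}}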

View file

@ -11,48 +11,51 @@ import * as Option from 'fp-ts/lib/Option';
import type { SavedObjectsRawDoc } from '@kbn/core-saved-objects-server';
import type { IndexMapping } from '@kbn/core-saved-objects-base-server-internal';
import type {
BaseState,
CalculateExcludeFiltersState,
CheckTargetMappingsState,
CheckUnknownDocumentsState,
CheckVersionIndexReadyActions,
CleanupUnknownAndExcluded,
CleanupUnknownAndExcludedWaitForTaskState,
CloneTempToSource,
CreateNewTargetState,
CreateReindexTempState,
FatalState,
State,
LegacySetWriteBlockState,
SetSourceWriteBlockState,
LegacyCreateReindexTargetState,
LegacyDeleteState,
LegacyReindexState,
LegacyReindexWaitForTaskState,
LegacyDeleteState,
ReindexSourceToTempOpenPit,
ReindexSourceToTempRead,
ReindexSourceToTempClosePit,
ReindexSourceToTempTransform,
RefreshTarget,
UpdateTargetMappingsState,
UpdateTargetMappingsWaitForTaskState,
LegacySetWriteBlockState,
MarkVersionIndexReady,
MarkVersionIndexReadyConflict,
OutdatedDocumentsSearchClosePit,
OutdatedDocumentsSearchOpenPit,
OutdatedDocumentsSearchRead,
OutdatedDocumentsSearchClosePit,
OutdatedDocumentsTransform,
MarkVersionIndexReady,
BaseState,
CreateReindexTempState,
MarkVersionIndexReadyConflict,
CreateNewTargetState,
CloneTempToSource,
SetTempWriteBlock,
WaitForYellowSourceState,
TransformedDocumentsBulkIndex,
ReindexSourceToTempIndexBulk,
CheckUnknownDocumentsState,
CalculateExcludeFiltersState,
PostInitState,
CheckVersionIndexReadyActions,
UpdateTargetMappingsMeta,
CheckTargetMappingsState,
PrepareCompatibleMigration,
RefreshTarget,
ReindexSourceToTempClosePit,
ReindexSourceToTempIndexBulk,
ReindexSourceToTempOpenPit,
ReindexSourceToTempRead,
ReindexSourceToTempTransform,
SetSourceWriteBlockState,
SetTempWriteBlock,
State,
TransformedDocumentsBulkIndex,
UpdateTargetMappingsMeta,
UpdateTargetMappingsState,
UpdateTargetMappingsWaitForTaskState,
WaitForYellowSourceState,
} from '../state';
import { type TransformErrorObjects, TransformSavedObjectDocumentError } from '../core';
import type { AliasAction, RetryableEsClientError } from '../actions';
import type { ResponseType } from '../next';
import { createInitialProgress } from './progress';
import { model } from './model';
import type { BulkIndexOperationTuple, BulkOperation } from './create_batches';
describe('migrations v2 model', () => {
const indexMapping: IndexMapping = {
@ -112,6 +115,26 @@ describe('migrations v2 model', () => {
waitForMigrationCompletion: false,
};
const aProcessedDoc = {
_id: 'a:b',
_source: { type: 'a', a: { name: 'HOI!' }, migrationVersion: {}, references: [] },
};
const processedDocs: SavedObjectsRawDoc[] = [aProcessedDoc];
const bulkOperationBatches: BulkOperation[][] = [
[
[
{
index: {
_id: aProcessedDoc._id,
},
},
aProcessedDoc._source,
],
],
];
describe('exponential retry delays for retryable_es_client_error', () => {
let state: State = {
...baseState,
@ -1235,16 +1258,7 @@ describe('migrations v2 model', () => {
});
describe('and mappings match (diffMappings == false)', () => {
const unchangedMappingsState: State = {
...waitForYellowSourceState,
controlState: 'WAIT_FOR_YELLOW_SOURCE',
kibanaVersion: '7.12.0', // new version!
currentAlias: '.kibana',
versionAlias: '.kibana_7.12.0',
versionIndex: '.kibana_7.11.0_001',
};
test('WAIT_FOR_YELLOW_SOURCE -> PREPARE_COMPATIBLE_MIGRATION', () => {
test('WAIT_FOR_YELLOW_SOURCE -> CLEANUP_UNKNOWN_AND_EXCLUDED', () => {
const res: ResponseType<'WAIT_FOR_YELLOW_SOURCE'> = Either.right({
'.kibana_7.11.0_001': {
aliases: {
@ -1255,44 +1269,11 @@ describe('migrations v2 model', () => {
settings: {},
},
});
const newState = model(unchangedMappingsState, res) as PrepareCompatibleMigration;
const newState = model(waitForYellowSourceState, res) as CleanupUnknownAndExcluded;
expect(newState.controlState).toEqual('PREPARE_COMPATIBLE_MIGRATION');
expect(newState.targetIndexRawMappings).toEqual({
_meta: {
migrationMappingPropertyHashes: {
new_saved_object_type: '4a11183eee21e6fbad864f7a30b39ad0',
},
},
properties: {
new_saved_object_type: {
properties: {
value: {
type: 'text',
},
},
},
},
});
expect(newState.versionAlias).toEqual('.kibana_7.12.0');
expect(newState.currentAlias).toEqual('.kibana');
// will point to
expect(newState.targetIndex).toEqual('.kibana_7.11.0_001');
expect(newState.preTransformDocsActions).toEqual([
{
add: {
alias: '.kibana_7.12.0',
index: '.kibana_7.11.0_001',
},
},
{
remove: {
alias: '.kibana_7.11.0',
index: '.kibana_7.11.0_001',
must_exist: true,
},
},
]);
expect(newState.controlState).toEqual('CLEANUP_UNKNOWN_AND_EXCLUDED');
expect(newState.targetIndex).toEqual(baseState.versionIndex);
expect(newState.versionIndexReadyActions).toEqual(Option.none);
});
});
@ -1312,13 +1293,8 @@ describe('migrations v2 model', () => {
},
};
const changedMappingsState: State = {
const changedMappingsState: WaitForYellowSourceState = {
...waitForYellowSourceState,
controlState: 'WAIT_FOR_YELLOW_SOURCE',
kibanaVersion: '7.12.0', // new version!
currentAlias: '.kibana',
versionAlias: '.kibana_7.12.0',
versionIndex: '.kibana_7.11.0_001',
sourceIndexMappings: actualMappings,
};
@ -1354,6 +1330,178 @@ describe('migrations v2 model', () => {
});
});
describe('CLEANUP_UNKNOWN_AND_EXCLUDED', () => {
const cleanupUnknownAndExcluded: CleanupUnknownAndExcluded = {
...baseState,
controlState: 'CLEANUP_UNKNOWN_AND_EXCLUDED',
sourceIndex: Option.some('.kibana_7.11.0_001') as Option.Some<string>,
sourceIndexMappings: baseState.targetIndexMappings,
targetIndex: baseState.versionIndex,
kibanaVersion: '7.12.0', // new version!
currentAlias: '.kibana',
versionAlias: '.kibana_7.12.0',
aliases: {
'.kibana': '.kibana_7.11.0_001',
'.kibana_7.11.0': '.kibana_7.11.0_001',
},
versionIndexReadyActions: Option.none,
};
describe('if action succeeds', () => {
test('CLEANUP_UNKNOWN_AND_EXCLUDED -> CLEANUP_UNKNOWN_AND_EXCLUDED_WAIT_FOR_TASK', () => {
const res: ResponseType<'CLEANUP_UNKNOWN_AND_EXCLUDED'> = Either.right({
type: 'cleanup_started' as const,
taskId: '1234',
unknownDocs: [],
errorsByType: {},
});
const newState = model(cleanupUnknownAndExcluded, res) as CleanupUnknownAndExcludedWaitForTaskState;
expect(newState.controlState).toEqual('CLEANUP_UNKNOWN_AND_EXCLUDED_WAIT_FOR_TASK');
expect(newState.deleteByQueryTaskId).toEqual('1234');
});
});
test('CLEANUP_UNKNOWN_AND_EXCLUDED -> FATAL if discardUnknownObjects=false', () => {
const res: ResponseType<'CLEANUP_UNKNOWN_AND_EXCLUDED'> = Either.left({
type: 'unknown_docs_found' as const,
unknownDocs: [
{ id: 'dashboard:12', type: 'dashboard' },
{ id: 'foo:17', type: 'foo' },
],
});
const newState = model(cleanupUnknownAndExcluded, res);
expect(newState).toMatchObject({
controlState: 'FATAL',
reason: expect.stringContaining(
'Migration failed because some documents were found which use unknown saved object types'
),
});
});
});
describe('CLEANUP_UNKNOWN_AND_EXCLUDED_WAIT_FOR_TASK', () => {
const cleanupUnknownAndExcludedWaitForTask: CleanupUnknownAndExcludedWaitForTaskState = {
...baseState,
controlState: 'CLEANUP_UNKNOWN_AND_EXCLUDED_WAIT_FOR_TASK',
deleteByQueryTaskId: '1234',
sourceIndex: Option.some('.kibana_7.11.0_001') as Option.Some<string>,
sourceIndexMappings: baseState.targetIndexMappings,
targetIndex: baseState.versionIndex,
kibanaVersion: '7.12.0', // new version!
currentAlias: '.kibana',
versionAlias: '.kibana_7.12.0',
aliases: {
'.kibana': '.kibana_7.11.0_001',
'.kibana_7.11.0': '.kibana_7.11.0_001',
},
versionIndexReadyActions: Option.none,
};
test('CLEANUP_UNKNOWN_AND_EXCLUDED_WAIT_FOR_TASK -> CLEANUP_UNKNOWN_AND_EXCLUDED_WAIT_FOR_TASK when response is left wait_for_task_completion_timeout', () => {
const res: ResponseType<'CLEANUP_UNKNOWN_AND_EXCLUDED_WAIT_FOR_TASK'> = Either.left({
message: '[timeout_exception] Timeout waiting for ...',
type: 'wait_for_task_completion_timeout',
});
const newState = model(cleanupUnknownAndExcludedWaitForTask, res);
expect(newState.controlState).toEqual('CLEANUP_UNKNOWN_AND_EXCLUDED_WAIT_FOR_TASK');
expect(newState.retryCount).toEqual(1);
expect(newState.retryDelay).toEqual(2000);
});
test('CLEANUP_UNKNOWN_AND_EXCLUDED_WAIT_FOR_TASK -> PREPARE_COMPATIBLE_MIGRATION if action succeeds', () => {
const res: ResponseType<'CLEANUP_UNKNOWN_AND_EXCLUDED_WAIT_FOR_TASK'> = Either.right({
type: 'cleanup_successful' as const,
});
const newState = model(
cleanupUnknownAndExcludedWaitForTask,
res
) as PrepareCompatibleMigration;
expect(newState.controlState).toEqual('PREPARE_COMPATIBLE_MIGRATION');
expect(newState.targetIndexRawMappings).toEqual(indexMapping);
expect(newState.targetIndexMappings).toEqual(indexMapping);
expect(newState.targetIndex).toEqual('.kibana_7.11.0_001');
expect(newState.preTransformDocsActions).toEqual([
{
add: {
alias: '.kibana_7.12.0',
index: '.kibana_7.11.0_001',
},
},
{
remove: {
alias: '.kibana_7.11.0',
index: '.kibana_7.11.0_001',
must_exist: true,
},
},
]);
});
test('CLEANUP_UNKNOWN_AND_EXCLUDED_WAIT_FOR_TASK -> CLEANUP_UNKNOWN_AND_EXCLUDED if the delete query fails and we have some attempts left', () => {
const res: ResponseType<'CLEANUP_UNKNOWN_AND_EXCLUDED_WAIT_FOR_TASK'> = Either.left({
type: 'cleanup_failed' as const,
failures: ['Failed to delete dashboard:12345', 'Failed to delete dashboard:67890'],
versionConflicts: 12,
});
const newState = model(cleanupUnknownAndExcludedWaitForTask, res);
expect(newState).toMatchObject({
controlState: 'CLEANUP_UNKNOWN_AND_EXCLUDED',
logs: [
{
level: 'warning',
message:
'Errors occurred whilst deleting unwanted documents. Another instance is probably updating or deleting documents in the same index. Retrying attempt 1.',
},
],
});
});
test('CLEANUP_UNKNOWN_AND_EXCLUDED_WAIT_FOR_TASK -> FATAL if the delete query fails after N retries', () => {
const res: ResponseType<'CLEANUP_UNKNOWN_AND_EXCLUDED_WAIT_FOR_TASK'> = Either.left({
type: 'cleanup_failed' as const,
failures: ['Failed to delete dashboard:12345', 'Failed to delete dashboard:67890'],
});
const newState = model(
{
...cleanupUnknownAndExcludedWaitForTask,
retryCount: cleanupUnknownAndExcludedWaitForTask.retryAttempts,
},
res
);
expect(newState).toMatchObject({
controlState: 'FATAL',
reason: expect.stringContaining(
'Migration failed because it was unable to delete unwanted documents from the .kibana_7.11.0_001 system index'
),
});
});
});
describe('CHECK_UNKNOWN_DOCUMENTS', () => {
const mappingsWithUnknownType = {
properties: {
@ -1536,7 +1684,7 @@ describe('migrations v2 model', () => {
});
it('CALCULATE_EXCLUDE_FILTERS -> CREATE_REINDEX_TEMP if action succeeds with filters', () => {
const res: ResponseType<'CALCULATE_EXCLUDE_FILTERS'> = Either.right({
mustNotClauses: [{ term: { fieldA: 'abc' } }],
filterClauses: [{ term: { fieldA: 'abc' } }],
errorsByType: { type1: new Error('an error!') },
});
const newState = model(state, res);
@ -1784,12 +1932,6 @@ describe('migrations v2 model', () => {
transformErrors: [],
progress: { processed: undefined, total: 1 },
};
const processedDocs = [
{
_id: 'a:b',
_source: { type: 'a', a: { name: 'HOI!' }, migrationVersion: {}, references: [] },
},
] as SavedObjectsRawDoc[];
it('REINDEX_SOURCE_TO_TEMP_TRANSFORM -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK if action succeeded', () => {
const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_TRANSFORM'> = Either.right({
@ -1798,7 +1940,7 @@ describe('migrations v2 model', () => {
const newState = model(state, res) as ReindexSourceToTempIndexBulk;
expect(newState.controlState).toEqual('REINDEX_SOURCE_TO_TEMP_INDEX_BULK');
expect(newState.currentBatch).toEqual(0);
expect(newState.transformedDocBatches).toEqual([processedDocs]);
expect(newState.bulkOperationBatches).toEqual(bulkOperationBatches);
expect(newState.progress.processed).toBe(0); // Result of `(undefined ?? 0) + corruptDocumentsId.length`
});
@ -1854,18 +1996,10 @@ describe('migrations v2 model', () => {
});
describe('REINDEX_SOURCE_TO_TEMP_INDEX_BULK', () => {
const transformedDocBatches = [
[
{
_id: 'a:b',
_source: { type: 'a', a: { name: 'HOI!' }, migrationVersion: {}, references: [] },
},
],
] as [SavedObjectsRawDoc[]];
const reindexSourceToTempIndexBulkState: ReindexSourceToTempIndexBulk = {
...baseState,
controlState: 'REINDEX_SOURCE_TO_TEMP_INDEX_BULK',
transformedDocBatches,
bulkOperationBatches,
currentBatch: 0,
versionIndexReadyActions: Option.none,
sourceIndex: Option.some('.kibana') as Option.Some<string>,
@ -2024,6 +2158,18 @@ describe('migrations v2 model', () => {
preTransformDocsActions: [someAliasAction],
};
it('PREPARE_COMPATIBLE_MIGRATIONS -> REFRESH_TARGET if action succeeds and we must refresh the index', () => {
const res: ResponseType<'PREPARE_COMPATIBLE_MIGRATION'> = Either.right(
'update_aliases_succeeded'
);
const newState = model(
{ ...state, mustRefresh: true },
res
) as OutdatedDocumentsSearchOpenPit;
expect(newState.controlState).toEqual('REFRESH_TARGET');
expect(newState.versionIndexReadyActions).toEqual(Option.none);
});
it('PREPARE_COMPATIBLE_MIGRATIONS -> OUTDATED_DOCUMENTS_SEARCH_OPEN_PIT if action succeeds', () => {
const res: ResponseType<'PREPARE_COMPATIBLE_MIGRATION'> = Either.right(
'update_aliases_succeeded'
@ -2033,6 +2179,19 @@ describe('migrations v2 model', () => {
expect(newState.versionIndexReadyActions).toEqual(Option.none);
});
it('PREPARE_COMPATIBLE_MIGRATIONS -> REFRESH_TARGET if action fails because the alias is not found', () => {
const res: ResponseType<'PREPARE_COMPATIBLE_MIGRATION'> = Either.left({
type: 'alias_not_found_exception',
});
const newState = model(
{ ...state, mustRefresh: true },
res
) as OutdatedDocumentsSearchOpenPit;
expect(newState.controlState).toEqual('REFRESH_TARGET');
expect(newState.versionIndexReadyActions).toEqual(Option.none);
});
it('PREPARE_COMPATIBLE_MIGRATIONS -> OUTDATED_DOCUMENTS_SEARCH_OPEN_PIT if action fails because the alias is not found', () => {
const res: ResponseType<'PREPARE_COMPATIBLE_MIGRATION'> = Either.left({
type: 'alias_not_found_exception',
@ -2268,12 +2427,6 @@ describe('migrations v2 model', () => {
progress: createInitialProgress(),
};
describe('OUTDATED_DOCUMENTS_TRANSFORM if action succeeds', () => {
const processedDocs = [
{
_id: 'a:b',
_source: { type: 'a', a: { name: 'HOI!' }, migrationVersion: {}, references: [] },
},
] as SavedObjectsRawDoc[];
test('OUTDATED_DOCUMENTS_TRANSFORM -> TRANSFORMED_DOCUMENTS_BULK_INDEX if action succeeds', () => {
const res: ResponseType<'OUTDATED_DOCUMENTS_TRANSFORM'> = Either.right({ processedDocs });
const newState = model(
@ -2281,7 +2434,7 @@ describe('migrations v2 model', () => {
res
) as TransformedDocumentsBulkIndex;
expect(newState.controlState).toEqual('TRANSFORMED_DOCUMENTS_BULK_INDEX');
expect(newState.transformedDocBatches).toEqual([processedDocs]);
expect(newState.bulkOperationBatches).toEqual(bulkOperationBatches);
expect(newState.currentBatch).toEqual(0);
expect(newState.retryCount).toEqual(0);
expect(newState.retryDelay).toEqual(0);
@ -2367,30 +2520,28 @@ describe('migrations v2 model', () => {
});
describe('TRANSFORMED_DOCUMENTS_BULK_INDEX', () => {
const transformedDocBatches = [
[
// batch 0
{
_id: 'a:b',
_source: { type: 'a', a: { name: 'HOI!' }, migrationVersion: {}, references: [] },
const idToIndexOperation = (_id: string): BulkIndexOperationTuple => [
// "index" operations have a first part with the operation and the SO id
{
index: {
_id,
},
{
_id: 'a:c',
_source: { type: 'a', a: { name: 'HOI!' }, migrationVersion: {}, references: [] },
},
],
[
// batch 1
{
_id: 'a:d',
_source: { type: 'a', a: { name: 'HOI!' }, migrationVersion: {}, references: [] },
},
],
] as SavedObjectsRawDoc[][];
},
// and a second part with the object _source
{ type: 'a', a: { name: `HOI ${_id}!` }, migrationVersion: {}, references: [] },
// these two parts are then serialized to NDJSON by esClient and sent over with POST _bulk
];
const customBulkOperationBatches: BulkOperation[][] = [
// batch 0
['a:b', 'a:c'].map(idToIndexOperation),
// batch 1
['a:d'].map(idToIndexOperation),
];
const transformedDocumentsBulkIndexState: TransformedDocumentsBulkIndex = {
...baseState,
controlState: 'TRANSFORMED_DOCUMENTS_BULK_INDEX',
transformedDocBatches,
bulkOperationBatches: customBulkOperationBatches,
currentBatch: 0,
versionIndexReadyActions: Option.none,
sourceIndex: Option.some('.kibana') as Option.Some<string>,

View file

@ -9,7 +9,8 @@
import * as Either from 'fp-ts/lib/Either';
import * as Option from 'fp-ts/lib/Option';
import { type AliasAction, isTypeof } from '../actions';
import { isTypeof } from '../actions';
import type { AliasAction } from '../actions';
import type { AllActionStates, State } from '../state';
import type { ResponseType } from '../next';
import {
@ -278,30 +279,6 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
],
};
}
} else if (stateP.controlState === 'PREPARE_COMPATIBLE_MIGRATION') {
const res = resW as ExcludeRetryableEsError<ResponseType<typeof stateP.controlState>>;
if (Either.isRight(res)) {
return {
...stateP,
controlState: 'OUTDATED_DOCUMENTS_SEARCH_OPEN_PIT',
};
} else if (Either.isLeft(res)) {
// Note: if multiple newer Kibana versions are competing with each other to perform a migration,
// it might happen that another Kibana instance has deleted this instance's version index.
// NIT: to handle this properly, we'd have to add a PREPARE_COMPATIBLE_MIGRATION_CONFLICT step,
// similar to MARK_VERSION_INDEX_READY_CONFLICT.
if (isTypeof(res.left, 'alias_not_found_exception')) {
// We assume that the alias was already deleted by another Kibana instance
return {
...stateP,
controlState: 'OUTDATED_DOCUMENTS_SEARCH_OPEN_PIT',
};
} else {
throwBadResponse(stateP, res.left as never);
}
} else {
throwBadResponse(stateP, res);
}
} else if (stateP.controlState === 'LEGACY_SET_WRITE_BLOCK') {
const res = resW as ExcludeRetryableEsError<ResponseType<typeof stateP.controlState>>;
// If the write block is successfully in place
@ -457,35 +434,16 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
stateP.sourceIndexMappings,
/* expected */
stateP.targetIndexMappings
)
) {
// The source index .kibana is pointing to. E.g: ".xx8.7.0_001"
const source = stateP.sourceIndex.value;
return {
...stateP,
controlState: 'PREPARE_COMPATIBLE_MIGRATION',
sourceIndex: Option.none,
targetIndex: source!,
targetIndexRawMappings: stateP.sourceIndexMappings,
targetIndexMappings: mergeMigrationMappingPropertyHashes(
stateP.targetIndexMappings,
stateP.sourceIndexMappings
),
preTransformDocsActions: [
// Point the version alias to the source index. This lets other Kibana
// instances know that a migration for the current version is "done"
// even though we may be waiting for document transformations to finish.
{ add: { index: source!, alias: stateP.versionAlias } },
...buildRemoveAliasActions(source!, Object.keys(stateP.aliases), [
stateP.currentAlias,
stateP.versionAlias,
]),
],
controlState: 'CLEANUP_UNKNOWN_AND_EXCLUDED',
targetIndex: stateP.sourceIndex.value!, // We preserve the same index, source == target (E.g: ".xx8.7.0_001")
versionIndexReadyActions: Option.none,
};
} else {
// the mappings have changed, but changes might still be compatible
return {
...stateP,
controlState: 'CHECK_UNKNOWN_DOCUMENTS',
@ -507,6 +465,133 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
} else {
return throwBadResponse(stateP, res);
}
} else if (stateP.controlState === 'CLEANUP_UNKNOWN_AND_EXCLUDED') {
const res = resW as ExcludeRetryableEsError<ResponseType<typeof stateP.controlState>>;
if (Either.isRight(res)) {
if (res.right.unknownDocs.length) {
logs = [
...stateP.logs,
{ level: 'warning', message: extractDiscardedUnknownDocs(res.right.unknownDocs) },
];
}
logs = [
...logs,
...Object.entries(res.right.errorsByType).map(([soType, error]) => ({
level: 'warning' as const,
message: `Ignored excludeOnUpgrade hook on type [${soType}] that failed with error: "${error.toString()}"`,
})),
];
return {
...stateP,
logs,
controlState: 'CLEANUP_UNKNOWN_AND_EXCLUDED_WAIT_FOR_TASK',
deleteByQueryTaskId: res.right.taskId,
};
} else {
return {
...stateP,
controlState: 'FATAL',
reason: extractUnknownDocFailureReason(
stateP.migrationDocLinks.resolveMigrationFailures,
res.left.unknownDocs
),
};
}
} else if (stateP.controlState === 'CLEANUP_UNKNOWN_AND_EXCLUDED_WAIT_FOR_TASK') {
const res = resW as ExcludeRetryableEsError<ResponseType<typeof stateP.controlState>>;
if (Either.isRight(res)) {
const source = stateP.sourceIndex.value;
return {
...stateP,
logs,
controlState: 'PREPARE_COMPATIBLE_MIGRATION',
mustRefresh:
stateP.mustRefresh || typeof res.right.deleted === 'undefined' || res.right.deleted > 0,
targetIndexRawMappings: stateP.sourceIndexMappings,
targetIndexMappings: mergeMigrationMappingPropertyHashes(
stateP.targetIndexMappings,
stateP.sourceIndexMappings
),
preTransformDocsActions: [
// Point the version alias to the source index. This lets other Kibana
// instances know that a migration for the current version is "done"
// even though we may be waiting for document transformations to finish.
{ add: { index: source!, alias: stateP.versionAlias } },
...buildRemoveAliasActions(source!, Object.keys(stateP.aliases), [
stateP.currentAlias,
stateP.versionAlias,
]),
],
};
} else {
if (isTypeof(res.left, 'wait_for_task_completion_timeout')) {
// After waiting for the specified timeout, the task has not yet
// completed. Retry this step to see if the task has completed after an
// exponential delay. We will basically keep polling forever until the
// Elasticsearch task succeeds or fails.
return delayRetryState(stateP, res.left.message, Number.MAX_SAFE_INTEGER);
} else {
if (stateP.retryCount < stateP.retryAttempts) {
const retryCount = stateP.retryCount + 1;
const retryDelay = 1500 + 1000 * Math.random();
return {
...stateP,
controlState: 'CLEANUP_UNKNOWN_AND_EXCLUDED',
mustRefresh: true,
retryCount,
retryDelay,
logs: [
...stateP.logs,
{
level: 'warning',
message: `Errors occurred whilst deleting unwanted documents. Another instance is probably updating or deleting documents in the same index. Retrying attempt ${retryCount}.`,
},
],
};
} else {
const failures = res.left.failures.length;
const versionConflicts = res.left.versionConflicts ?? 0;
let reason = `Migration failed because it was unable to delete unwanted documents from the ${stateP.sourceIndex.value} system index (${failures} failures and ${versionConflicts} conflicts)`;
if (failures) {
reason += `:\n` + res.left.failures.map((failure: string) => `- ${failure}\n`).join('');
}
return {
...stateP,
controlState: 'FATAL',
reason,
};
}
}
}
} else if (stateP.controlState === 'PREPARE_COMPATIBLE_MIGRATION') {
const res = resW as ExcludeRetryableEsError<ResponseType<typeof stateP.controlState>>;
if (Either.isRight(res)) {
return {
...stateP,
controlState: stateP.mustRefresh ? 'REFRESH_TARGET' : 'OUTDATED_DOCUMENTS_SEARCH_OPEN_PIT',
};
} else if (Either.isLeft(res)) {
// Note: if multiple newer Kibana versions are competing with each other to perform a migration,
// it might happen that another Kibana instance has deleted this instance's version index.
// NIT: to handle this properly, we'd have to add a PREPARE_COMPATIBLE_MIGRATION_CONFLICT step,
// similar to MARK_VERSION_INDEX_READY_CONFLICT.
if (isTypeof(res.left, 'alias_not_found_exception')) {
// We assume that the alias was already deleted by another Kibana instance
return {
...stateP,
controlState: stateP.mustRefresh
? 'REFRESH_TARGET'
: 'OUTDATED_DOCUMENTS_SEARCH_OPEN_PIT',
};
} else {
throwBadResponse(stateP, res.left as never);
}
} else {
throwBadResponse(stateP, res);
}
} else if (stateP.controlState === 'CHECK_UNKNOWN_DOCUMENTS') {
const res = resW as ExcludeRetryableEsError<ResponseType<typeof stateP.controlState>>;
@ -579,7 +664,7 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
if (Either.isRight(res)) {
excludeOnUpgradeQuery = addMustNotClausesToBoolQuery(
res.right.mustNotClauses,
res.right.filterClauses,
stateP.excludeOnUpgradeQuery?.bool
);
@ -733,10 +818,8 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
(stateP.corruptDocumentIds.length === 0 && stateP.transformErrors.length === 0) ||
stateP.discardCorruptObjects
) {
const processedDocs = Either.isRight(res)
? res.right.processedDocs
: res.left.processedDocs;
const batches = createBatches(processedDocs, stateP.tempIndex, stateP.maxBatchSizeBytes);
const documents = Either.isRight(res) ? res.right.processedDocs : res.left.processedDocs;
const batches = createBatches({ documents, maxBatchSizeBytes: stateP.maxBatchSizeBytes });
if (Either.isRight(batches)) {
let corruptDocumentIds = stateP.corruptDocumentIds;
let transformErrors = stateP.transformErrors;
@ -751,7 +834,7 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
corruptDocumentIds,
transformErrors,
controlState: 'REINDEX_SOURCE_TO_TEMP_INDEX_BULK', // handles the actual bulk indexing into temp index
transformedDocBatches: batches.right,
bulkOperationBatches: batches.right,
currentBatch: 0,
progress,
};
@ -760,7 +843,7 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
...stateP,
controlState: 'FATAL',
reason: fatalReasonDocumentExceedsMaxBatchSizeBytes({
_id: batches.left.document._id,
_id: batches.left.documentId,
docSizeBytes: batches.left.docSizeBytes,
maxBatchSizeBytes: batches.left.maxBatchSizeBytes,
}),
@ -796,7 +879,7 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
} else if (stateP.controlState === 'REINDEX_SOURCE_TO_TEMP_INDEX_BULK') {
const res = resW as ExcludeRetryableEsError<ResponseType<typeof stateP.controlState>>;
if (Either.isRight(res)) {
if (stateP.currentBatch + 1 < stateP.transformedDocBatches.length) {
if (stateP.currentBatch + 1 < stateP.bulkOperationBatches.length) {
return {
...stateP,
controlState: 'REINDEX_SOURCE_TO_TEMP_INDEX_BULK',
@ -938,25 +1021,40 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
} else {
// we don't have any more outdated documents and need to either fail or move on to updating the target mappings.
if (stateP.corruptDocumentIds.length > 0 || stateP.transformErrors.length > 0) {
const transformFailureReason = extractTransformFailuresReason(
stateP.migrationDocLinks.resolveMigrationFailures,
stateP.corruptDocumentIds,
stateP.transformErrors
);
return {
...stateP,
controlState: 'FATAL',
reason: transformFailureReason,
};
} else {
// If there are no more results we have transformed all outdated
// documents and we didn't encounter any corrupt documents or transformation errors
// and can proceed to the next step
return {
...stateP,
controlState: 'OUTDATED_DOCUMENTS_SEARCH_CLOSE_PIT',
};
if (!stateP.discardCorruptObjects) {
const transformFailureReason = extractTransformFailuresReason(
stateP.migrationDocLinks.resolveMigrationFailures,
stateP.corruptDocumentIds,
stateP.transformErrors
);
return {
...stateP,
controlState: 'FATAL',
reason: transformFailureReason,
};
}
// at this point, users have configured kibana to discard corrupt objects
// thus, we can ignore corrupt documents and transform errors and proceed with the migration
logs = [
...stateP.logs,
{
level: 'warning',
message: extractDiscardedCorruptDocs(
stateP.corruptDocumentIds,
stateP.transformErrors
),
},
];
}
// If there are no more results we have transformed all outdated
// documents and we didn't encounter any corrupt documents or transformation errors
// and can proceed to the next step
return {
...stateP,
controlState: 'OUTDATED_DOCUMENTS_SEARCH_CLOSE_PIT',
};
}
} else {
throwBadResponse(stateP, res);
@ -968,20 +1066,36 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
// Otherwise the progress might look off when there are errors.
const progress = incrementProcessedProgress(stateP.progress, stateP.outdatedDocuments.length);
if (Either.isRight(res)) {
// we haven't seen corrupt documents or any transformation errors thus far in the migration
// index the migrated docs
if (stateP.corruptDocumentIds.length === 0 && stateP.transformErrors.length === 0) {
const batches = createBatches(
res.right.processedDocs,
stateP.targetIndex,
stateP.maxBatchSizeBytes
);
if (
Either.isRight(res) ||
(isTypeof(res.left, 'documents_transform_failed') && stateP.discardCorruptObjects)
) {
// we might have some transformation errors, but user has chosen to discard them
if (
(stateP.corruptDocumentIds.length === 0 && stateP.transformErrors.length === 0) ||
stateP.discardCorruptObjects
) {
const documents = Either.isRight(res) ? res.right.processedDocs : res.left.processedDocs;
let corruptDocumentIds = stateP.corruptDocumentIds;
let transformErrors = stateP.transformErrors;
if (Either.isLeft(res)) {
corruptDocumentIds = [...stateP.corruptDocumentIds, ...res.left.corruptDocumentIds];
transformErrors = [...stateP.transformErrors, ...res.left.transformErrors];
}
const batches = createBatches({
documents,
corruptDocumentIds,
transformErrors,
maxBatchSizeBytes: stateP.maxBatchSizeBytes,
});
if (Either.isRight(batches)) {
return {
...stateP,
controlState: 'TRANSFORMED_DOCUMENTS_BULK_INDEX',
transformedDocBatches: batches.right,
bulkOperationBatches: batches.right,
currentBatch: 0,
hasTransformedDocs: true,
progress,
@ -991,7 +1105,7 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
...stateP,
controlState: 'FATAL',
reason: fatalReasonDocumentExceedsMaxBatchSizeBytes({
_id: batches.left.document._id,
_id: batches.left.documentId,
docSizeBytes: batches.left.docSizeBytes,
maxBatchSizeBytes: batches.left.maxBatchSizeBytes,
}),
@ -1024,7 +1138,7 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
} else if (stateP.controlState === 'TRANSFORMED_DOCUMENTS_BULK_INDEX') {
const res = resW as ExcludeRetryableEsError<ResponseType<typeof stateP.controlState>>;
if (Either.isRight(res)) {
if (stateP.currentBatch + 1 < stateP.transformedDocBatches.length) {
if (stateP.currentBatch + 1 < stateP.bulkOperationBatches.length) {
return {
...stateP,
controlState: 'TRANSFORMED_DOCUMENTS_BULK_INDEX',

View file

@ -43,9 +43,12 @@ import type {
WaitForMigrationCompletionState,
CheckTargetMappingsState,
PrepareCompatibleMigration,
CleanupUnknownAndExcluded,
CleanupUnknownAndExcludedWaitForTaskState,
} from './state';
import type { TransformRawDocs } from './types';
import * as Actions from './actions';
import { REMOVED_TYPES } from './core';
type ActionMap = ReturnType<typeof nextActionMap>;
@ -63,12 +66,30 @@ export const nextActionMap = (client: ElasticsearchClient, transformRawDocs: Tra
return {
INIT: (state: InitState) =>
Actions.initAction({ client, indices: [state.currentAlias, state.versionAlias] }),
PREPARE_COMPATIBLE_MIGRATION: (state: PrepareCompatibleMigration) =>
Actions.updateAliases({ client, aliasActions: state.preTransformDocsActions }),
WAIT_FOR_MIGRATION_COMPLETION: (state: WaitForMigrationCompletionState) =>
Actions.fetchIndices({ client, indices: [state.currentAlias, state.versionAlias] }),
WAIT_FOR_YELLOW_SOURCE: (state: WaitForYellowSourceState) =>
Actions.waitForIndexStatus({ client, index: state.sourceIndex.value, status: 'yellow' }),
CLEANUP_UNKNOWN_AND_EXCLUDED: (state: CleanupUnknownAndExcluded) =>
Actions.cleanupUnknownAndExcluded({
client,
indexName: state.sourceIndex.value,
discardUnknownDocs: state.discardUnknownObjects,
excludeOnUpgradeQuery: state.excludeOnUpgradeQuery,
excludeFromUpgradeFilterHooks: state.excludeFromUpgradeFilterHooks,
knownTypes: state.knownTypes,
removedTypes: REMOVED_TYPES,
}),
CLEANUP_UNKNOWN_AND_EXCLUDED_WAIT_FOR_TASK: (
state: CleanupUnknownAndExcludedWaitForTaskState
) =>
Actions.waitForDeleteByQueryTask({
client,
taskId: state.deleteByQueryTaskId,
timeout: '120s',
}),
PREPARE_COMPATIBLE_MIGRATION: (state: PrepareCompatibleMigration) =>
Actions.updateAliases({ client, aliasActions: state.preTransformDocsActions }),
CHECK_UNKNOWN_DOCUMENTS: (state: CheckUnknownDocumentsState) =>
Actions.checkForUnknownDocs({
client,
@ -117,7 +138,7 @@ export const nextActionMap = (client: ElasticsearchClient, transformRawDocs: Tra
Actions.bulkOverwriteTransformedDocuments({
client,
index: state.tempIndex,
transformedDocs: state.transformedDocBatches[state.currentBatch],
operations: state.bulkOperationBatches[state.currentBatch],
/**
* Since we don't run a search against the target index, we disable "refresh" to speed up
* the migration process.
@ -178,14 +199,14 @@ export const nextActionMap = (client: ElasticsearchClient, transformRawDocs: Tra
Actions.bulkOverwriteTransformedDocuments({
client,
index: state.targetIndex,
transformedDocs: state.transformedDocBatches[state.currentBatch],
operations: state.bulkOperationBatches[state.currentBatch],
/**
* Since we don't run a search against the target index, we disable "refresh" to speed up
* the migration process.
* Although any further step must run "refresh" for the target index
* before we reach out to the MARK_VERSION_INDEX_READY step.
* Right now, it's performed during OUTDATED_DOCUMENTS_REFRESH step.
*/
refresh: false,
}),
MARK_VERSION_INDEX_READY: (state: MarkVersionIndexReady) =>
Actions.updateAliases({ client, aliasActions: state.versionIndexReadyActions.value }),
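// Illustrative note (an assumption, not part of the map above): conceptually, the
// CLEANUP_UNKNOWN_AND_EXCLUDED action boils down to a delete_by_query launched
// without waiting for completion, along the lines of:
//   client.deleteByQuery({
//     index: state.sourceIndex.value,
//     query: cleanupQuery, // hypothetical: unknown types OR excludeOnUpgrade matches
//     conflicts: 'proceed',
//     wait_for_completion: false,
//   });
// the returned task id is then polled by CLEANUP_UNKNOWN_AND_EXCLUDED_WAIT_FOR_TASK.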

View file

@ -18,6 +18,7 @@ import type { ControlState } from './state_action_machine';
import type { AliasAction } from './actions';
import type { TransformErrorObjects } from './core';
import type { MigrationLog, Progress } from './types';
import type { BulkOperation } from './model/create_batches';
export interface BaseState extends ControlState {
/** The first part of the index name such as `.kibana` or `.kibana_task_manager` */
@ -180,14 +181,37 @@ export interface PostInitState extends BaseState {
*/
readonly targetIndexRawMappings?: IndexMapping;
readonly versionIndexReadyActions: Option.Option<AliasAction[]>;
readonly outdatedDocumentsQuery: QueryDslQueryContainer;
}
export interface SourceExistsState {
readonly sourceIndex: Option.Some<string>;
}
export type BaseWithSource = BaseState & SourceExistsState;
export type PostInitWithSource = PostInitState & SourceExistsState;
export interface DoneState extends PostInitState {
/** Migration completed successfully */
readonly controlState: 'DONE';
}
export interface CleanupUnknownAndExcluded extends PostInitWithSource {
/** Clean the source index, removing SOs with unknown and excluded types */
readonly controlState: 'CLEANUP_UNKNOWN_AND_EXCLUDED';
readonly sourceIndexMappings: IndexMapping;
readonly aliases: Record<string, string | undefined>;
/** Whether the cleanup operation deleted one or more documents; if so, the index must be refreshed */
readonly mustRefresh?: boolean;
}
export interface CleanupUnknownAndExcludedWaitForTaskState extends PostInitWithSource {
readonly controlState: 'CLEANUP_UNKNOWN_AND_EXCLUDED_WAIT_FOR_TASK';
readonly deleteByQueryTaskId: string;
readonly sourceIndexMappings: IndexMapping;
readonly aliases: Record<string, string | undefined>;
/** Whether the cleanup operation deleted one or more documents; if so, the index must be refreshed */
readonly mustRefresh?: boolean;
}
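// For orientation, a sketch of the happy-path transitions these two states take part
// in (as exercised by the model tests):
//   WAIT_FOR_YELLOW_SOURCE -> CLEANUP_UNKNOWN_AND_EXCLUDED                      (launch delete_by_query)
//   CLEANUP_UNKNOWN_AND_EXCLUDED -> CLEANUP_UNKNOWN_AND_EXCLUDED_WAIT_FOR_TASK  (poll deleteByQueryTaskId)
//   CLEANUP_UNKNOWN_AND_EXCLUDED_WAIT_FOR_TASK -> PREPARE_COMPATIBLE_MIGRATION  (mustRefresh set if docs were deleted)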
/**
* Compatible migrations do not require migrating to a new index because all
* schema changes are compatible with current index mappings.
@ -196,11 +220,13 @@ export interface DoneState extends PostInitState {
* need to make sure that no older Kibana versions are still writing to target
* index.
*/
export interface PrepareCompatibleMigration extends PostInitState {
export interface PrepareCompatibleMigration extends PostInitWithSource {
/** We have found a schema-compatible migration, this means we can optimise our migration steps */
readonly controlState: 'PREPARE_COMPATIBLE_MIGRATION';
/** Alias-level actions that prepare for this migration */
readonly preTransformDocsActions: AliasAction[];
/** Indicates whether we must refresh the index */
readonly mustRefresh?: boolean;
}
export interface FatalState extends BaseState {
@ -210,30 +236,26 @@ export interface FatalState extends BaseState {
readonly reason: string;
}
export interface WaitForYellowSourceState extends BaseState {
export interface WaitForYellowSourceState extends BaseWithSource {
/** Wait for the source index to be yellow before reading from it. */
readonly controlState: 'WAIT_FOR_YELLOW_SOURCE';
readonly sourceIndex: Option.Some<string>;
readonly sourceIndexMappings: IndexMapping;
readonly aliases: Record<string, string | undefined>;
}
export interface CheckUnknownDocumentsState extends BaseState {
export interface CheckUnknownDocumentsState extends BaseWithSource {
/** Check if any unknown document is present in the source index */
readonly controlState: 'CHECK_UNKNOWN_DOCUMENTS';
readonly sourceIndex: Option.Some<string>;
readonly sourceIndexMappings: IndexMapping;
}
export interface SetSourceWriteBlockState extends PostInitState {
export interface SetSourceWriteBlockState extends PostInitWithSource {
/** Set a write block on the source index to prevent any further writes */
readonly controlState: 'SET_SOURCE_WRITE_BLOCK';
readonly sourceIndex: Option.Some<string>;
}
export interface CalculateExcludeFiltersState extends PostInitState {
export interface CalculateExcludeFiltersState extends PostInitWithSource {
readonly controlState: 'CALCULATE_EXCLUDE_FILTERS';
readonly sourceIndex: Option.Some<string>;
}
export interface CreateNewTargetState extends PostInitState {
@ -243,19 +265,17 @@ export interface CreateNewTargetState extends PostInitState {
readonly versionIndexReadyActions: Option.Some<AliasAction[]>;
}
export interface CreateReindexTempState extends PostInitState {
export interface CreateReindexTempState extends PostInitWithSource {
/**
* Create a target index with mappings from the source index and registered
* plugins
*/
readonly controlState: 'CREATE_REINDEX_TEMP';
readonly sourceIndex: Option.Some<string>;
}
export interface ReindexSourceToTempOpenPit extends PostInitState {
export interface ReindexSourceToTempOpenPit extends PostInitWithSource {
/** Open PIT to the source index */
readonly controlState: 'REINDEX_SOURCE_TO_TEMP_OPEN_PIT';
readonly sourceIndex: Option.Some<string>;
}
interface ReindexSourceToTempBatch extends PostInitState {
@ -282,24 +302,19 @@ export interface ReindexSourceToTempTransform extends ReindexSourceToTempBatch {
export interface ReindexSourceToTempIndexBulk extends ReindexSourceToTempBatch {
readonly controlState: 'REINDEX_SOURCE_TO_TEMP_INDEX_BULK';
readonly transformedDocBatches: [SavedObjectsRawDoc[]];
readonly bulkOperationBatches: BulkOperation[][];
readonly currentBatch: number;
}
export type SetTempWriteBlock = PostInitState & {
/** Set a write block on the temporary index to prevent further writes */
export interface SetTempWriteBlock extends PostInitWithSource {
readonly controlState: 'SET_TEMP_WRITE_BLOCK';
readonly sourceIndex: Option.Some<string>;
};
}
export interface CloneTempToSource extends PostInitState {
export interface CloneTempToSource extends PostInitWithSource {
/**
* Clone the temporary reindex index into
*/
readonly controlState: 'CLONE_TEMP_TO_TARGET';
readonly sourceIndex: Option.Some<string>;
}
export interface RefreshTarget extends PostInitState {
@ -380,7 +395,7 @@ export interface TransformedDocumentsBulkIndex extends PostInitState {
* Write the up-to-date transformed documents to the target index
*/
readonly controlState: 'TRANSFORMED_DOCUMENTS_BULK_INDEX';
readonly transformedDocBatches: SavedObjectsRawDoc[][];
readonly bulkOperationBatches: BulkOperation[][];
readonly currentBatch: number;
readonly lastHitSortValue: number[] | undefined;
readonly hasTransformedDocs: boolean;
@ -423,8 +438,7 @@ export interface MarkVersionIndexReadyConflict extends PostInitState {
* If we're migrating from a legacy index we need to perform some additional
* steps to prepare this index so that it can be used as a migration 'source'.
*/
export interface LegacyBaseState extends PostInitState {
readonly sourceIndex: Option.Some<string>;
export interface LegacyBaseState extends PostInitWithSource {
readonly legacyPreMigrationDoneActions: AliasAction[];
/**
* The mappings read from the legacy index, used to create a new reindex
@ -474,6 +488,8 @@ export type State = Readonly<
| FatalState
| InitState
| PrepareCompatibleMigration
| CleanupUnknownAndExcluded
| CleanupUnknownAndExcludedWaitForTaskState
| WaitForMigrationCompletionState
| DoneState
| WaitForYellowSourceState

View file

@ -118,7 +118,7 @@ describe('migration v2', () => {
await root.preboot();
await root.setup();
await expect(root.start()).rejects.toMatchInlineSnapshot(
`[Error: Unable to complete saved object migrations for the [.kibana] index: The document with _id "canvas-workpad-template:workpad-template-061d7868-2b4e-4dc8-8bf7-3772b52926e5" is 1715329 bytes which exceeds the configured maximum batch size of 1015275 bytes. To proceed, please increase the 'migrations.maxBatchSizeBytes' Kibana configuration option and ensure that the Elasticsearch 'http.max_content_length' configuration option is set to an equal or larger value.]`
`[Error: Unable to complete saved object migrations for the [.kibana] index: The document with _id "canvas-workpad-template:workpad-template-061d7868-2b4e-4dc8-8bf7-3772b52926e5" is 1715272 bytes which exceeds the configured maximum batch size of 1015275 bytes. To proceed, please increase the 'migrations.maxBatchSizeBytes' Kibana configuration option and ensure that the Elasticsearch 'http.max_content_length' configuration option is set to an equal or larger value.]`
);
await retryAsync(
@ -131,7 +131,7 @@ describe('migration v2', () => {
expect(
records.find((rec) =>
rec.message.startsWith(
`Unable to complete saved object migrations for the [.kibana] index: The document with _id "canvas-workpad-template:workpad-template-061d7868-2b4e-4dc8-8bf7-3772b52926e5" is 1715329 bytes which exceeds the configured maximum batch size of 1015275 bytes. To proceed, please increase the 'migrations.maxBatchSizeBytes' Kibana configuration option and ensure that the Elasticsearch 'http.max_content_length' configuration option is set to an equal or larger value.`
`Unable to complete saved object migrations for the [.kibana] index: The document with _id "canvas-workpad-template:workpad-template-061d7868-2b4e-4dc8-8bf7-3772b52926e5" is 1715272 bytes which exceeds the configured maximum batch size of 1015275 bytes. To proceed, please increase the 'migrations.maxBatchSizeBytes' Kibana configuration option and ensure that the Elasticsearch 'http.max_content_length' configuration option is set to an equal or larger value.`
)
)
).toBeDefined();

View file

@ -126,7 +126,7 @@ describe('checking migration metadata changes on all registered SO types', () =>
"rules-settings": "9854495c3b54b16a6625fb250c35e5504da72266",
"sample-data-telemetry": "c38daf1a49ed24f2a4fb091e6e1e833fccf19935",
"search": "01bc42d635e9ea0588741c4c7a2bbd3feb3ac5dc",
"search-session": "5f40f6101fc2ec8ce5210d735ea2e00a87c02886",
"search-session": "58a44d14ec991739166b2ec28d718001ab0f4b28",
"search-telemetry": "ab67ef721f294f28d5e10febbd20653347720188",
"security-rule": "1ff82dfb2298c3caf6888fc3ef15c6bf7a628877",
"security-solution-signals-migration": "c2db409c1857d330beb3d6fd188fa186f920302c",

View file

@ -43,6 +43,7 @@ import {
type DocumentsTransformFailed,
type DocumentsTransformSuccess,
MIGRATION_CLIENT_OPTIONS,
createBulkIndexOperationTuple,
} from '@kbn/core-saved-objects-migration-server-internal';
const { startES } = createTestServers({
@ -78,7 +79,7 @@ describe('migration actions', () => {
},
},
})();
const sourceDocs = [
const docs = [
{ _source: { title: 'doc 1' } },
{ _source: { title: 'doc 2' } },
{ _source: { title: 'doc 3' } },
@ -88,7 +89,7 @@ describe('migration actions', () => {
await bulkOverwriteTransformedDocuments({
client,
index: 'existing_index_with_docs',
transformedDocs: sourceDocs,
operations: docs.map(createBulkIndexOperationTuple),
refresh: 'wait_for',
})();
@ -101,7 +102,7 @@ describe('migration actions', () => {
await bulkOverwriteTransformedDocuments({
client,
index: 'existing_index_with_write_block',
transformedDocs: sourceDocs,
operations: docs.map(createBulkIndexOperationTuple),
refresh: 'wait_for',
})();
await setWriteBlock({ client, index: 'existing_index_with_write_block' })();
@ -302,7 +303,7 @@ describe('migration actions', () => {
const res = (await bulkOverwriteTransformedDocuments({
client,
index: 'new_index_without_write_block',
transformedDocs: sourceDocs,
operations: sourceDocs.map(createBulkIndexOperationTuple),
refresh: 'wait_for',
})()) as Either.Left<unknown>;
@ -882,7 +883,7 @@ describe('migration actions', () => {
await bulkOverwriteTransformedDocuments({
client,
index: 'reindex_target_4',
transformedDocs: sourceDocs,
operations: sourceDocs.map(createBulkIndexOperationTuple),
refresh: 'wait_for',
})();
@ -1441,7 +1442,7 @@ describe('migration actions', () => {
await bulkOverwriteTransformedDocuments({
client,
index: 'existing_index_without_mappings',
transformedDocs: sourceDocs,
operations: sourceDocs.map(createBulkIndexOperationTuple),
refresh: 'wait_for',
})();
@ -1837,7 +1838,7 @@ describe('migration actions', () => {
const task = bulkOverwriteTransformedDocuments({
client,
index: 'existing_index_with_docs',
transformedDocs: newDocs,
operations: newDocs.map(createBulkIndexOperationTuple),
refresh: 'wait_for',
});
@ -1860,10 +1861,10 @@ describe('migration actions', () => {
const task = bulkOverwriteTransformedDocuments({
client,
index: 'existing_index_with_docs',
transformedDocs: [
operations: [
...existingDocs,
{ _source: { title: 'doc 8' } } as unknown as SavedObjectsRawDoc,
],
].map(createBulkIndexOperationTuple),
refresh: 'wait_for',
});
await expect(task()).resolves.toMatchInlineSnapshot(`
@ -1883,7 +1884,7 @@ describe('migration actions', () => {
bulkOverwriteTransformedDocuments({
client,
index: 'existing_index_with_write_block',
transformedDocs: newDocs,
operations: newDocs.map(createBulkIndexOperationTuple),
refresh: 'wait_for',
})()
).resolves.toMatchInlineSnapshot(`
@ -1906,7 +1907,7 @@ describe('migration actions', () => {
const task = bulkOverwriteTransformedDocuments({
client,
index: 'existing_index_with_docs',
transformedDocs: newDocs,
operations: newDocs.map(createBulkIndexOperationTuple),
});
await expect(task()).resolves.toMatchInlineSnapshot(`
Object {

View file

@ -0,0 +1,82 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import type { SavedObjectsBulkCreateObject } from '@kbn/core-saved-objects-api-server';
import type { SavedObjectsType } from '@kbn/core-saved-objects-server';
const defaultType: SavedObjectsType<any> = {
name: 'defaultType',
hidden: false,
namespaceType: 'agnostic',
mappings: {
properties: {
name: { type: 'keyword' },
},
},
migrations: {},
};
export const baselineTypes: Array<SavedObjectsType<any>> = [
{
...defaultType,
name: 'server',
},
{
...defaultType,
name: 'basic',
},
{
...defaultType,
name: 'deprecated',
},
{
...defaultType,
name: 'complex',
mappings: {
properties: {
name: { type: 'text' },
value: { type: 'integer' },
},
},
excludeOnUpgrade: () => {
return {
bool: {
must: [{ term: { type: 'complex' } }, { range: { 'complex.value': { lte: 1 } } }],
},
};
},
},
];
export const baselineDocuments: SavedObjectsBulkCreateObject[] = [
...['server-foo', 'server-bar', 'server-baz'].map((name) => ({
type: 'server',
attributes: {
name,
},
})),
...['basic-foo', 'basic-bar', 'basic-baz'].map((name) => ({
type: 'basic',
attributes: {
name,
},
})),
...['deprecated-foo', 'deprecated-bar', 'deprecated-baz'].map((name) => ({
type: 'deprecated',
attributes: {
name,
},
})),
...['complex-foo', 'complex-bar', 'complex-baz', 'complex-lipsum'].map((name, index) => ({
type: 'complex',
attributes: {
name,
value: index,
},
})),
];
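// Note (derived from the fixtures above): with the 'complex' excludeOnUpgrade hook,
// complex-foo (value 0) and complex-bar (value 1) match the `lte: 1` range clause and
// should be removed by CLEANUP_UNKNOWN_AND_EXCLUDED, while complex-baz (value 2) and
// complex-lipsum (value 3) should survive the upgrade.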

View file

@ -0,0 +1,335 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import Path from 'path';
import fs from 'fs/promises';
import { SemVer } from 'semver';
import { Env } from '@kbn/config';
import type { AggregationsAggregate, SearchResponse } from '@elastic/elasticsearch/lib/api/types';
import { getEnvOptions } from '@kbn/config-mocks';
import { REPO_ROOT } from '@kbn/repo-info';
import { createTestServers, type TestElasticsearchUtils } from '@kbn/core-test-helpers-kbn-server';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type { SavedObjectsType } from '@kbn/core-saved-objects-server';
import { getKibanaMigratorTestKit } from '../kibana_migrator_test_kit';
import { baselineDocuments, baselineTypes } from './active_delete.fixtures';
import { delay } from '../test_utils';
const kibanaIndex = '.kibana_migrator_tests';
export const logFilePath = Path.join(__dirname, 'active_delete.test.log');
const currentVersion = Env.createDefault(REPO_ROOT, getEnvOptions()).packageInfo.version;
const nextMinor = new SemVer(currentVersion).inc('minor').format();
describe('when upgrading to a new stack version', () => {
let esServer: TestElasticsearchUtils['es'];
let esClient: ElasticsearchClient;
const startElasticsearch = async () => {
const { startES } = createTestServers({
adjustTimeout: (t: number) => jest.setTimeout(t),
settings: {
es: {
license: 'basic',
},
},
});
return await startES();
};
const createBaseline = async () => {
const { client, runMigrations, savedObjectsRepository } = await getKibanaMigratorTestKit({
kibanaIndex,
types: baselineTypes,
});
await runMigrations();
await savedObjectsRepository.bulkCreate(baselineDocuments, {
refresh: 'wait_for',
});
return client;
};
beforeAll(async () => {
esServer = await startElasticsearch();
});
afterAll(async () => {
await esServer?.stop();
await delay(10);
});
describe('and the mappings match (diffMappings() === false)', () => {
describe('and discardUnknownObjects = true', () => {
let indexContents: SearchResponse<{ type: string }, Record<string, AggregationsAggregate>>;
beforeAll(async () => {
esClient = await createBaseline();
await fs.unlink(logFilePath).catch(() => {});
// remove the 'deprecated' type from the mappings, so that saved objects of this type are considered unknown
const types = baselineTypes.filter((type) => type.name !== 'deprecated');
const { client, runMigrations } = await getKibanaMigratorTestKit({
settings: {
migrations: {
discardUnknownObjects: nextMinor,
},
},
kibanaIndex,
types,
kibanaVersion: nextMinor,
logFilePath,
});
await runMigrations();
indexContents = await client.search({ index: kibanaIndex, size: 100 });
});
afterAll(async () => {
await esClient?.indices.delete({ index: `${kibanaIndex}_${currentVersion}_001` });
});
it('the migrator skips the reindex operation and executes the CLEANUP_UNKNOWN_AND_EXCLUDED step', async () => {
const logs = await fs.readFile(logFilePath, 'utf-8');
expect(logs).toMatch('[.kibana_migrator_tests] INIT -> WAIT_FOR_YELLOW_SOURCE');
expect(logs).toMatch(
'[.kibana_migrator_tests] WAIT_FOR_YELLOW_SOURCE -> CLEANUP_UNKNOWN_AND_EXCLUDED'
);
// we must also log that we are discarding unknown documents (discardUnknownObjects: true)
expect(logs).toMatch(
'[.kibana_migrator_tests] Kibana has been configured to discard unknown documents for this migration.'
);
expect(logs).toMatch(
'Therefore, the following documents with unknown types will not be taken into account and they will not be available after the migration:'
);
expect(logs).toMatch(
'[.kibana_migrator_tests] CLEANUP_UNKNOWN_AND_EXCLUDED -> CLEANUP_UNKNOWN_AND_EXCLUDED_WAIT_FOR_TASK'
);
expect(logs).toMatch(
'[.kibana_migrator_tests] CLEANUP_UNKNOWN_AND_EXCLUDED_WAIT_FOR_TASK -> PREPARE_COMPATIBLE_MIGRATION'
);
expect(logs).toMatch(
'[.kibana_migrator_tests] PREPARE_COMPATIBLE_MIGRATION -> REFRESH_TARGET'
);
expect(logs).toMatch(
'[.kibana_migrator_tests] REFRESH_TARGET -> OUTDATED_DOCUMENTS_SEARCH_OPEN_PIT'
);
expect(logs).toMatch(
'[.kibana_migrator_tests] CHECK_TARGET_MAPPINGS -> CHECK_VERSION_INDEX_READY_ACTIONS'
);
expect(logs).toMatch('[.kibana_migrator_tests] CHECK_VERSION_INDEX_READY_ACTIONS -> DONE');
});
describe('CLEANUP_UNKNOWN_AND_EXCLUDED', () => {
it('preserves documents with known types', async () => {
const basicDocumentCount = indexContents.hits.hits.filter(
(result) => result._source?.type === 'basic'
).length;
expect(basicDocumentCount).toEqual(3);
});
it('deletes documents with unknown types', async () => {
const deprecatedDocumentCount = indexContents.hits.hits.filter(
(result) => result._source?.type === 'deprecated'
).length;
expect(deprecatedDocumentCount).toEqual(0);
});
it('deletes documents that belong to REMOVED_TYPES', async () => {
const serverDocumentCount = indexContents.hits.hits.filter(
(result) => result._source?.type === 'server'
).length;
expect(serverDocumentCount).toEqual(0);
});
it("deletes documents that have been excludeOnUpgrade'd via plugin hook", async () => {
const complexDocuments = indexContents.hits.hits.filter(
(result) => result._source?.type === 'complex'
);
expect(complexDocuments.length).toEqual(2);
expect(complexDocuments[0]._source).toEqual(
expect.objectContaining({
complex: {
name: 'complex-baz',
value: 2,
},
type: 'complex',
})
);
expect(complexDocuments[1]._source).toEqual(
expect.objectContaining({
complex: {
name: 'complex-lipsum',
value: 3,
},
type: 'complex',
})
);
});
});
});
describe('and discardUnknownObjects = false', () => {
beforeAll(async () => {
esClient = await createBaseline();
});
afterAll(async () => {
await esClient?.indices.delete({ index: `${kibanaIndex}_${currentVersion}_001` });
});
beforeEach(async () => {
await fs.unlink(logFilePath).catch(() => {});
});
it('fails if unknown documents exist', async () => {
// remove the 'deprecated' type from the mappings, so that saved objects of this type are considered unknown
const types = baselineTypes.filter((type) => type.name !== 'deprecated');
const { runMigrations } = await getKibanaMigratorTestKit({
kibanaIndex,
types,
kibanaVersion: nextMinor,
logFilePath,
});
try {
await runMigrations();
} catch (err) {
const errorMessage = err.message;
expect(errorMessage).toMatch(
'Unable to complete saved object migrations for the [.kibana_migrator_tests] index: Migration failed because some documents were found which use unknown saved object types:'
);
expect(errorMessage).toMatch(
'To proceed with the migration you can configure Kibana to discard unknown saved objects for this migration.'
);
expect(errorMessage).toMatch(/deprecated:.*\(type: "deprecated"\)/);
}
const logs = await fs.readFile(logFilePath, 'utf-8');
expect(logs).toMatch('[.kibana_migrator_tests] INIT -> WAIT_FOR_YELLOW_SOURCE');
expect(logs).toMatch(
'[.kibana_migrator_tests] WAIT_FOR_YELLOW_SOURCE -> CLEANUP_UNKNOWN_AND_EXCLUDED'
);
expect(logs).toMatch('[.kibana_migrator_tests] CLEANUP_UNKNOWN_AND_EXCLUDED -> FATAL');
});
it('proceeds if there are no unknown documents', async () => {
const { client, runMigrations } = await getKibanaMigratorTestKit({
kibanaIndex,
types: baselineTypes,
kibanaVersion: nextMinor,
logFilePath,
});
await runMigrations();
const logs = await fs.readFile(logFilePath, 'utf-8');
expect(logs).toMatch('[.kibana_migrator_tests] INIT -> WAIT_FOR_YELLOW_SOURCE');
expect(logs).toMatch(
'[.kibana_migrator_tests] WAIT_FOR_YELLOW_SOURCE -> CLEANUP_UNKNOWN_AND_EXCLUDED'
);
expect(logs).toMatch(
'[.kibana_migrator_tests] CLEANUP_UNKNOWN_AND_EXCLUDED -> CLEANUP_UNKNOWN_AND_EXCLUDED_WAIT_FOR_TASK'
);
expect(logs).toMatch(
'[.kibana_migrator_tests] CLEANUP_UNKNOWN_AND_EXCLUDED_WAIT_FOR_TASK -> PREPARE_COMPATIBLE_MIGRATION'
);
expect(logs).toMatch(
'[.kibana_migrator_tests] PREPARE_COMPATIBLE_MIGRATION -> REFRESH_TARGET'
);
expect(logs).toMatch(
'[.kibana_migrator_tests] REFRESH_TARGET -> OUTDATED_DOCUMENTS_SEARCH_OPEN_PIT'
);
expect(logs).toMatch(
'[.kibana_migrator_tests] CHECK_TARGET_MAPPINGS -> CHECK_VERSION_INDEX_READY_ACTIONS'
);
expect(logs).toMatch('[.kibana_migrator_tests] CHECK_VERSION_INDEX_READY_ACTIONS -> DONE');
const indexContents = await client.search({ index: kibanaIndex, size: 100 });
expect(indexContents.hits.hits.length).toEqual(8);
});
});
});
describe('and the mappings do NOT match (diffMappings() === true)', () => {
beforeAll(async () => {
esClient = await createBaseline();
});
afterAll(async () => {
await esClient?.indices.delete({ index: `${kibanaIndex}_${currentVersion}_001` });
});
beforeEach(async () => {
await fs.unlink(logFilePath).catch(() => {});
});
it('the migrator does not skip reindexing', async () => {
const incompatibleTypes: Array<SavedObjectsType<any>> = baselineTypes.map((type) => {
if (type.name === 'complex') {
return {
...type,
mappings: {
properties: {
name: { type: 'keyword' }, // text => keyword
value: { type: 'long' }, // integer => long
},
},
};
} else {
return type;
}
});
const { client, runMigrations } = await getKibanaMigratorTestKit({
kibanaIndex,
types: incompatibleTypes,
kibanaVersion: nextMinor,
logFilePath,
});
await runMigrations();
const logs = await fs.readFile(logFilePath, 'utf-8');
expect(logs).toMatch('[.kibana_migrator_tests] INIT -> WAIT_FOR_YELLOW_SOURCE');
expect(logs).toMatch(
'[.kibana_migrator_tests] WAIT_FOR_YELLOW_SOURCE -> CHECK_UNKNOWN_DOCUMENTS.'
);
expect(logs).toMatch(
'[.kibana_migrator_tests] CHECK_UNKNOWN_DOCUMENTS -> SET_SOURCE_WRITE_BLOCK.'
);
expect(logs).toMatch(
'[.kibana_migrator_tests] CHECK_TARGET_MAPPINGS -> UPDATE_TARGET_MAPPINGS.'
);
expect(logs).toMatch(
'[.kibana_migrator_tests] UPDATE_TARGET_MAPPINGS_META -> CHECK_VERSION_INDEX_READY_ACTIONS.'
);
expect(logs).toMatch(
'[.kibana_migrator_tests] CHECK_VERSION_INDEX_READY_ACTIONS -> MARK_VERSION_INDEX_READY.'
);
expect(logs).toMatch('[.kibana_migrator_tests] MARK_VERSION_INDEX_READY -> DONE');
const indexContents: SearchResponse<
{ type: string },
Record<string, AggregationsAggregate>
> = await client.search({ index: kibanaIndex, size: 100 });
expect(indexContents.hits.hits.length).toEqual(8); // the excludeOnUpgrade hook filters out the two 'complex' docs with value <= 1
// double-check that the deprecated documents have not been deleted
const deprecatedDocumentCount = indexContents.hits.hits.filter(
(result) => result._source?.type === 'deprecated'
).length;
expect(deprecatedDocumentCount).toEqual(3);
});
});
});
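The `CLEANUP_UNKNOWN_AND_EXCLUDED -> CLEANUP_UNKNOWN_AND_EXCLUDED_WAIT_FOR_TASK` transitions asserted above suggest the cleanup runs as an asynchronous delete-by-query whose task is then polled. A minimal sketch of that pattern, assuming the stock Elasticsearch client API (function and variable names are illustrative, not the PR's actual action code):

import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type { QueryDslQueryContainer } from '@elastic/elasticsearch/lib/api/types';

// Illustrative: start an async deleteByQuery and hand back the task id, which a
// WAIT_FOR_TASK-style step can poll via client.tasks.get() until it completes.
const startCleanupTask = async (
  client: ElasticsearchClient,
  index: string,
  query: QueryDslQueryContainer
): Promise<string> => {
  const { task } = await client.deleteByQuery({
    index,
    query,
    conflicts: 'proceed', // tolerate concurrent writes from parallel migrators
    wait_for_completion: false, // return a task instead of blocking
  });
  return String(task);
};

Once the task reports completion, the migrator can move on to PREPARE_COMPATIBLE_MIGRATION, matching the log assertions above.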

View file

@ -0,0 +1,186 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import Path from 'path';
import fs from 'fs/promises';
import { SemVer } from 'semver';
import { Env } from '@kbn/config';
import { getEnvOptions } from '@kbn/config-mocks';
import { REPO_ROOT } from '@kbn/repo-info';
import { type TestElasticsearchUtils } from '@kbn/core-test-helpers-kbn-server';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type { SavedObjectsBulkCreateObject } from '@kbn/core-saved-objects-api-server';
import {
defaultLogFilePath,
getEsClient,
getKibanaMigratorTestKit,
startElasticsearch,
} from '../kibana_migrator_test_kit';
import { baselineTypes } from './active_delete.fixtures';
import { delay } from '../test_utils';
import { createBaselineArchive } from '../kibana_migrator_archive_utils';
const PARALLEL_MIGRATORS = 6;
const DOCUMENTS_PER_TYPE = 250000;
const kibanaIndex = '.kibana_migrator_tests';
const currentVersion = Env.createDefault(REPO_ROOT, getEnvOptions()).packageInfo.version;
const nextMinor = new SemVer(currentVersion).inc('minor').format();
const dataArchive = Path.join(__dirname, '..', 'archives', '1m_dummy_so.zip');
jest.setTimeout(24 * 3600 * 100); // generous timeout: creating the archive and migrating 1M documents can take a while
describe('multiple migrator instances running in parallel', () => {
it.skip('enable and focus this test (it.skip => fit) and run it to create a baseline archive', async () => {
// generate DOCUMENTS_PER_TYPE documents of each type
const documents: SavedObjectsBulkCreateObject[] = ['server', 'basic', 'deprecated', 'complex']
.map((type) =>
new Array(DOCUMENTS_PER_TYPE).fill(true).map((_, index) => ({
type,
attributes: {
name: `${type}-${++index}`,
...(type === 'complex' && { value: index }),
},
}))
)
.flat();
await createBaselineArchive({ kibanaIndex, types: baselineTypes, documents, dataArchive });
});
describe('when upgrading to a new stack version with matching mappings', () => {
let esServer: TestElasticsearchUtils['es'];
let esClient: ElasticsearchClient;
beforeAll(async () => {
esServer = await startElasticsearch({ dataArchive });
esClient = await getEsClient();
await fs.unlink(defaultLogFilePath).catch(() => {});
for (let i = 0; i < PARALLEL_MIGRATORS; ++i) {
await fs.unlink(Path.join(__dirname, `active_delete_instance_${i}.log`)).catch(() => {});
}
});
it('actively deletes excluded documents and completes the migration successfully', async () => {
const startTime = Date.now();
const types = baselineTypes
.filter((type) => type.name !== 'deprecated')
.map((type) => {
if (type.name !== 'complex') {
return type;
}
return {
...type,
excludeOnUpgrade: () => {
return {
bool: {
must: [
{ term: { type: 'complex' } },
{ range: { 'complex.value': { lte: 125000 } } },
],
},
};
},
};
});
const beforeCleanup = await getAggregatedTypesCount();
expect(beforeCleanup.server).toEqual(DOCUMENTS_PER_TYPE);
expect(beforeCleanup.basic).toEqual(DOCUMENTS_PER_TYPE);
expect(beforeCleanup.deprecated).toEqual(DOCUMENTS_PER_TYPE);
expect(beforeCleanup.complex).toEqual(DOCUMENTS_PER_TYPE);
const testKits = await Promise.all(
new Array(PARALLEL_MIGRATORS)
.fill({
settings: {
migrations: {
discardUnknownObjects: nextMinor,
},
},
kibanaIndex,
types,
kibanaVersion: nextMinor,
})
.map((config, index) =>
getKibanaMigratorTestKit({
...config,
logFilePath: Path.join(__dirname, `active_delete_instance_${index}.log`),
})
)
);
const results = await Promise.all(testKits.map((testKit) => testKit.runMigrations()));
expect(results.flat().every((result) => result.status === 'migrated')).toEqual(true);
for (let i = 0; i < PARALLEL_MIGRATORS; ++i) {
const logs = await fs.readFile(
Path.join(__dirname, `active_delete_instance_${i}.log`),
'utf-8'
);
expect(logs).toMatch('CHECK_VERSION_INDEX_READY_ACTIONS -> DONE');
expect(logs).toMatch('Migration completed');
}
const endTime = Date.now();
// eslint-disable-next-line no-console
console.debug(`Migration took: ${(endTime - startTime) / 1000} seconds`);
// After cleanup
const afterCleanup = await getAggregatedTypesCount();
expect(afterCleanup.server).not.toBeDefined(); // 'server' is part of the REMOVED_TYPES
expect(afterCleanup.basic).toEqual(DOCUMENTS_PER_TYPE); // we keep 'basic' SOs
expect(afterCleanup.deprecated).not.toBeDefined(); // 'deprecated' is no longer present in nextMinor's mappings
expect(afterCleanup.complex).toEqual(DOCUMENTS_PER_TYPE / 2); // half of them are excluded via the excludeOnUpgrade hook
});
afterAll(async () => {
// await esClient?.indices.delete({ index: `${kibanaIndex}_${currentVersion}_001` });
await esServer?.stop();
await delay(10);
});
const getAggregatedTypesCount = async () => {
await esClient.indices.refresh();
const response = await esClient.search<unknown, { typesAggregation: { buckets: any[] } }>({
index: kibanaIndex,
_source: false,
aggs: {
typesAggregation: {
terms: {
// assign type __UNKNOWN__ to those documents that don't define one
missing: '__UNKNOWN__',
field: 'type',
size: 10,
},
aggs: {
docs: {
top_hits: {
size: 2,
_source: {
excludes: ['*'],
},
},
},
},
},
},
});
return (response.aggregations!.typesAggregation.buckets as any[]).reduce(
(acc: Record<string, number>, current: any) => {
acc[current.key] = current.doc_count;
return acc;
},
{}
);
};
});
});
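For reference, `getAggregatedTypesCount` above relies on the standard terms-aggregation bucket shape, and the reduce collapses the buckets into a type-to-count map. A sketch of that final step with explicit types (bucket fields follow the stock Elasticsearch response; the counts are made up):

interface TypeCountBucket {
  key: string; // the saved object type, or '__UNKNOWN__' for untyped docs
  doc_count: number;
}

// e.g. [{ key: 'basic', doc_count: 250000 }, { key: 'complex', doc_count: 125000 }]
// becomes { basic: 250000, complex: 125000 }
const toCountMap = (buckets: TypeCountBucket[]): Record<string, number> =>
  Object.fromEntries(buckets.map(({ key, doc_count: count }) => [key, count]));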

View file

@ -64,7 +64,13 @@ describe('skip reindexing', () => {
logs = await fs.readFile(logFilePath, 'utf-8');
expect(logs).toMatch('INIT -> WAIT_FOR_YELLOW_SOURCE');
expect(logs).toMatch('WAIT_FOR_YELLOW_SOURCE -> PREPARE_COMPATIBLE_MIGRATION');
expect(logs).toMatch('WAIT_FOR_YELLOW_SOURCE -> CLEANUP_UNKNOWN_AND_EXCLUDED');
expect(logs).toMatch(
'CLEANUP_UNKNOWN_AND_EXCLUDED -> CLEANUP_UNKNOWN_AND_EXCLUDED_WAIT_FOR_TASK'
);
expect(logs).toMatch(
'CLEANUP_UNKNOWN_AND_EXCLUDED_WAIT_FOR_TASK -> PREPARE_COMPATIBLE_MIGRATION'
);
expect(logs).toMatch('PREPARE_COMPATIBLE_MIGRATION -> OUTDATED_DOCUMENTS_SEARCH_OPEN_PIT');
expect(logs).toMatch('CHECK_TARGET_MAPPINGS -> CHECK_VERSION_INDEX_READY_ACTIONS');
expect(logs).toMatch('CHECK_VERSION_INDEX_READY_ACTIONS -> DONE');
@ -87,7 +93,7 @@ describe('skip reindexing', () => {
logs = await fs.readFile(logFilePath, 'utf-8');
expect(logs).toMatch('INIT -> OUTDATED_DOCUMENTS_SEARCH_OPEN_PIT');
expect(logs).not.toMatch('INIT -> PREPARE_COMPATIBLE_MIGRATION');
expect(logs).not.toMatch('INIT -> WAIT_FOR_YELLOW_SOURCE');
});
});

View file

@ -0,0 +1,69 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
/* eslint-disable no-console */
import Path from 'path';
import { exec } from 'child_process';
import { promisify } from 'util';
import { SavedObjectsBulkCreateObject } from '@kbn/core-saved-objects-api-server';
import { SavedObjectsType } from '@kbn/core-saved-objects-server';
import { getKibanaMigratorTestKit, startElasticsearch } from './kibana_migrator_test_kit';
import { delay } from './test_utils';
const execPromise = promisify(exec);
const DEFAULT_BATCH_SIZE = 100000;
interface CreateBaselineArchiveParams {
kibanaIndex: string;
types: Array<SavedObjectsType<any>>;
documents: SavedObjectsBulkCreateObject[];
batchSize?: number;
esBaseFolder?: string;
dataArchive: string;
}
export const createBaselineArchive = async ({
types,
documents,
kibanaIndex,
batchSize = DEFAULT_BATCH_SIZE,
esBaseFolder = Path.join(__dirname, `target`),
dataArchive,
}: CreateBaselineArchiveParams) => {
const startTime = Date.now();
const esServer = await startElasticsearch({ basePath: esBaseFolder });
const { runMigrations, savedObjectsRepository } = await getKibanaMigratorTestKit({
kibanaIndex,
types,
});
await runMigrations();
const batches = Math.ceil(documents.length / batchSize);
for (let i = 0; i < batches; ++i) {
console.log(`Indexing up to ${batchSize} docs (batch ${i + 1} of ${batches})`);
await savedObjectsRepository.bulkCreate(documents.slice(batchSize * i, batchSize * (i + 1)), {
refresh: 'wait_for',
});
}
await compressBaselineArchive(esBaseFolder, dataArchive);
console.log(`Archive created in: ${(Date.now() - startTime) / 1000} seconds`, dataArchive);
await delay(200);
await esServer.stop();
// await fs.rm(esBaseFolder, { recursive: true });
};
const compressBaselineArchive = async (esFolder: string, archiveFile: string) => {
const dataFolder = Path.join(esFolder, 'es-test-cluster');
const cmd = `cd ${dataFolder} && zip -r ${archiveFile} data -x ".DS_Store" -x "__MACOSX"`;
await execPromise(cmd);
};
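Usage mirrors the (normally skipped) archive-generation test earlier in this commit; a sketch, with the import paths and document list assumed for illustration:

import Path from 'path';
import { createBaselineArchive } from './kibana_migrator_archive_utils';
import { baselineTypes } from './active_delete/active_delete.fixtures'; // path assumed

const regenerateArchive = async () => {
  await createBaselineArchive({
    kibanaIndex: '.kibana_migrator_tests',
    types: baselineTypes,
    documents: [], // generate e.g. DOCUMENTS_PER_TYPE docs per type, as in the test above
    dataArchive: Path.join(__dirname, 'archives', '1m_dummy_so.zip'),
  });
};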

View file

@ -0,0 +1,274 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import Path from 'path';
import { defaultsDeep } from 'lodash';
import { BehaviorSubject, firstValueFrom, map } from 'rxjs';
import { ConfigService, Env } from '@kbn/config';
import { getEnvOptions } from '@kbn/config-mocks';
import { REPO_ROOT } from '@kbn/repo-info';
import { KibanaMigrator } from '@kbn/core-saved-objects-migration-server-internal';
import {
SavedObjectConfig,
type SavedObjectsConfigType,
type SavedObjectsMigrationConfigType,
SavedObjectTypeRegistry,
IKibanaMigrator,
MigrationResult,
} from '@kbn/core-saved-objects-base-server-internal';
import { SavedObjectsRepository } from '@kbn/core-saved-objects-api-server-internal';
import {
ElasticsearchConfig,
type ElasticsearchConfigType,
} from '@kbn/core-elasticsearch-server-internal';
import { AgentManager, configureClient } from '@kbn/core-elasticsearch-client-server-internal';
import { type LoggingConfigType, LoggingSystem } from '@kbn/core-logging-server-internal';
import type { ISavedObjectTypeRegistry, SavedObjectsType } from '@kbn/core-saved-objects-server';
import { esTestConfig, kibanaServerTestUser } from '@kbn/test';
import { LoggerFactory } from '@kbn/logging';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import { registerServiceConfig } from '@kbn/core-root-server-internal';
import { ISavedObjectsRepository } from '@kbn/core-saved-objects-api-server';
import { getDocLinks, getDocLinksMeta } from '@kbn/doc-links';
import { DocLinksServiceStart } from '@kbn/core-doc-links-server';
import { createTestServers } from '@kbn/core-test-helpers-kbn-server';
export const defaultLogFilePath = Path.join(__dirname, 'kibana_migrator_test_kit.log');
const env = Env.createDefault(REPO_ROOT, getEnvOptions());
// Extract the current stack version and branch from Env, to use as defaults
const currentVersion = env.packageInfo.version;
const currentBranch = env.packageInfo.branch;
export interface GetEsClientParams {
settings?: Record<string, any>;
kibanaVersion?: string;
logFilePath?: string;
}
export interface KibanaMigratorTestKitParams {
kibanaIndex?: string;
kibanaVersion?: string;
kibanaBranch?: string;
settings?: Record<string, any>;
types?: Array<SavedObjectsType<any>>;
logFilePath?: string;
}
export interface KibanaMigratorTestKit {
client: ElasticsearchClient;
migrator: IKibanaMigrator;
runMigrations: (rerun?: boolean) => Promise<MigrationResult[]>;
typeRegistry: ISavedObjectTypeRegistry;
savedObjectsRepository: ISavedObjectsRepository;
}
export const startElasticsearch = async ({
basePath,
dataArchive,
}: {
basePath?: string;
dataArchive?: string;
}) => {
const { startES } = createTestServers({
adjustTimeout: (t: number) => jest.setTimeout(t),
settings: {
es: {
license: 'basic',
basePath,
dataArchive,
},
},
});
return await startES();
};
export const getEsClient = async ({
settings = {},
kibanaVersion = currentVersion,
logFilePath = defaultLogFilePath,
}: GetEsClientParams = {}) => {
const loggingSystem = new LoggingSystem();
const loggerFactory = loggingSystem.asLoggerFactory();
const configService = getConfigService(settings, loggerFactory, logFilePath);
// configure logging system
const loggingConf = await firstValueFrom(configService.atPath<LoggingConfigType>('logging'));
loggingSystem.upgrade(loggingConf);
return await getElasticsearchClient(configService, loggerFactory, kibanaVersion);
};
export const getKibanaMigratorTestKit = async ({
settings = {},
kibanaIndex = '.kibana',
kibanaVersion = currentVersion,
kibanaBranch = currentBranch,
types = [],
logFilePath = defaultLogFilePath,
}: KibanaMigratorTestKitParams = {}): Promise<KibanaMigratorTestKit> => {
const loggingSystem = new LoggingSystem();
const loggerFactory = loggingSystem.asLoggerFactory();
const configService = getConfigService(settings, loggerFactory, logFilePath);
// configure logging system
const loggingConf = await firstValueFrom(configService.atPath<LoggingConfigType>('logging'));
loggingSystem.upgrade(loggingConf);
const client = await getElasticsearchClient(configService, loggerFactory, kibanaVersion);
const typeRegistry = new SavedObjectTypeRegistry();
// types must be registered before instantiating the migrator
registerTypes(typeRegistry, types);
const migrator = await getMigrator(
configService,
client,
typeRegistry,
loggerFactory,
kibanaIndex,
kibanaVersion,
kibanaBranch
);
const runMigrations = async (rerun?: boolean) => {
migrator.prepareMigrations();
return await migrator.runMigrations({ rerun });
};
const savedObjectsRepository = SavedObjectsRepository.createRepository(
migrator,
typeRegistry,
kibanaIndex,
client,
loggerFactory.get('saved_objects')
);
return {
client,
migrator,
runMigrations,
typeRegistry,
savedObjectsRepository,
};
};
const getConfigService = (
settings: Record<string, any>,
loggerFactory: LoggerFactory,
logFilePath: string
) => {
// Define some basic default kibana settings
const DEFAULTS_SETTINGS = {
server: {
autoListen: true,
// Use an ephemeral port so that tests take the first available
// port and aren't affected by timing issues in the test environment.
port: 0,
xsrf: { disableProtection: true },
},
elasticsearch: {
hosts: [esTestConfig.getUrl()],
username: kibanaServerTestUser.username,
password: kibanaServerTestUser.password,
},
logging: {
appenders: {
file: {
type: 'file',
fileName: logFilePath,
layout: {
type: 'json',
},
},
},
loggers: [
{
name: 'root',
level: 'info',
appenders: ['file'],
},
],
},
plugins: {},
migrations: { skip: false },
};
const rawConfigProvider = {
getConfig$: () => new BehaviorSubject(defaultsDeep({}, settings, DEFAULTS_SETTINGS)),
};
const configService = new ConfigService(rawConfigProvider, env, loggerFactory);
registerServiceConfig(configService);
return configService;
};
const getElasticsearchClient = async (
configService: ConfigService,
loggerFactory: LoggerFactory,
kibanaVersion: string
) => {
const esClientConfig = await firstValueFrom(
configService
.atPath<ElasticsearchConfigType>('elasticsearch')
.pipe(map((rawConfig) => new ElasticsearchConfig(rawConfig)))
);
return configureClient(esClientConfig, {
logger: loggerFactory.get('elasticsearch'),
type: 'data',
agentFactoryProvider: new AgentManager(),
kibanaVersion,
});
};
const getMigrator = async (
configService: ConfigService,
client: ElasticsearchClient,
typeRegistry: ISavedObjectTypeRegistry,
loggerFactory: LoggerFactory,
kibanaIndex: string,
kibanaVersion: string,
kibanaBranch: string
) => {
const savedObjectsConf = await firstValueFrom(
configService.atPath<SavedObjectsConfigType>('savedObjects')
);
const savedObjectsMigrationConf = await firstValueFrom(
configService.atPath<SavedObjectsMigrationConfigType>('migrations')
);
const soConfig = new SavedObjectConfig(savedObjectsConf, savedObjectsMigrationConf);
const docLinks: DocLinksServiceStart = {
...getDocLinksMeta({ kibanaBranch }),
links: getDocLinks({ kibanaBranch }),
};
return new KibanaMigrator({
client,
typeRegistry,
kibanaIndex,
soMigrationsConfig: soConfig.migration,
kibanaVersion,
logger: loggerFactory.get('savedobjects-service'),
docLinks,
waitForMigrationCompletion: false, // ensure we have an active role in the migration
});
};
const registerTypes = (
typeRegistry: SavedObjectTypeRegistry,
types?: Array<SavedObjectsType<any>>
) => {
(types || []).forEach((type) => typeRegistry.registerType(type));
};
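Typical usage of the kit, mirroring the integration tests earlier in this commit (index name, types, and log path are illustrative):

import { getKibanaMigratorTestKit } from './kibana_migrator_test_kit';

it('runs migrations against a dedicated test index', async () => {
  const { client, runMigrations } = await getKibanaMigratorTestKit({
    kibanaIndex: '.kibana_migrator_tests',
    types: [], // register the SavedObjectsType fixtures under test
    logFilePath: '/tmp/kibana_migrator.log',
  });
  await runMigrations();
  const contents = await client.search({ index: '.kibana_migrator_tests', size: 100 });
  expect(contents.hits.hits.length).toBeGreaterThanOrEqual(0);
});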

View file

@ -148,6 +148,7 @@
"@kbn/core-lifecycle-browser",
"@kbn/core-custom-branding-browser",
"@kbn/core-custom-branding-server",
"@kbn/core-elasticsearch-client-server-internal",
],
"exclude": [
"target/**/*",

View file

@ -64,4 +64,14 @@ export const searchSessionSavedObjectType: SavedObjectsType = {
},
},
migrations: searchSessionSavedObjectMigrations,
excludeOnUpgrade: async () => {
return {
bool: {
must: [
{ term: { type: SEARCH_SESSION_TYPE } },
{ match: { 'search-session.persisted': false } },
],
},
};
},
};
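With this hook in place, the upgrade-time cleanup (or the reindex exclude filter) drops transient search sessions. Assuming `SEARCH_SESSION_TYPE` resolves to 'search-session', a raw document matching the filter above would look roughly like this (illustrative shape):

// Illustrative: a non-persisted session the filter above matches, and which the
// migrator would therefore discard during an upgrade.
const transientSession = {
  type: 'search-session',
  'search-session': {
    persisted: false, // never saved by the user: safe to drop
    // ...remaining search-session attributes
  },
};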