Mirror of https://github.com/elastic/kibana.git (synced 2025-04-24 01:38:56 -04:00)
Migrationsv2: limit batch sizes to migrations.batchSizeBytes (= 100mb by default) (#109540)
* Fix logging for existing integration test
* First stab at limiting batches to batchSizeBytes
* Fix tests
* Fix batch size calculation, NDJSON needs to be terminated by an empty line
* Integration tests
* Fix type failures
* rename migration integration tests and log files to be consistent & more descriptive
* Review feedback
* Remove duplication of fatal error reasons
* migrations.maxBatchSizeBytes to docker environment vars
* docs for migrations.maxBatchSizeBytes
This commit is contained in: commit 393505ab39 (parent 249c5fbf9a)
32 changed files with 764 additions and 103 deletions
@ -406,7 +406,10 @@ override this parameter to use their own Tile Map Service. For example:
`"https://tiles.elastic.co/v2/default/{z}/{x}/{y}.png?elastic_tile_service_tos=agree&my_app_name=kibana"`
`migrations.batchSize:`
Defines the number of documents migrated at a time. The higher the value, the faster the Saved Objects migration process performs at the cost of higher memory consumption. If the migration fails due to a `circuit_breaking_exception`, set a smaller `batchSize` value. *Default: `1000`*
Defines the number of documents migrated at a time. The higher the value, the faster the Saved Objects migration process performs at the cost of higher memory consumption. If upgrade migrations result in {kib} crashing with an out of memory exception or fail due to an Elasticsearch `circuit_breaking_exception`, use a smaller `batchSize` value to reduce the memory pressure. *Default: `1000`*
`migrations.maxBatchSizeBytes:`
Defines the maximum payload size for indexing batches of upgraded saved objects to avoid migrations failing due to a 413 Request Entity Too Large response from Elasticsearch. This value should be lower than or equal to your Elasticsearch cluster's `http.max_content_length` configuration option. *Default: `100mb`*
`migrations.enableV2:`
experimental[]. Enables the new Saved Objects migration algorithm. For information about the migration algorithm, refer to <<upgrade-migrations>>. When `migrations v2` is stable, the setting will be removed in an upcoming release without any further notice. Setting the value to `false` causes {kib} to use the legacy migration algorithm, which shipped in 7.11 and earlier versions. *Default: `true`*
@ -11,6 +11,7 @@ import { buildActiveMappings } from '../core';
const { mergeTypes } = jest.requireActual('./kibana_migrator');
import { SavedObjectsType } from '../../types';
import { BehaviorSubject } from 'rxjs';
import { ByteSizeValue } from '@kbn/config-schema';
const defaultSavedObjectTypes: SavedObjectsType[] = [
{
@ -37,6 +38,7 @@ const createMigrator = (
kibanaVersion: '8.0.0-testing',
soMigrationsConfig: {
batchSize: 100,
maxBatchSizeBytes: ByteSizeValue.parse('30kb'),
scrollDuration: '15m',
pollInterval: 1500,
skip: false,
@ -15,6 +15,7 @@ import { loggingSystemMock } from '../../../logging/logging_system.mock';
import { SavedObjectTypeRegistry } from '../../saved_objects_type_registry';
import { SavedObjectsType } from '../../types';
import { DocumentMigrator } from '../core/document_migrator';
import { ByteSizeValue } from '@kbn/config-schema';
jest.mock('../core/document_migrator', () => {
return {
// Create a mock for spying on the constructor
@ -396,6 +397,7 @@ const mockOptions = ({ enableV2 }: { enableV2: boolean } = { enableV2: false })
} as KibanaMigratorOptions['kibanaConfig'],
soMigrationsConfig: {
batchSize: 20,
maxBatchSizeBytes: ByteSizeValue.parse('20mb'),
pollInterval: 20000,
scrollDuration: '10m',
skip: false,
@ -316,7 +316,10 @@ completed this step:
- temp index has a write block
- temp index is not found
### New control state
1. If `currentBatch` is the last batch in `transformedDocBatches`
→ `REINDEX_SOURCE_TO_TEMP_READ`
2. If there are more batches left in `transformedDocBatches`
→ `REINDEX_SOURCE_TO_TEMP_INDEX_BULK`
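A minimal sketch of this batch-advance decision (illustrative only; the field names `currentBatch` and `transformedDocBatches` are taken from the diff, while the real `model()` transition also resets `corruptDocumentIds`/`transformErrors` and handles error responses):

```ts
// Illustrative sketch: index the next batch if one is left, otherwise go back
// to reading the next page of outdated documents from the source index.
function nextReindexBulkState(state: {
  currentBatch: number;
  transformedDocBatches: unknown[][];
}): 'REINDEX_SOURCE_TO_TEMP_INDEX_BULK' | 'REINDEX_SOURCE_TO_TEMP_READ' {
  return state.currentBatch + 1 < state.transformedDocBatches.length
    ? 'REINDEX_SOURCE_TO_TEMP_INDEX_BULK' // more batches to bulk index
    : 'REINDEX_SOURCE_TO_TEMP_READ'; // last batch written for this read
}
```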
## REINDEX_SOURCE_TO_TEMP_CLOSE_PIT
### Next action
@ -23,6 +23,27 @@ import type {
IndexNotFound,
} from './index';
/**
* Given a document and index, creates a valid body for the Bulk API.
*/
export const createBulkOperationBody = (doc: SavedObjectsRawDoc, index: string) => {
return [
{
index: {
_index: index,
_id: doc._id,
// overwrite existing documents
op_type: 'index',
// use optimistic concurrency control to ensure that outdated
// documents are only overwritten once with the latest version
if_seq_no: doc._seq_no,
if_primary_term: doc._primary_term,
},
},
doc._source,
];
};
/** @internal */
export interface BulkOverwriteTransformedDocumentsParams {
client: ElasticsearchClient;
@ -47,6 +68,10 @@ export const bulkOverwriteTransformedDocuments = ({
| RequestEntityTooLargeException,
'bulk_index_succeeded'
> => () => {
const body = transformedDocs.flatMap((doc) => {
return createBulkOperationBody(doc, index);
});
return client
.bulk({
// Because we only add aliases in the MARK_VERSION_INDEX_READY step we
@ -60,23 +85,7 @@ export const bulkOverwriteTransformedDocuments = ({
wait_for_active_shards: WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE,
refresh,
filter_path: ['items.*.error'],
body: transformedDocs.flatMap((doc) => {
return [
{
index: {
_index: index,
_id: doc._id,
// overwrite existing documents
op_type: 'index',
// use optimistic concurrency control to ensure that outdated
// documents are only overwritten once with the latest version
if_seq_no: doc._seq_no,
if_primary_term: doc._primary_term,
},
},
doc._source,
];
}),
body,
})
.then((res) => {
// Filter out version_conflict_engine_exception since these just mean
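For orientation, a hedged sketch of the request body this assembles: for each raw document, `createBulkOperationBody` yields an action/metadata object plus the document source, and the Elasticsearch client serializes the flattened array into NDJSON for the `_bulk` request. The sample document and index name below are hypothetical.

```ts
// Illustrative only: the two entries produced for one hypothetical document.
const exampleBulkBody = [
  {
    index: {
      _index: '.kibana_8.0.0_reindex_temp', // hypothetical index name
      _id: 'dashboard:123',
      op_type: 'index',
      if_seq_no: 1,
      if_primary_term: 1,
    },
  },
  { type: 'dashboard', dashboard: { title: 'example' } },
];
```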
@ -6,6 +6,7 @@
* Side Public License, v 1.
*/
import { ByteSizeValue } from '@kbn/config-schema';
import * as Option from 'fp-ts/Option';
import { SavedObjectsMigrationConfigType } from '../saved_objects_config';
import { SavedObjectTypeRegistry } from '../saved_objects_type_registry';
@ -21,6 +22,7 @@ describe('createInitialState', () => {
const migrationsConfig = ({
retryAttempts: 15,
batchSize: 1000,
maxBatchSizeBytes: ByteSizeValue.parse('100mb'),
} as unknown) as SavedObjectsMigrationConfigType;
it('creates the initial state for the model based on the passed in parameters', () => {
expect(
@ -37,6 +39,7 @@ describe('createInitialState', () => {
})
).toEqual({
batchSize: 1000,
maxBatchSizeBytes: ByteSizeValue.parse('100mb').getValueInBytes(),
controlState: 'INIT',
currentAlias: '.kibana_task_manager',
excludeFromUpgradeFilterHooks: {},
@ -82,6 +82,7 @@ export const createInitialState = ({
retryDelay: 0,
retryAttempts: migrationsConfig.retryAttempts,
batchSize: migrationsConfig.batchSize,
maxBatchSizeBytes: migrationsConfig.maxBatchSizeBytes.getValueInBytes(),
logs: [],
unusedTypesQuery: excludeUnusedTypesQuery,
knownTypes,
@ -17,7 +17,7 @@ import { InternalCoreStart } from '../../../internal_types';
import { Root } from '../../../root';
const kibanaVersion = Env.createDefault(REPO_ROOT, getEnvOptions()).packageInfo.version;
const logFilePath = path.join(__dirname, 'migration_test_kibana.log');
const logFilePath = path.join(__dirname, '7.7.2_xpack_100k.log');
async function removeLogFile() {
// ignore errors if it doesn't exist
@ -61,9 +61,12 @@ describe('migration from 7.7.2-xpack with 100k objects', () => {
},
},
},
root: {
appenders: ['default', 'file'],
},
loggers: [
{
name: 'root',
appenders: ['file'],
},
],
},
},
{
@ -12,7 +12,7 @@ import * as kbnTestServer from '../../../../test_helpers/kbn_server';
import { Root } from '../../../root';
import { ElasticsearchClient } from '../../../elasticsearch';
const logFilePath = Path.join(__dirname, '7_13_failed_action_tasks_test.log');
const logFilePath = Path.join(__dirname, '7_13_failed_action_tasks.log');
async function removeLogFile() {
// ignore errors if it doesn't exist
@ -12,7 +12,7 @@ import Util from 'util';
import * as kbnTestServer from '../../../../test_helpers/kbn_server';
import { Root } from '../../../root';
const logFilePath = Path.join(__dirname, '7_13_corrupt_transform_failures_test.log');
const logFilePath = Path.join(__dirname, '7_13_corrupt_transform_failures.log');
const asyncUnlink = Util.promisify(Fs.unlink);
@ -16,10 +16,12 @@ import { ElasticsearchClient } from '../../../elasticsearch';
import { Env } from '@kbn/config';
import { REPO_ROOT } from '@kbn/utils';
import { getEnvOptions } from '../../../config/mocks';
import { retryAsync } from '../test_helpers/retry_async';
import { LogRecord } from '@kbn/logging';
const kibanaVersion = Env.createDefault(REPO_ROOT, getEnvOptions()).packageInfo.version;
const targetIndex = `.kibana_${kibanaVersion}_001`;
const logFilePath = Path.join(__dirname, '7_13_unknown_types_test.log');
const logFilePath = Path.join(__dirname, '7_13_unknown_types.log');
async function removeLogFile() {
// ignore errors if it doesn't exist
@ -68,24 +70,31 @@ describe('migration v2', () => {
await root.setup();
await root.start();
const logFileContent = await fs.readFile(logFilePath, 'utf-8');
const records = logFileContent
.split('\n')
.filter(Boolean)
.map((str) => JSON5.parse(str));
let unknownDocsWarningLog: LogRecord;
const unknownDocsWarningLog = records.find((rec) =>
rec.message.startsWith(`[.kibana] CHECK_UNKNOWN_DOCUMENTS`)
await retryAsync(
async () => {
const logFileContent = await fs.readFile(logFilePath, 'utf-8');
const records = logFileContent
.split('\n')
.filter(Boolean)
.map((str) => JSON5.parse(str));
unknownDocsWarningLog = records.find((rec) =>
rec.message.startsWith(`[.kibana] CHECK_UNKNOWN_DOCUMENTS`)
);
expect(
unknownDocsWarningLog.message.startsWith(
'[.kibana] CHECK_UNKNOWN_DOCUMENTS Upgrades will fail for 8.0+ because documents were found for unknown saved ' +
'object types. To ensure that upgrades will succeed in the future, either re-enable plugins or delete ' +
`these documents from the "${targetIndex}" index after the current upgrade completes.`
)
).toBeTruthy();
},
{ retryAttempts: 10, retryDelayMs: 200 }
);
expect(
unknownDocsWarningLog.message.startsWith(
'[.kibana] CHECK_UNKNOWN_DOCUMENTS Upgrades will fail for 8.0+ because documents were found for unknown saved ' +
'object types. To ensure that upgrades will succeed in the future, either re-enable plugins or delete ' +
`these documents from the "${targetIndex}" index after the current upgrade completes.`
)
).toBeTruthy();
const unknownDocs = [
{ type: 'space', id: 'space:default' },
{ type: 'space', id: 'space:first' },
Binary file not shown.
@ -0,0 +1,145 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import Path from 'path';
import fs from 'fs/promises';
import JSON5 from 'json5';
import * as kbnTestServer from '../../../../test_helpers/kbn_server';
import { Root } from '../../../root';
import { ElasticsearchClient } from '../../../elasticsearch';
import { Env } from '@kbn/config';
import { REPO_ROOT } from '@kbn/utils';
import { getEnvOptions } from '../../../config/mocks';
import { LogRecord } from '@kbn/logging';
import { retryAsync } from '../test_helpers/retry_async';
const kibanaVersion = Env.createDefault(REPO_ROOT, getEnvOptions()).packageInfo.version;
const targetIndex = `.kibana_${kibanaVersion}_001`;
const logFilePath = Path.join(__dirname, 'batch_size_bytes.log');
async function removeLogFile() {
// ignore errors if it doesn't exist
await fs.unlink(logFilePath).catch(() => void 0);
}
describe('migration v2', () => {
let esServer: kbnTestServer.TestElasticsearchUtils;
let root: Root;
let startES: () => Promise<kbnTestServer.TestElasticsearchUtils>;
beforeAll(async () => {
await removeLogFile();
});
beforeEach(() => {
({ startES } = kbnTestServer.createTestServers({
adjustTimeout: (t: number) => jest.setTimeout(t),
settings: {
es: {
license: 'basic',
dataArchive: Path.join(__dirname, 'archives', '7.14.0_xpack_sample_saved_objects.zip'),
esArgs: ['http.max_content_length=1715275b'],
},
},
}));
});
afterEach(async () => {
if (root) {
await root.shutdown();
}
if (esServer) {
await esServer.stop();
}
await new Promise((resolve) => setTimeout(resolve, 10000));
});
it('completes the migration even when a full batch would exceed ES http.max_content_length', async () => {
root = createRoot({ maxBatchSizeBytes: 1715275 });
esServer = await startES();
await root.preboot();
await root.setup();
await expect(root.start()).resolves.toBeTruthy();
await new Promise((resolve) => setTimeout(resolve, 1000));
const esClient: ElasticsearchClient = esServer.es.getClient();
const migratedIndexResponse = await esClient.count({
index: targetIndex,
});
const oldIndexResponse = await esClient.count({
index: '.kibana_7.14.0_001',
});
// Use a >= comparison since once Kibana has started it might create new
// documents like telemetry tasks
expect(migratedIndexResponse.body.count).toBeGreaterThanOrEqual(oldIndexResponse.body.count);
});
it('fails with a descriptive message when a single document exceeds maxBatchSizeBytes', async () => {
root = createRoot({ maxBatchSizeBytes: 1015275 });
esServer = await startES();
await root.preboot();
await root.setup();
await expect(root.start()).rejects.toMatchInlineSnapshot(
`[Error: Unable to complete saved object migrations for the [.kibana] index: The document with _id "canvas-workpad-template:workpad-template-061d7868-2b4e-4dc8-8bf7-3772b52926e5" is 1715275 bytes which exceeds the configured maximum batch size of 1015275 bytes. To proceed, please increase the 'migrations.maxBatchSizeBytes' Kibana configuration option and ensure that the Elasticsearch 'http.max_content_length' configuration option is set to an equal or larger value.]`
);
await retryAsync(
async () => {
const logFileContent = await fs.readFile(logFilePath, 'utf-8');
const records = logFileContent
.split('\n')
.filter(Boolean)
.map((str) => JSON5.parse(str)) as LogRecord[];
expect(
records.find((rec) =>
rec.message.startsWith(
`Unable to complete saved object migrations for the [.kibana] index: The document with _id "canvas-workpad-template:workpad-template-061d7868-2b4e-4dc8-8bf7-3772b52926e5" is 1715275 bytes which exceeds the configured maximum batch size of 1015275 bytes. To proceed, please increase the 'migrations.maxBatchSizeBytes' Kibana configuration option and ensure that the Elasticsearch 'http.max_content_length' configuration option is set to an equal or larger value.`
)
)
).toBeDefined();
},
{ retryAttempts: 10, retryDelayMs: 200 }
);
});
});
function createRoot(options: { maxBatchSizeBytes?: number }) {
return kbnTestServer.createRootWithCorePlugins(
{
migrations: {
skip: false,
enableV2: true,
batchSize: 1000,
maxBatchSizeBytes: options.maxBatchSizeBytes,
},
logging: {
appenders: {
file: {
type: 'file',
fileName: logFilePath,
layout: {
type: 'json',
},
},
},
loggers: [
{
name: 'root',
appenders: ['file'],
},
],
},
},
{
oss: true,
}
);
}
@ -0,0 +1,117 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import Path from 'path';
import fs from 'fs/promises';
import JSON5 from 'json5';
import * as kbnTestServer from '../../../../test_helpers/kbn_server';
import { Root } from '../../../root';
import { retryAsync } from '../test_helpers/retry_async';
const logFilePath = Path.join(__dirname, 'batch_size_bytes_exceeds_es_content_length.log');
async function removeLogFile() {
// ignore errors if it doesn't exist
await fs.unlink(logFilePath).catch(() => void 0);
}
describe('migration v2', () => {
let esServer: kbnTestServer.TestElasticsearchUtils;
let root: Root;
let startES: () => Promise<kbnTestServer.TestElasticsearchUtils>;
beforeAll(async () => {
await removeLogFile();
});
beforeEach(() => {
({ startES } = kbnTestServer.createTestServers({
adjustTimeout: (t: number) => jest.setTimeout(t),
settings: {
es: {
license: 'basic',
dataArchive: Path.join(__dirname, 'archives', '7.14.0_xpack_sample_saved_objects.zip'),
esArgs: ['http.max_content_length=1mb'],
},
},
}));
});
afterEach(async () => {
if (root) {
await root.shutdown();
}
if (esServer) {
await esServer.stop();
}
await new Promise((resolve) => setTimeout(resolve, 10000));
});
it('fails with a descriptive message when maxBatchSizeBytes exceeds ES http.max_content_length', async () => {
root = createRoot({ maxBatchSizeBytes: 1715275 });
esServer = await startES();
await root.preboot();
await root.setup();
await expect(root.start()).rejects.toMatchInlineSnapshot(
`[Error: Unable to complete saved object migrations for the [.kibana] index: While indexing a batch of saved objects, Elasticsearch returned a 413 Request Entity Too Large exception. Ensure that the Kibana configuration option 'migrations.maxBatchSizeBytes' is set to a value that is lower than or equal to the Elasticsearch 'http.max_content_length' configuration option.]`
);
await retryAsync(
async () => {
const logFileContent = await fs.readFile(logFilePath, 'utf-8');
const records = logFileContent
.split('\n')
.filter(Boolean)
.map((str) => JSON5.parse(str)) as any[];
expect(
records.find((rec) =>
rec.message.startsWith(
`Unable to complete saved object migrations for the [.kibana] index: While indexing a batch of saved objects, Elasticsearch returned a 413 Request Entity Too Large exception. Ensure that the Kibana configuration option 'migrations.maxBatchSizeBytes' is set to a value that is lower than or equal to the Elasticsearch 'http.max_content_length' configuration option.`
)
)
).toBeDefined();
},
{ retryAttempts: 10, retryDelayMs: 200 }
);
});
});
function createRoot(options: { maxBatchSizeBytes?: number }) {
return kbnTestServer.createRootWithCorePlugins(
{
migrations: {
skip: false,
enableV2: true,
batchSize: 1000,
maxBatchSizeBytes: options.maxBatchSizeBytes,
},
logging: {
appenders: {
file: {
type: 'file',
fileName: logFilePath,
layout: {
type: 'json',
},
},
},
loggers: [
{
name: 'root',
appenders: ['file'],
},
],
},
},
{
oss: true,
}
);
}
@ -13,7 +13,7 @@ import JSON5 from 'json5';
import * as kbnTestServer from '../../../../test_helpers/kbn_server';
import type { Root } from '../../../root';
const logFilePath = Path.join(__dirname, 'cleanup_test.log');
const logFilePath = Path.join(__dirname, 'cleanup.log');
const asyncUnlink = Util.promisify(Fs.unlink);
const asyncReadFile = Util.promisify(Fs.readFile);
@ -12,7 +12,7 @@ import Util from 'util';
import * as kbnTestServer from '../../../../test_helpers/kbn_server';
import { Root } from '../../../root';
const logFilePath = Path.join(__dirname, 'migration_test_corrupt_docs_kibana.log');
const logFilePath = Path.join(__dirname, 'collects_corrupt_docs.log');
const asyncUnlink = Util.promisify(Fs.unlink);
@ -12,7 +12,7 @@ import Util from 'util';
import * as kbnTestServer from '../../../../test_helpers/kbn_server';
import { Root } from '../../../root';
const logFilePath = Path.join(__dirname, 'migration_test_corrupt_docs_kibana.log');
const logFilePath = Path.join(__dirname, 'corrupt_outdated_docs.log');
const asyncUnlink = Util.promisify(Fs.unlink);
@ -21,7 +21,7 @@ import { Root } from '../../../root';
const kibanaVersion = Env.createDefault(REPO_ROOT, getEnvOptions()).packageInfo.version;
const logFilePath = Path.join(__dirname, 'migration_test_kibana_from_v1.log');
const logFilePath = Path.join(__dirname, 'migration_from_v1.log');
const asyncUnlink = Util.promisify(Fs.unlink);
async function removeLogFile() {
@ -14,7 +14,7 @@ import * as kbnTestServer from '../../../../test_helpers/kbn_server';
import type { ElasticsearchClient } from '../../../elasticsearch';
import { Root } from '../../../root';
const logFilePath = Path.join(__dirname, 'migration_test_kibana.log');
const logFilePath = Path.join(__dirname, 'outdated_docs.log');
const asyncUnlink = Util.promisify(Fs.unlink);
async function removeLogFile() {
@ -15,7 +15,7 @@ import type { ElasticsearchClient } from '../../../elasticsearch';
import { Root } from '../../../root';
import { deterministicallyRegenerateObjectId } from '../../migrations/core/document_migrator';
const logFilePath = Path.join(__dirname, 'migration_test_kibana.log');
const logFilePath = Path.join(__dirname, 'rewriting_id.log');
const asyncUnlink = Util.promisify(Fs.unlink);
async function removeLogFile() {
@ -17,6 +17,7 @@ import { elasticsearchClientMock } from '../../elasticsearch/client/mocks';
import { LoggerAdapter } from '../../logging/logger_adapter';
import { AllControlStates, State } from './types';
import { createInitialState } from './initial_state';
import { ByteSizeValue } from '@kbn/config-schema';
const esClient = elasticsearchServiceMock.createElasticsearchClient();
@ -40,6 +41,7 @@ describe('migrationsStateActionMachine', () => {
indexPrefix: '.my-so-index',
migrationsConfig: {
batchSize: 1000,
maxBatchSizeBytes: new ByteSizeValue(1e8),
pollInterval: 0,
scrollDuration: '0s',
skip: false,
@ -235,6 +237,7 @@ describe('migrationsStateActionMachine', () => {
...initialState,
reason: 'the fatal reason',
outdatedDocuments: [{ _id: '1234', password: 'sensitive password' }],
transformedDocBatches: [[{ _id: '1234', password: 'sensitive transformed password' }]],
} as State,
logger: mockLogger.get(),
model: transitionModel(['LEGACY_DELETE', 'FATAL']),
@ -257,6 +260,7 @@ describe('migrationsStateActionMachine', () => {
kibana: {
migrationState: {
batchSize: 1000,
maxBatchSizeBytes: 1e8,
controlState: 'LEGACY_DELETE',
currentAlias: '.my-so-index',
excludeFromUpgradeFilterHooks: {},
@ -270,7 +274,7 @@ describe('migrationsStateActionMachine', () => {
message: 'Log from LEGACY_DELETE control state',
},
],
outdatedDocuments: ['1234'],
outdatedDocuments: [{ _id: '1234' }],
outdatedDocumentsQuery: expect.any(Object),
preMigrationScript: {
_tag: 'None',
@ -284,6 +288,7 @@ describe('migrationsStateActionMachine', () => {
},
tempIndex: '.my-so-index_7.11.0_reindex_temp',
tempIndexMappings: expect.any(Object),
transformedDocBatches: [[{ _id: '1234' }]],
unusedTypesQuery: expect.any(Object),
versionAlias: '.my-so-index_7.11.0',
versionIndex: '.my-so-index_7.11.0_001',
@ -304,6 +309,7 @@ describe('migrationsStateActionMachine', () => {
kibana: {
migrationState: {
batchSize: 1000,
maxBatchSizeBytes: 1e8,
controlState: 'FATAL',
currentAlias: '.my-so-index',
excludeFromUpgradeFilterHooks: {},
@ -321,7 +327,7 @@ describe('migrationsStateActionMachine', () => {
message: 'Log from FATAL control state',
},
],
outdatedDocuments: ['1234'],
outdatedDocuments: [{ _id: '1234' }],
outdatedDocumentsQuery: expect.any(Object),
preMigrationScript: {
_tag: 'None',
@ -335,6 +341,7 @@ describe('migrationsStateActionMachine', () => {
},
tempIndex: '.my-so-index_7.11.0_reindex_temp',
tempIndexMappings: expect.any(Object),
transformedDocBatches: [[{ _id: '1234' }]],
unusedTypesQuery: expect.any(Object),
versionAlias: '.my-so-index_7.11.0',
versionIndex: '.my-so-index_7.11.0_001',
@ -447,6 +454,7 @@ describe('migrationsStateActionMachine', () => {
kibana: {
migrationState: {
batchSize: 1000,
maxBatchSizeBytes: 1e8,
controlState: 'LEGACY_REINDEX',
currentAlias: '.my-so-index',
excludeFromUpgradeFilterHooks: {},
@ -474,6 +482,7 @@ describe('migrationsStateActionMachine', () => {
},
tempIndex: '.my-so-index_7.11.0_reindex_temp',
tempIndexMappings: expect.any(Object),
transformedDocBatches: [],
unusedTypesQuery: expect.any(Object),
versionAlias: '.my-so-index_7.11.0',
versionIndex: '.my-so-index_7.11.0_001',
@ -488,6 +497,7 @@ describe('migrationsStateActionMachine', () => {
kibana: {
migrationState: {
batchSize: 1000,
maxBatchSizeBytes: 1e8,
controlState: 'LEGACY_DELETE',
currentAlias: '.my-so-index',
excludeFromUpgradeFilterHooks: {},
@ -519,6 +529,7 @@ describe('migrationsStateActionMachine', () => {
},
tempIndex: '.my-so-index_7.11.0_reindex_temp',
tempIndexMappings: expect.any(Object),
transformedDocBatches: [],
unusedTypesQuery: expect.any(Object),
versionAlias: '.my-so-index_7.11.0',
versionIndex: '.my-so-index_7.11.0_001',
@ -13,7 +13,8 @@ import type { ElasticsearchClient } from '../../elasticsearch';
import { getErrorMessage, getRequestDebugMeta } from '../../elasticsearch';
import { Model, Next, stateActionMachine } from './state_action_machine';
import { cleanup } from './migrations_state_machine_cleanup';
import { State } from './types';
import { ReindexSourceToTempIndex, ReindexSourceToTempIndexBulk, State } from './types';
import { SavedObjectsRawDoc } from '../serialization';
interface StateLogMeta extends LogMeta {
kibana: {
@ -140,11 +141,22 @@ export async function migrationStateActionMachine({
const newState = model(state, res);
// Redact the state to reduce the memory consumption and so that we
// don't log sensitive information inside documents by only keeping
// the _id's of outdatedDocuments
// the _id's of documents
const redactedNewState = {
...newState,
// @ts-expect-error outdatedDocuments don't exist in all states
...{ outdatedDocuments: (newState.outdatedDocuments ?? []).map((doc) => doc._id) },
...{
outdatedDocuments: ((newState as ReindexSourceToTempIndex).outdatedDocuments ?? []).map(
(doc) =>
({
_id: doc._id,
} as SavedObjectsRawDoc)
),
},
...{
transformedDocBatches: (
(newState as ReindexSourceToTempIndexBulk).transformedDocBatches ?? []
).map((batches) => batches.map((doc) => ({ _id: doc._id }))) as [SavedObjectsRawDoc[]],
},
};
executionLog.push({
type: 'transition',
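As a rough illustration of the redaction above (document contents are hypothetical), only the `_id` of each document survives, for `outdatedDocuments` as well as for every batch in `transformedDocBatches`:

```ts
// Illustrative only: redact a state object so logs never contain _source.
const exampleState = {
  outdatedDocuments: [{ _id: 'dashboard:123', _source: { title: 'secret' } }],
  transformedDocBatches: [[{ _id: 'dashboard:123', _source: { title: 'secret' } }]],
};
const redactedExample = {
  ...exampleState,
  outdatedDocuments: exampleState.outdatedDocuments.map(({ _id }) => ({ _id })),
  transformedDocBatches: exampleState.transformedDocBatches.map((batch) =>
    batch.map(({ _id }) => ({ _id }))
  ),
};
// redactedExample.transformedDocBatches is now [[{ _id: 'dashboard:123' }]]
```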
@ -0,0 +1,63 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import * as Either from 'fp-ts/lib/Either';
import { SavedObjectsRawDoc } from '../../serialization';
import { createBatches } from './create_batches';
describe('createBatches', () => {
const DOCUMENT_SIZE_BYTES = 128;
const INDEX = '.kibana_version_index';
it('returns right one batch if all documents fit in maxBatchSizeBytes', () => {
const documents = [
{ _id: '', _source: { type: 'dashboard', title: 'my saved object title ¹' } },
{ _id: '', _source: { type: 'dashboard', title: 'my saved object title ²' } },
{ _id: '', _source: { type: 'dashboard', title: 'my saved object title ®' } },
];
expect(createBatches(documents, INDEX, DOCUMENT_SIZE_BYTES * 3)).toEqual(
Either.right([documents])
);
});
it('creates multiple batches with each batch limited to maxBatchSizeBytes', () => {
const documents = [
{ _id: '', _source: { type: 'dashboard', title: 'my saved object title ¹' } },
{ _id: '', _source: { type: 'dashboard', title: 'my saved object title ²' } },
{ _id: '', _source: { type: 'dashboard', title: 'my saved object title ®' } },
{ _id: '', _source: { type: 'dashboard', title: 'my saved object title 44' } },
{ _id: '', _source: { type: 'dashboard', title: 'my saved object title 55' } },
];
expect(createBatches(documents, INDEX, DOCUMENT_SIZE_BYTES * 2)).toEqual(
Either.right([[documents[0], documents[1]], [documents[2], documents[3]], [documents[4]]])
);
});
it('creates a single empty batch if there are no documents', () => {
const documents = [] as SavedObjectsRawDoc[];
expect(createBatches(documents, INDEX, 100)).toEqual(Either.right([[]]));
});
it('throws if any one document exceeds the maxBatchSizeBytes', () => {
const documents = [
{ _id: '', _source: { type: 'dashboard', title: 'my saved object title ¹' } },
{
_id: '',
_source: {
type: 'dashboard',
title: 'my saved object title ² with a very long title that exceeds max size bytes',
},
},
{ _id: '', _source: { type: 'dashboard', title: 'my saved object title ®' } },
];
expect(createBatches(documents, INDEX, 178)).toEqual(
Either.left({
maxBatchSizeBytes: 178,
docSizeBytes: 179,
type: 'document_exceeds_batch_size_bytes',
document: documents[1],
})
);
});
});
@ -0,0 +1,66 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import * as Either from 'fp-ts/lib/Either';
import { SavedObjectsRawDoc } from '../..';
import { createBulkOperationBody } from '../actions/bulk_overwrite_transformed_documents';
/**
* Creates batches of documents to be used by the bulk API. Each batch will
* have a request body content length that's <= maxBatchSizeBytes
*/
export function createBatches(
docs: SavedObjectsRawDoc[],
index: string,
maxBatchSizeBytes: number
) {
/* To build up the NDJSON request body we construct an array of objects like:
* [
* {"index": ...}
* {"title": "my saved object"}
* ...
* ]
* However, when we call JSON.stringify on this array the resulting string
* will be surrounded by `[]` which won't be present in the NDJSON so these
* two characters need to be removed from the size calculation.
*/
const BRACKETS_BYTES = 2;
/* Each document in the NDJSON (including the last one) needs to be
* terminated by a newline, so we need to account for an extra newline
* character
*/
const NDJSON_NEW_LINE_BYTES = 1;
const batches = [[]] as [SavedObjectsRawDoc[]];
let currBatch = 0;
let currBatchSizeBytes = 0;
for (const doc of docs) {
const bulkOperationBody = createBulkOperationBody(doc, index);
const docSizeBytes =
Buffer.byteLength(JSON.stringify(bulkOperationBody), 'utf8') -
BRACKETS_BYTES +
NDJSON_NEW_LINE_BYTES;
if (docSizeBytes > maxBatchSizeBytes) {
return Either.left({
type: 'document_exceeds_batch_size_bytes',
docSizeBytes,
maxBatchSizeBytes,
document: doc,
});
} else if (currBatchSizeBytes + docSizeBytes <= maxBatchSizeBytes) {
batches[currBatch].push(doc);
currBatchSizeBytes = currBatchSizeBytes + docSizeBytes;
} else {
currBatch++;
batches[currBatch] = [doc];
currBatchSizeBytes = docSizeBytes;
}
}
return Either.right(batches);
}
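A worked example of the size estimate above, assuming a small hypothetical document: `JSON.stringify` of the `[action, source]` pair adds the two surrounding brackets plus a comma between the objects, while the NDJSON form replaces that comma with the newline between its two lines and appends one extra trailing newline, so subtracting `BRACKETS_BYTES` and adding `NDJSON_NEW_LINE_BYTES` lands on the NDJSON byte count.

```ts
import { SavedObjectsRawDoc } from '../..';
import { createBulkOperationBody } from '../actions/bulk_overwrite_transformed_documents';

// Illustrative only: estimate the NDJSON contribution of one hypothetical doc.
const doc = { _id: 'a:b', _source: { type: 'a' } } as SavedObjectsRawDoc;
const pair = createBulkOperationBody(doc, '.kibana_1'); // [{ index: {...} }, { type: 'a' }]
const docSizeBytes =
  Buffer.byteLength(JSON.stringify(pair), 'utf8') - 2 /* brackets */ + 1 /* trailing newline */;
```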
@ -58,6 +58,7 @@ describe('migrations v2 model', () => {
retryDelay: 0,
retryAttempts: 15,
batchSize: 1000,
maxBatchSizeBytes: 1e8,
indexPrefix: '.kibana',
outdatedDocumentsQuery: {},
targetIndexMappings: {
@ -1065,6 +1066,8 @@ describe('migrations v2 model', () => {
});
const newState = model(state, res) as ReindexSourceToTempIndexBulk;
expect(newState.controlState).toEqual('REINDEX_SOURCE_TO_TEMP_INDEX_BULK');
expect(newState.currentBatch).toEqual(0);
expect(newState.transformedDocBatches).toEqual([processedDocs]);
expect(newState.progress.processed).toBe(0); // Result of `(undefined ?? 0) + corruptDocumentsId.length`
});
@ -1119,16 +1122,19 @@ describe('migrations v2 model', () => {
});
describe('REINDEX_SOURCE_TO_TEMP_INDEX_BULK', () => {
const transformedDocs = [
{
_id: 'a:b',
_source: { type: 'a', a: { name: 'HOI!' }, migrationVersion: {}, references: [] },
},
] as SavedObjectsRawDoc[];
const transformedDocBatches = [
[
{
_id: 'a:b',
_source: { type: 'a', a: { name: 'HOI!' }, migrationVersion: {}, references: [] },
},
],
] as [SavedObjectsRawDoc[]];
const reindexSourceToTempIndexBulkState: ReindexSourceToTempIndexBulk = {
...baseState,
controlState: 'REINDEX_SOURCE_TO_TEMP_INDEX_BULK',
transformedDocs,
transformedDocBatches,
currentBatch: 0,
versionIndexReadyActions: Option.none,
sourceIndex: Option.some('.kibana') as Option.Some<string>,
sourceIndexPitId: 'pit_id',
@ -1171,7 +1177,7 @@ describe('migrations v2 model', () => {
const newState = model(reindexSourceToTempIndexBulkState, res) as FatalState;
expect(newState.controlState).toEqual('FATAL');
expect(newState.reason).toMatchInlineSnapshot(
`"While indexing a batch of saved objects, Elasticsearch returned a 413 Request Entity Too Large exception. Try to use smaller batches by changing the Kibana 'migrations.batchSize' configuration option and restarting Kibana."`
`"While indexing a batch of saved objects, Elasticsearch returned a 413 Request Entity Too Large exception. Ensure that the Kibana configuration option 'migrations.maxBatchSizeBytes' is set to a value that is lower than or equal to the Elasticsearch 'http.max_content_length' configuration option."`
);
});
test('REINDEX_SOURCE_TO_TEMP_INDEX_BULK should throw a throwBadResponse error if action failed', () => {
@ -1438,7 +1444,8 @@ describe('migrations v2 model', () => {
res
) as TransformedDocumentsBulkIndex;
expect(newState.controlState).toEqual('TRANSFORMED_DOCUMENTS_BULK_INDEX');
expect(newState.transformedDocs).toEqual(processedDocs);
expect(newState.transformedDocBatches).toEqual([processedDocs]);
expect(newState.currentBatch).toEqual(0);
expect(newState.retryCount).toEqual(0);
expect(newState.retryDelay).toEqual(0);
expect(newState.progress.processed).toBe(outdatedDocuments.length);
@ -1521,16 +1528,31 @@ describe('migrations v2 model', () => {
});
describe('TRANSFORMED_DOCUMENTS_BULK_INDEX', () => {
const transformedDocs = [
{
_id: 'a:b',
_source: { type: 'a', a: { name: 'HOI!' }, migrationVersion: {}, references: [] },
},
] as SavedObjectsRawDoc[];
const transformedDocBatches = [
[
// batch 0
{
_id: 'a:b',
_source: { type: 'a', a: { name: 'HOI!' }, migrationVersion: {}, references: [] },
},
{
_id: 'a:c',
_source: { type: 'a', a: { name: 'HOI!' }, migrationVersion: {}, references: [] },
},
],
[
// batch 1
{
_id: 'a:d',
_source: { type: 'a', a: { name: 'HOI!' }, migrationVersion: {}, references: [] },
},
],
] as SavedObjectsRawDoc[][];
const transformedDocumentsBulkIndexState: TransformedDocumentsBulkIndex = {
...baseState,
controlState: 'TRANSFORMED_DOCUMENTS_BULK_INDEX',
transformedDocs,
transformedDocBatches,
currentBatch: 0,
versionIndexReadyActions: Option.none,
sourceIndex: Option.some('.kibana') as Option.Some<string>,
targetIndex: '.kibana_7.11.0_001',
@ -1540,6 +1562,29 @@ describe('migrations v2 model', () => {
progress: createInitialProgress(),
};
test('TRANSFORMED_DOCUMENTS_BULK_INDEX -> TRANSFORMED_DOCUMENTS_BULK_INDEX and increments currentBatch if more batches are left', () => {
const res: ResponseType<'TRANSFORMED_DOCUMENTS_BULK_INDEX'> = Either.right(
'bulk_index_succeeded'
);
const newState = model(
transformedDocumentsBulkIndexState,
res
) as TransformedDocumentsBulkIndex;
expect(newState.controlState).toEqual('TRANSFORMED_DOCUMENTS_BULK_INDEX');
expect(newState.currentBatch).toEqual(1);
});
test('TRANSFORMED_DOCUMENTS_BULK_INDEX -> OUTDATED_DOCUMENTS_SEARCH_READ if all batches were written', () => {
const res: ResponseType<'TRANSFORMED_DOCUMENTS_BULK_INDEX'> = Either.right(
'bulk_index_succeeded'
);
const newState = model(
{ ...transformedDocumentsBulkIndexState, ...{ currentBatch: 1 } },
res
);
expect(newState.controlState).toEqual('OUTDATED_DOCUMENTS_SEARCH_READ');
});
test('TRANSFORMED_DOCUMENTS_BULK_INDEX throws if action returns left index_not_found_exception', () => {
const res: ResponseType<'TRANSFORMED_DOCUMENTS_BULK_INDEX'> = Either.left({
type: 'index_not_found_exception',
@ -1570,7 +1615,7 @@ describe('migrations v2 model', () => {
const newState = model(transformedDocumentsBulkIndexState, res) as FatalState;
expect(newState.controlState).toEqual('FATAL');
expect(newState.reason).toMatchInlineSnapshot(
`"While indexing a batch of saved objects, Elasticsearch returned a 413 Request Entity Too Large exception. Try to use smaller batches by changing the Kibana 'migrations.batchSize' configuration option and restarting Kibana."`
`"While indexing a batch of saved objects, Elasticsearch returned a 413 Request Entity Too Large exception. Ensure that the Kibana configuration option 'migrations.maxBatchSizeBytes' is set to a value that is lower than or equal to the Elasticsearch 'http.max_content_length' configuration option."`
);
});
});
@ -31,6 +31,19 @@ import {
throwBadControlState,
throwBadResponse,
} from './helpers';
import { createBatches } from './create_batches';
const FATAL_REASON_REQUEST_ENTITY_TOO_LARGE = `While indexing a batch of saved objects, Elasticsearch returned a 413 Request Entity Too Large exception. Ensure that the Kibana configuration option 'migrations.maxBatchSizeBytes' is set to a value that is lower than or equal to the Elasticsearch 'http.max_content_length' configuration option.`;
const fatalReasonDocumentExceedsMaxBatchSizeBytes = ({
_id,
docSizeBytes,
maxBatchSizeBytes,
}: {
_id: string;
docSizeBytes: number;
maxBatchSizeBytes: number;
}) =>
`The document with _id "${_id}" is ${docSizeBytes} bytes which exceeds the configured maximum batch size of ${maxBatchSizeBytes} bytes. To proceed, please increase the 'migrations.maxBatchSizeBytes' Kibana configuration option and ensure that the Elasticsearch 'http.max_content_length' configuration option is set to an equal or larger value.`;
export const model = (currentState: State, resW: ResponseType<AllActionStates>): State => {
// The action response `resW` is weakly typed, the type includes all action
@ -489,12 +502,30 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
if (Either.isRight(res)) {
if (stateP.corruptDocumentIds.length === 0 && stateP.transformErrors.length === 0) {
return {
...stateP,
controlState: 'REINDEX_SOURCE_TO_TEMP_INDEX_BULK', // handles the actual bulk indexing into temp index
transformedDocs: [...res.right.processedDocs],
progress,
};
const batches = createBatches(
res.right.processedDocs,
stateP.tempIndex,
stateP.maxBatchSizeBytes
);
if (Either.isRight(batches)) {
return {
...stateP,
controlState: 'REINDEX_SOURCE_TO_TEMP_INDEX_BULK', // handles the actual bulk indexing into temp index
transformedDocBatches: batches.right,
currentBatch: 0,
progress,
};
} else {
return {
...stateP,
controlState: 'FATAL',
reason: fatalReasonDocumentExceedsMaxBatchSizeBytes({
_id: batches.left.document._id,
docSizeBytes: batches.left.docSizeBytes,
maxBatchSizeBytes: batches.left.maxBatchSizeBytes,
}),
};
}
} else {
// we don't have any transform issues with the current batch of outdated docs but
// we have carried through previous transformation issues.
@ -525,13 +556,21 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
} else if (stateP.controlState === 'REINDEX_SOURCE_TO_TEMP_INDEX_BULK') {
const res = resW as ExcludeRetryableEsError<ResponseType<typeof stateP.controlState>>;
if (Either.isRight(res)) {
return {
...stateP,
controlState: 'REINDEX_SOURCE_TO_TEMP_READ',
// we're still on the happy path with no transformation failures seen.
corruptDocumentIds: [],
transformErrors: [],
};
if (stateP.currentBatch + 1 < stateP.transformedDocBatches.length) {
return {
...stateP,
controlState: 'REINDEX_SOURCE_TO_TEMP_INDEX_BULK',
currentBatch: stateP.currentBatch + 1,
};
} else {
return {
...stateP,
controlState: 'REINDEX_SOURCE_TO_TEMP_READ',
// we're still on the happy path with no transformation failures seen.
corruptDocumentIds: [],
transformErrors: [],
};
}
} else {
if (
isLeftTypeof(res.left, 'target_index_had_write_block') ||
@ -548,7 +587,7 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
return {
...stateP,
controlState: 'FATAL',
reason: `While indexing a batch of saved objects, Elasticsearch returned a 413 Request Entity Too Large exception. Try to use smaller batches by changing the Kibana 'migrations.batchSize' configuration option and restarting Kibana.`,
reason: FATAL_REASON_REQUEST_ENTITY_TOO_LARGE,
};
}
throwBadResponse(stateP, res.left);
@ -677,13 +716,31 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
// we haven't seen corrupt documents or any transformation errors thus far in the migration
// index the migrated docs
if (stateP.corruptDocumentIds.length === 0 && stateP.transformErrors.length === 0) {
return {
...stateP,
controlState: 'TRANSFORMED_DOCUMENTS_BULK_INDEX',
transformedDocs: [...res.right.processedDocs],
hasTransformedDocs: true,
progress,
};
const batches = createBatches(
res.right.processedDocs,
stateP.targetIndex,
stateP.maxBatchSizeBytes
);
if (Either.isRight(batches)) {
return {
...stateP,
controlState: 'TRANSFORMED_DOCUMENTS_BULK_INDEX',
transformedDocBatches: batches.right,
currentBatch: 0,
hasTransformedDocs: true,
progress,
};
} else {
return {
...stateP,
controlState: 'FATAL',
reason: fatalReasonDocumentExceedsMaxBatchSizeBytes({
_id: batches.left.document._id,
docSizeBytes: batches.left.docSizeBytes,
maxBatchSizeBytes: batches.left.maxBatchSizeBytes,
}),
};
}
} else {
// We have seen corrupt documents and/or transformation errors
// skip indexing and go straight to reading and transforming more docs
@ -711,6 +768,13 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
} else if (stateP.controlState === 'TRANSFORMED_DOCUMENTS_BULK_INDEX') {
const res = resW as ExcludeRetryableEsError<ResponseType<typeof stateP.controlState>>;
if (Either.isRight(res)) {
if (stateP.currentBatch + 1 < stateP.transformedDocBatches.length) {
return {
...stateP,
controlState: 'TRANSFORMED_DOCUMENTS_BULK_INDEX',
currentBatch: stateP.currentBatch + 1,
};
}
return {
...stateP,
controlState: 'OUTDATED_DOCUMENTS_SEARCH_READ',
@ -723,7 +787,7 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
return {
...stateP,
controlState: 'FATAL',
reason: `While indexing a batch of saved objects, Elasticsearch returned a 413 Request Entity Too Large exception. Try to use smaller batches by changing the Kibana 'migrations.batchSize' configuration option and restarting Kibana.`,
reason: FATAL_REASON_REQUEST_ENTITY_TOO_LARGE,
};
} else if (
isLeftTypeof(res.left, 'target_index_had_write_block') ||
@ -111,7 +111,7 @@ export const nextActionMap = (client: ElasticsearchClient, transformRawDocs: Tra
Actions.bulkOverwriteTransformedDocuments({
client,
index: state.tempIndex,
transformedDocs: state.transformedDocs,
transformedDocs: state.transformedDocBatches[state.currentBatch],
/**
* Since we don't run a search against the target index, we disable "refresh" to speed up
* the migration process.
@ -160,7 +160,7 @@ export const nextActionMap = (client: ElasticsearchClient, transformRawDocs: Tra
Actions.bulkOverwriteTransformedDocuments({
client,
index: state.targetIndex,
transformedDocs: state.transformedDocs,
transformedDocs: state.transformedDocBatches[state.currentBatch],
/**
* Since we don't run a search against the target index, we disable "refresh" to speed up
* the migration process.
@ -0,0 +1,57 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { retryAsync } from './retry_async';
describe('retry', () => {
it('retries throwing functions until they succeed', async () => {
let i = 0;
await expect(
retryAsync(
() => {
if (i++ < 2) {
return Promise.reject(new Error('boom'));
} else {
return Promise.resolve('done');
}
},
{ retryAttempts: 3, retryDelayMs: 1 }
)
).resolves.toEqual('done');
});
it('throws if all attempts are exhausted before success', async () => {
let attempts = 0;
await expect(() =>
retryAsync<Error>(
() => {
attempts++;
return Promise.reject(new Error('boom'));
},
{ retryAttempts: 3, retryDelayMs: 1 }
)
).rejects.toMatchInlineSnapshot(`[Error: boom]`);
expect(attempts).toEqual(3);
});
it('waits retryDelayMs between each attempt ', async () => {
const now = Date.now();
let i = 0;
await retryAsync(
() => {
if (i++ < 2) {
return Promise.reject(new Error('boom'));
} else {
return Promise.resolve('done');
}
},
{ retryAttempts: 3, retryDelayMs: 100 }
);
expect(Date.now() - now).toBeGreaterThanOrEqual(200);
});
});
@ -0,0 +1,30 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
function delay(delayInMs: number) {
return new Promise((resolve) => setTimeout(resolve, delayInMs));
}
export async function retryAsync<T>(
fn: () => Promise<T>,
options: { retryAttempts: number; retryDelayMs: number }
): Promise<T> {
try {
return await fn();
} catch (e) {
if (options.retryAttempts > 1) {
await delay(options.retryDelayMs);
return retryAsync(fn, {
retryAttempts: options.retryAttempts - 1,
retryDelayMs: options.retryDelayMs,
});
} else {
throw e;
}
}
}
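A minimal usage sketch of `retryAsync`, along the lines of how the integration tests above poll the JSON log file until it has content (the file path handling here is hypothetical):

```ts
import fs from 'fs/promises';
import { retryAsync } from './retry_async';

// Illustrative only: retry reading a log file, waiting 200ms between at most
// 10 attempts; a throw inside the callback triggers the next attempt.
async function readLogWithRetry(path: string): Promise<string> {
  return retryAsync(
    async () => {
      const content = await fs.readFile(path, 'utf-8');
      if (content.length === 0) {
        throw new Error('log file still empty');
      }
      return content;
    },
    { retryAttempts: 10, retryDelayMs: 200 }
  );
}
```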
@ -76,19 +76,31 @@ export interface BaseState extends ControlState {
readonly retryAttempts: number;
/**
* The number of documents to fetch from Elasticsearch server to run migration over.
* The number of documents to process in each batch. This determines the
* maximum number of documents that will be read and written in a single
* request.
*
* The higher the value, the faster the migration process will be performed since it reduces
* the number of round trips between Kibana and Elasticsearch servers.
* For the migration speed, we have to pay the price of increased memory consumption.
* The higher the value, the faster the migration process will be performed
* since it reduces the number of round trips between Kibana and
* Elasticsearch servers. For the migration speed, we have to pay the price
* of increased memory consumption and HTTP payload size.
*
* Since batchSize defines the number of documents, not their size, it might happen that
* Elasticsearch fails a request with circuit_breaking_exception when it retrieves a set of
* saved objects of significant size.
* Since we cannot control the size in bytes of a batch when reading,
* Elasticsearch might fail with a circuit_breaking_exception when it
* retrieves a set of saved objects of significant size. In this case, you
* should set a smaller batchSize value and restart the migration process
* again.
*
* In this case, you should set a smaller batchSize value and restart the migration process again.
* When writing batches, we limit the number of documents in a batch
* (batchSize) as well as the size of the batch in bytes (maxBatchSizeBytes).
*/
readonly batchSize: number;
/**
* When writing batches, limits the batch size in bytes to ensure that we
* don't construct HTTP requests which would exceed Elasticsearch's
* http.max_content_length which defaults to 100mb.
*/
readonly maxBatchSizeBytes: number;
readonly logs: MigrationLog[];
/**
* The current alias e.g. `.kibana` which always points to the latest
@ -233,7 +245,8 @@ export interface ReindexSourceToTempIndex extends PostInitState {
export interface ReindexSourceToTempIndexBulk extends PostInitState {
readonly controlState: 'REINDEX_SOURCE_TO_TEMP_INDEX_BULK';
readonly transformedDocs: SavedObjectsRawDoc[];
readonly transformedDocBatches: [SavedObjectsRawDoc[]];
readonly currentBatch: number;
readonly sourceIndexPitId: string;
readonly lastHitSortValue: number[] | undefined;
readonly progress: Progress;
@ -318,7 +331,8 @@ export interface TransformedDocumentsBulkIndex extends PostInitState {
* Write the up-to-date transformed documents to the target index
*/
readonly controlState: 'TRANSFORMED_DOCUMENTS_BULK_INDEX';
readonly transformedDocs: SavedObjectsRawDoc[];
readonly transformedDocBatches: SavedObjectsRawDoc[][];
readonly currentBatch: number;
readonly lastHitSortValue: number[] | undefined;
readonly hasTransformedDocs: boolean;
readonly pitId: string;
@ -12,6 +12,7 @@ import type { ConfigDeprecationProvider } from '../config';
const migrationSchema = schema.object({
batchSize: schema.number({ defaultValue: 1_000 }),
maxBatchSizeBytes: schema.byteSize({ defaultValue: '100mb' }), // 100mb is the default http.max_content_length Elasticsearch config value
scrollDuration: schema.string({ defaultValue: '15m' }),
pollInterval: schema.number({ defaultValue: 1_500 }),
skip: schema.boolean({ defaultValue: false }),
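A hedged sketch of how this byte-size setting flows through: `schema.byteSize` produces a `ByteSizeValue`, and the migrator converts it to a plain number of bytes, as `createInitialState` does above with `getValueInBytes()`.

```ts
import { ByteSizeValue } from '@kbn/config-schema';

// Illustrative only: the default '100mb' parsed and converted the same way
// the migration initial state consumes migrations.maxBatchSizeBytes.
const maxBatchSize = ByteSizeValue.parse('100mb');
const maxBatchSizeBytes = maxBatchSize.getValueInBytes(); // 104857600, assuming 1mb = 1024 * 1024 bytes
```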
@ -108,6 +108,7 @@ kibana_vars=(
map.tilemap.options.subdomains
map.tilemap.url
migrations.batchSize
migrations.maxBatchSizeBytes
migrations.enableV2
migrations.pollInterval
migrations.retryAttempts