mirror of
https://github.com/elastic/kibana.git
synced 2025-04-23 09:19:04 -04:00
Migrations: dynamically adjust batchSize when reading (#157494)
## Summary

Migrations read 1000 documents by default which works well for most deployments. But if any batch happens to be > ~512MB we hit NodeJS' max string length limit and cannot process that batch. This forces users to reduce the batch size to a smaller number, which could severely slow down migrations.

This PR reduces the impact of large batches by catching elasticsearch-js' `RequestAbortedError` and reducing the batch size in half. When subsequent batches are successful the batchSize increases by 20%. This means we'll have a sequence like:

1. Read 1000 docs ✅ (small batch)
2. Read 1000 docs 🔴 (too large batch)
3. Read 500 docs ✅
4. Read 600 docs ✅
5. Read 720 docs ✅
6. Read 864 docs ✅
7. Read 1000 docs ✅ (small batch)

This assumes that most clusters just have a few large batches exceeding the limit. If all batches exceed the limit we'd have 1 failure for every 4 successful reads, so we pay a 20% throughput penalty. In such a case it would be better to configure a lower `migrations.batchSize`.
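As a rough illustration of the adjustment policy described above (not the actual implementation), here is a minimal, self-contained TypeScript sketch. The helper names and the `AdjustableBatchState` shape are made up for this example; the real logic lives in `increaseBatchSize` in `helpers.ts` and in the `es_response_too_large` handling in `model.ts` shown in the diff below:

```ts
// Hypothetical sketch of the read batch size policy:
// - on a successful read, grow the batch size by 20%, capped at maxBatchSize
// - on a "response too large" error, halve it, but never go below 1 document
interface AdjustableBatchState {
  batchSize: number; // current number of documents requested per read
  maxBatchSize: number; // upper bound, i.e. the configured migrations.batchSize
}

const onSuccessfulRead = (state: AdjustableBatchState): number =>
  Math.min(Math.floor(state.batchSize * 1.2), state.maxBatchSize);

const onResponseTooLarge = (state: AdjustableBatchState): number =>
  Math.max(Math.floor(state.batchSize / 2), 1);

// Example: starting from 1000 with one oversized batch in the middle
let state: AdjustableBatchState = { batchSize: 1000, maxBatchSize: 1000 };
state = { ...state, batchSize: onResponseTooLarge(state) }; // 500
state = { ...state, batchSize: onSuccessfulRead(state) }; // 600
state = { ...state, batchSize: onSuccessfulRead(state) }; // 720
state = { ...state, batchSize: onSuccessfulRead(state) }; // 864
state = { ...state, batchSize: onSuccessfulRead(state) }; // 1000 (capped at maxBatchSize)
```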
Tested this manually:

1. Start ES with more heap than the default, otherwise reading large batches will cause it to run out of memory:
   `ES_JAVA_OPTS=' -Xms6g -Xmx6g' yarn es snapshot --data-archive=/Users/rudolf/dev/kibana/src/core/server/integration_tests/saved_objects/migrations/archives/8.4.0_with_sample_data_logs.zip`
2. Ingest lots of large documents of ~5mb:

   ```
   curl -XPUT "elastic:changeme@localhost:9200/_security/role/grant_kibana_system_indices" -H "kbn-xsrf: reporting" -H "Content-Type: application/json" -d'
   {
     "indices": [
       {
         "names": [ ".kibana*" ],
         "privileges": [ "all" ],
         "allow_restricted_indices": true
       }
     ]
   }'

   curl -XPOST "elastic:changeme@localhost:9200/_security/user/superuser" -H "kbn-xsrf: reporting" -H "Content-Type: application/json" -d'
   {
     "password" : "changeme",
     "roles" : [ "superuser", "grant_kibana_system_indices" ]
   }'

   curl -XPUT "superuser:changeme@localhost:9200/.kibana_8.4.0_001/_mappings" -H "kbn-xsrf: reporting" -H "Content-Type: application/json" -d'
   {
     "dynamic": false,
     "properties": {
     }
   }'

   set -B # enable brace expansion
   for i in {1..400}; do
     curl -k --data-binary "@/Users/rudolf/dev/kibana/src/core/server/integration_tests/saved_objects/migrations/group3/body.json" -X PUT "http://superuser:changeme@localhost:9200/.kibana_8.4.0_001/_doc/cases-comments:"{$i}"?&pretty=true" -H "Content-Type: application/json"
   done
   ```

3. Start Kibana with a modest batchSize, otherwise we could OOM ES:
   `node scripts/kibana --dev --migrations.batchSize=120`

<details><summary>Example logs. Note that the "Processed x documents" message only logs when the next batch is successfully read, so the order seems wrong. To improve it we'd need to log progress after a batch is successfully written instead 🤷</summary>

```
[.kibana] Processed 120 documents out of 542.
[.kibana] REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_TRANSFORM. took: 3667ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_TRANSFORM -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1740ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1376ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1402ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1311ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1388ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_READ. took: 900ms.
[.kibana] Read a batch that exceeded the NodeJS maximum string length, retrying by reducing the batch size in half to 60.
[.kibana] REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_READ. took: 1538ms.
[.kibana] Processed 240 documents out of 542.
[.kibana] REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_TRANSFORM. took: 2054ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_TRANSFORM -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1042ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1310ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1388ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_READ. took: 1130ms.
[.kibana] Processed 300 documents out of 542.
[.kibana] REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_TRANSFORM. took: 2610ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_TRANSFORM -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1262ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1299ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1363ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1341ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_READ. took: 572ms.
[.kibana] Processed 372 documents out of 542.
[.kibana] REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_TRANSFORM. took: 3330ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_TRANSFORM -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1488ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1349ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1312ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1380ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1310ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_READ. took: 139ms.
[.kibana] Processed 458 documents out of 542.
[.kibana] REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_TRANSFORM. took: 3278ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_TRANSFORM -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1460ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1370ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1303ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1384ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_READ. took: 1298ms.
[.kibana] Processed 542 documents out of 542.
[.kibana] REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_CLOSE_PIT. took: 4ms.
```

</details>

### Checklist

Delete any items that are not applicable to this PR.
- [ ] Any text added follows [EUI's writing guidelines](https://elastic.github.io/eui/#/guidelines/writing), uses sentence case text and includes [i18n support](https://github.com/elastic/kibana/blob/main/packages/kbn-i18n/README.md)
- [ ] [Documentation](https://www.elastic.co/guide/en/kibana/master/development-documentation.html) was added for features that require explanation or tutorials
- [ ] [Unit or functional tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html) were updated or added to match the most common scenarios
- [ ] Any UI touched in this PR is usable by keyboard only (learn more about [keyboard accessibility](https://webaim.org/techniques/keyboard/))
- [ ] Any UI touched in this PR does not create any new axe failures (run axe in browser: [FF](https://addons.mozilla.org/en-US/firefox/addon/axe-devtools/), [Chrome](https://chrome.google.com/webstore/detail/axe-web-accessibility-tes/lhdoppojpmngadmnindnejefpokejbdd?hl=en-US))
- [ ] If a plugin configuration key changed, check if it needs to be allowlisted in the cloud and added to the [docker list](https://github.com/elastic/kibana/blob/main/src/dev/build/tasks/os_packages/docker_generator/resources/base/bin/kibana-docker)
- [ ] This renders correctly on smaller devices using a responsive layout. (You can test this [in your browser](https://www.browserstack.com/guide/responsive-testing-on-local-server))
- [ ] This was checked for [cross-browser compatibility](https://www.elastic.co/support/matrix#matrix_browsers)

### Risks

### For maintainers

- [ ] This was checked for breaking API changes and was [labeled appropriately](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)

---------

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
Co-authored-by: Gerard Soldevila <gerard.soldevila@elastic.co>
This commit is contained in:
parent
c06b5efe17
commit
094b62a6d6
22 changed files with 511 additions and 46 deletions
@@ -9,6 +9,7 @@
import { valid } from 'semver';
import { schema, TypeOf } from '@kbn/config-schema';
import type { ServiceConfigDescriptor } from '@kbn/core-base-server-internal';
import buffer from 'buffer';

const migrationSchema = schema.object({
  algorithm: schema.oneOf([schema.literal('v2'), schema.literal('zdt')], {
@@ -16,6 +17,10 @@ const migrationSchema = schema.object({
  }),
  batchSize: schema.number({ defaultValue: 1_000 }),
  maxBatchSizeBytes: schema.byteSize({ defaultValue: '100mb' }), // 100mb is the default http.max_content_length Elasticsearch config value
  maxReadBatchSizeBytes: schema.byteSize({
    defaultValue: buffer.constants.MAX_STRING_LENGTH,
    max: buffer.constants.MAX_STRING_LENGTH,
  }),
  discardUnknownObjects: schema.maybe(
    schema.string({
      validate: (value: string) =>
@@ -47,6 +47,7 @@
  ReindexResponse,
  UpdateByQueryResponse,
  UpdateAndPickupMappingsResponse,
  EsResponseTooLargeError,
} from './src/actions';
export {
  isClusterShardLimitExceeded,
@@ -166,7 +166,9 @@
      "message": "Log from LEGACY_REINDEX control state",
    },
  ],
  "maxBatchSize": 1000,
  "maxBatchSizeBytes": 100000000,
  "maxReadBatchSizeBytes": 536870888,
  "migrationDocLinks": Object {
    "clusterShardLimitExceeded": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#cluster-shard-limit-exceeded",
    "repeatedTimeoutRequests": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#_repeated_time_out_requests_that_eventually_fail",
@@ -392,7 +394,9 @@
      "message": "Log from LEGACY_DELETE control state",
    },
  ],
  "maxBatchSize": 1000,
  "maxBatchSizeBytes": 100000000,
  "maxReadBatchSizeBytes": 536870888,
  "migrationDocLinks": Object {
    "clusterShardLimitExceeded": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#cluster-shard-limit-exceeded",
    "repeatedTimeoutRequests": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#_repeated_time_out_requests_that_eventually_fail",
@@ -622,7 +626,9 @@
      "message": "Log from LEGACY_DELETE control state",
    },
  ],
  "maxBatchSize": 1000,
  "maxBatchSizeBytes": 100000000,
  "maxReadBatchSizeBytes": 536870888,
  "migrationDocLinks": Object {
    "clusterShardLimitExceeded": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#cluster-shard-limit-exceeded",
    "repeatedTimeoutRequests": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#_repeated_time_out_requests_that_eventually_fail",
@@ -856,7 +862,9 @@
      "message": "Log from DONE control state",
    },
  ],
  "maxBatchSize": 1000,
  "maxBatchSizeBytes": 100000000,
  "maxReadBatchSizeBytes": 536870888,
  "migrationDocLinks": Object {
    "clusterShardLimitExceeded": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#cluster-shard-limit-exceeded",
    "repeatedTimeoutRequests": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#_repeated_time_out_requests_that_eventually_fail",
@@ -1110,7 +1118,9 @@
      "message": "Log from LEGACY_DELETE control state",
    },
  ],
  "maxBatchSize": 1000,
  "maxBatchSizeBytes": 100000000,
  "maxReadBatchSizeBytes": 536870888,
  "migrationDocLinks": Object {
    "clusterShardLimitExceeded": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#cluster-shard-limit-exceeded",
    "repeatedTimeoutRequests": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#_repeated_time_out_requests_that_eventually_fail",
@@ -1347,7 +1357,9 @@
      "message": "Log from FATAL control state",
    },
  ],
  "maxBatchSize": 1000,
  "maxBatchSizeBytes": 100000000,
  "maxReadBatchSizeBytes": 536870888,
  "migrationDocLinks": Object {
    "clusterShardLimitExceeded": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#cluster-shard-limit-exceeded",
    "repeatedTimeoutRequests": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#_repeated_time_out_requests_that_eventually_fail",
@@ -72,7 +72,7 @@ describe('catchRetryableEsClientErrors', () => {
    });
  });
  it('ResponseError with retryable status code', async () => {
    const statusCodes = [503, 401, 403, 408, 410];
    const statusCodes = [503, 401, 403, 408, 410, 429];
    return Promise.all(
      statusCodes.map(async (status) => {
        const error = new esErrors.ResponseError(

@@ -15,6 +15,7 @@ const retryResponseStatuses = [
  403, // AuthenticationException
  408, // RequestTimeout
  410, // Gone
  429, // TooManyRequests -> ES circuit breaker
];

export interface RetryableEsClientError {
@@ -146,6 +146,11 @@ export interface RequestEntityTooLargeException {
  type: 'request_entity_too_large_exception';
}

export interface EsResponseTooLargeError {
  type: 'es_response_too_large';
  contentLength: number;
}

/** @internal */
export interface AcknowledgeResponse {
  acknowledged: boolean;
@@ -168,6 +173,7 @@ export interface ActionErrorTypeMap {
  index_not_green_timeout: IndexNotGreenTimeout;
  index_not_yellow_timeout: IndexNotYellowTimeout;
  cluster_shard_limit_exceeded: ClusterShardLimitExceeded;
  es_response_too_large: EsResponseTooLargeError;
}

/**
@ -32,23 +32,54 @@ describe('readWithPit', () => {
|
|||
pitId: 'pitId',
|
||||
query: { match_all: {} },
|
||||
batchSize: 10_000,
|
||||
maxResponseSizeBytes: 100_000,
|
||||
})();
|
||||
|
||||
expect(client.search).toHaveBeenCalledTimes(1);
|
||||
expect(client.search).toHaveBeenCalledWith({
|
||||
allow_partial_search_results: false,
|
||||
pit: {
|
||||
id: 'pitId',
|
||||
keep_alive: '10m',
|
||||
expect(client.search).toHaveBeenCalledWith(
|
||||
{
|
||||
allow_partial_search_results: false,
|
||||
pit: {
|
||||
id: 'pitId',
|
||||
keep_alive: '10m',
|
||||
},
|
||||
query: {
|
||||
match_all: {},
|
||||
},
|
||||
search_after: undefined,
|
||||
seq_no_primary_term: undefined,
|
||||
size: 10000,
|
||||
sort: '_shard_doc:asc',
|
||||
track_total_hits: true,
|
||||
},
|
||||
query: {
|
||||
match_all: {},
|
||||
},
|
||||
search_after: undefined,
|
||||
seq_no_primary_term: undefined,
|
||||
size: 10000,
|
||||
sort: '_shard_doc:asc',
|
||||
track_total_hits: true,
|
||||
{ maxResponseSize: 100_000 }
|
||||
);
|
||||
});
|
||||
|
||||
it('returns left es_response_too_large when client throws RequestAbortedError', async () => {
|
||||
// Create a mock client that rejects all methods with a RequestAbortedError
|
||||
// response.
|
||||
const retryableError = new EsErrors.RequestAbortedError(
|
||||
'The content length (536870889) is bigger than the maximum allow string (536870888)'
|
||||
);
|
||||
const client = elasticsearchClientMock.createInternalClient(
|
||||
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
|
||||
);
|
||||
|
||||
const task = readWithPit({
|
||||
client,
|
||||
pitId: 'pitId',
|
||||
query: { match_all: {} },
|
||||
batchSize: 10_000,
|
||||
});
|
||||
try {
|
||||
await task();
|
||||
} catch (e) {
|
||||
/** ignore */
|
||||
}
|
||||
await expect(task()).resolves.toEqual({
|
||||
_tag: 'Left',
|
||||
left: { contentLength: 536870889, type: 'es_response_too_large' },
|
||||
});
|
||||
});
|
||||
|
||||
|
|
|
@@ -9,6 +9,7 @@
import * as Either from 'fp-ts/lib/Either';
import * as TaskEither from 'fp-ts/lib/TaskEither';
import * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { errors as EsErrors } from '@elastic/elasticsearch';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type { SavedObjectsRawDoc } from '@kbn/core-saved-objects-server';
import {
@@ -16,6 +17,7 @@ import {
  type RetryableEsClientError,
} from './catch_retryable_es_client_errors';
import { DEFAULT_PIT_KEEP_ALIVE } from './open_pit';
import { EsResponseTooLargeError } from '.';

/** @internal */
export interface ReadWithPit {
@@ -32,6 +34,7 @@ export interface ReadWithPitParams {
  batchSize: number;
  searchAfter?: number[];
  seqNoPrimaryTerm?: boolean;
  maxResponseSizeBytes?: number;
}

/*
@ -45,32 +48,39 @@ export const readWithPit =
|
|||
batchSize,
|
||||
searchAfter,
|
||||
seqNoPrimaryTerm,
|
||||
}: ReadWithPitParams): TaskEither.TaskEither<RetryableEsClientError, ReadWithPit> =>
|
||||
maxResponseSizeBytes,
|
||||
}: ReadWithPitParams): TaskEither.TaskEither<
|
||||
RetryableEsClientError | EsResponseTooLargeError,
|
||||
ReadWithPit
|
||||
> =>
|
||||
() => {
|
||||
return client
|
||||
.search<SavedObjectsRawDoc>({
|
||||
seq_no_primary_term: seqNoPrimaryTerm,
|
||||
// Fail if the index being searched doesn't exist or is closed
|
||||
// allow_no_indices: false,
|
||||
// By default ES returns a 200 with partial results if there are shard
|
||||
// request timeouts or shard failures which can lead to data loss for
|
||||
// migrations
|
||||
allow_partial_search_results: false,
|
||||
// Sort fields are required to use searchAfter so we sort by the
|
||||
// natural order of the index which is the most efficient option
|
||||
// as order is not important for the migration
|
||||
sort: '_shard_doc:asc',
|
||||
pit: { id: pitId, keep_alive: DEFAULT_PIT_KEEP_ALIVE },
|
||||
size: batchSize,
|
||||
search_after: searchAfter,
|
||||
/**
|
||||
* We want to know how many documents we need to process so we can log the progress.
|
||||
* But we also want to increase the performance of these requests,
|
||||
* so we ask ES to report the total count only on the first request (when searchAfter does not exist)
|
||||
*/
|
||||
track_total_hits: typeof searchAfter === 'undefined',
|
||||
query,
|
||||
})
|
||||
.search<SavedObjectsRawDoc>(
|
||||
{
|
||||
seq_no_primary_term: seqNoPrimaryTerm,
|
||||
// Fail if the index being searched doesn't exist or is closed
|
||||
// allow_no_indices: false,
|
||||
// By default ES returns a 200 with partial results if there are shard
|
||||
// request timeouts or shard failures which can lead to data loss for
|
||||
// migrations
|
||||
allow_partial_search_results: false,
|
||||
// Sort fields are required to use searchAfter so we sort by the
|
||||
// natural order of the index which is the most efficient option
|
||||
// as order is not important for the migration
|
||||
sort: '_shard_doc:asc',
|
||||
pit: { id: pitId, keep_alive: DEFAULT_PIT_KEEP_ALIVE },
|
||||
size: batchSize,
|
||||
search_after: searchAfter,
|
||||
/**
|
||||
* We want to know how many documents we need to process so we can log the progress.
|
||||
* But we also want to increase the performance of these requests,
|
||||
* so we ask ES to report the total count only on the first request (when searchAfter does not exist)
|
||||
*/
|
||||
track_total_hits: typeof searchAfter === 'undefined',
|
||||
query,
|
||||
},
|
||||
{ maxResponseSize: maxResponseSizeBytes }
|
||||
)
|
||||
.then((body) => {
|
||||
const totalHits =
|
||||
typeof body.hits.total === 'number'
|
||||
|
@@ -93,5 +103,22 @@ export const readWithPit =
          totalHits,
        });
      })
      .catch((e) => {
        if (
          e instanceof EsErrors.RequestAbortedError &&
          /The content length \(\d+\) is bigger than the maximum/.test(e.message)
        ) {
          return Either.left({
            type: 'es_response_too_large' as const,
            contentLength: Number.parseInt(
              e.message.match(/The content length \((\d+)\) is bigger than the maximum/)?.[1] ??
                '-1',
              10
            ),
          });
        } else {
          throw e;
        }
      })
      .catch(catchRetryableEsClientErrors);
  };
@@ -27,6 +27,7 @@ const migrationsConfig = {
  retryAttempts: 15,
  batchSize: 1000,
  maxBatchSizeBytes: ByteSizeValue.parse('100mb'),
  maxReadBatchSizeBytes: ByteSizeValue.parse('500mb'),
} as unknown as SavedObjectsMigrationConfigType;

const createInitialStateCommonParams = {
@@ -217,7 +218,9 @@ describe('createInitialState', () => {
      "knownTypes": Array [],
      "legacyIndex": ".kibana_task_manager",
      "logs": Array [],
      "maxBatchSize": 1000,
      "maxBatchSizeBytes": 104857600,
      "maxReadBatchSizeBytes": 524288000,
      "migrationDocLinks": Object {
        "clusterShardLimitExceeded": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#cluster-shard-limit-exceeded",
        "repeatedTimeoutRequests": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#_repeated_time_out_requests_that_eventually_fail",

@@ -126,7 +126,9 @@ export const createInitialState = ({
    retryDelay: 0,
    retryAttempts: migrationsConfig.retryAttempts,
    batchSize: migrationsConfig.batchSize,
    maxBatchSize: migrationsConfig.batchSize,
    maxBatchSizeBytes: migrationsConfig.maxBatchSizeBytes.getValueInBytes(),
    maxReadBatchSizeBytes: migrationsConfig.maxReadBatchSizeBytes.getValueInBytes(),
    discardUnknownObjects: migrationsConfig.discardUnknownObjects === kibanaVersion,
    discardCorruptObjects: migrationsConfig.discardCorruptObjects === kibanaVersion,
    logs: [],
@ -243,9 +243,9 @@ describe('KibanaMigrator', () => {
|
|||
const migrator = new KibanaMigrator(options);
|
||||
migrator.prepareMigrations();
|
||||
await expect(migrator.runMigrations()).rejects.toMatchInlineSnapshot(`
|
||||
[Error: Unable to complete saved object migrations for the [.my-index] index. Error: Reindex failed with the following error:
|
||||
{"_tag":"Some","value":{"type":"elasticsearch_exception","reason":"task failed with an error"}}]
|
||||
`);
|
||||
[Error: Unable to complete saved object migrations for the [.my-index] index. Error: Reindex failed with the following error:
|
||||
{"_tag":"Some","value":{"type":"elasticsearch_exception","reason":"task failed with an error"}}]
|
||||
`);
|
||||
expect(loggingSystemMock.collect(options.logger).error[0][0]).toMatchInlineSnapshot(`
|
||||
[Error: Reindex failed with the following error:
|
||||
{"_tag":"Some","value":{"type":"elasticsearch_exception","reason":"task failed with an error"}}]
|
||||
|
@ -533,6 +533,7 @@ const mockOptions = () => {
|
|||
algorithm: 'v2',
|
||||
batchSize: 20,
|
||||
maxBatchSizeBytes: ByteSizeValue.parse('20mb'),
|
||||
maxReadBatchSizeBytes: new ByteSizeValue(536870888),
|
||||
pollInterval: 20000,
|
||||
scrollDuration: '10m',
|
||||
skip: false,
|
||||
|
|
|
@ -51,6 +51,7 @@ describe('migrationsStateActionMachine', () => {
|
|||
algorithm: 'v2',
|
||||
batchSize: 1000,
|
||||
maxBatchSizeBytes: new ByteSizeValue(1e8),
|
||||
maxReadBatchSizeBytes: new ByteSizeValue(536870888),
|
||||
pollInterval: 0,
|
||||
scrollDuration: '0s',
|
||||
skip: false,
|
||||
|
|
|
@@ -17,6 +17,7 @@ import type { SavedObjectsRawDoc } from '@kbn/core-saved-objects-server';
import type { IndexMapping } from '@kbn/core-saved-objects-base-server-internal';
import type { AliasAction, FetchIndexResponse } from '../actions';
import type { BulkIndexOperationTuple } from './create_batches';
import { OutdatedDocumentsSearchRead, ReindexSourceToTempRead } from '../state';

/** @internal */
export type Aliases = Partial<Record<string, string>>;
@@ -285,3 +286,11 @@ export function getMigrationType({
 */
export const getTempIndexName = (indexPrefix: string, kibanaVersion: string): string =>
  `${indexPrefix}_${kibanaVersion}_reindex_temp`;

/** Increase batchSize by 20% until a maximum of maxBatchSize */
export const increaseBatchSize = (
  stateP: OutdatedDocumentsSearchRead | ReindexSourceToTempRead
) => {
  const increasedBatchSize = Math.floor(stateP.batchSize * 1.2);
  return increasedBatchSize > stateP.maxBatchSize ? stateP.maxBatchSize : increasedBatchSize;
};
@ -86,7 +86,9 @@ describe('migrations v2 model', () => {
|
|||
retryDelay: 0,
|
||||
retryAttempts: 15,
|
||||
batchSize: 1000,
|
||||
maxBatchSize: 1000,
|
||||
maxBatchSizeBytes: 1e8,
|
||||
maxReadBatchSizeBytes: 1234,
|
||||
discardUnknownObjects: false,
|
||||
discardCorruptObjects: false,
|
||||
indexPrefix: '.kibana',
|
||||
|
@ -1832,6 +1834,8 @@ describe('migrations v2 model', () => {
|
|||
expect(newState.lastHitSortValue).toBe(lastHitSortValue);
|
||||
expect(newState.progress.processed).toBe(undefined);
|
||||
expect(newState.progress.total).toBe(1);
|
||||
expect(newState.maxBatchSize).toBe(1000);
|
||||
expect(newState.batchSize).toBe(1000); // don't increase batchsize above default
|
||||
expect(newState.logs).toMatchInlineSnapshot(`
|
||||
Array [
|
||||
Object {
|
||||
|
@ -1842,6 +1846,83 @@ describe('migrations v2 model', () => {
|
|||
`);
|
||||
});
|
||||
|
||||
it('REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_TRANSFORM increases batchSize if < maxBatchSize', () => {
|
||||
const outdatedDocuments = [{ _id: '1', _source: { type: 'vis' } }];
|
||||
const lastHitSortValue = [123456];
|
||||
const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_READ'> = Either.right({
|
||||
outdatedDocuments,
|
||||
lastHitSortValue,
|
||||
totalHits: 1,
|
||||
processedDocs: 1,
|
||||
});
|
||||
let newState = model({ ...state, batchSize: 500 }, res) as ReindexSourceToTempTransform;
|
||||
expect(newState.batchSize).toBe(600);
|
||||
newState = model({ ...state, batchSize: 600 }, res) as ReindexSourceToTempTransform;
|
||||
expect(newState.batchSize).toBe(720);
|
||||
newState = model({ ...state, batchSize: 720 }, res) as ReindexSourceToTempTransform;
|
||||
expect(newState.batchSize).toBe(864);
|
||||
newState = model({ ...state, batchSize: 864 }, res) as ReindexSourceToTempTransform;
|
||||
expect(newState.batchSize).toBe(1000); // + 20% would have been 1036
|
||||
expect(newState.controlState).toBe('REINDEX_SOURCE_TO_TEMP_TRANSFORM');
|
||||
expect(newState.maxBatchSize).toBe(1000);
|
||||
});
|
||||
|
||||
it('REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_READ if left es_response_too_large', () => {
|
||||
const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_READ'> = Either.left({
|
||||
type: 'es_response_too_large',
|
||||
contentLength: 4567,
|
||||
});
|
||||
const newState = model(state, res) as ReindexSourceToTempRead;
|
||||
expect(newState.controlState).toBe('REINDEX_SOURCE_TO_TEMP_READ');
|
||||
expect(newState.lastHitSortValue).toBe(undefined); // lastHitSortValue should not be set
|
||||
expect(newState.progress.processed).toBe(undefined); // don't increment progress
|
||||
expect(newState.batchSize).toBe(500); // halves the batch size
|
||||
expect(newState.maxBatchSize).toBe(1000); // leaves maxBatchSize unchanged
|
||||
expect(newState.logs).toMatchInlineSnapshot(`
|
||||
Array [
|
||||
Object {
|
||||
"level": "warning",
|
||||
"message": "Read a batch with a response content length of 4567 bytes which exceeds migrations.maxReadBatchSizeBytes, retrying by reducing the batch size in half to 500.",
|
||||
},
|
||||
]
|
||||
`);
|
||||
});
|
||||
|
||||
it('REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_READ if left es_response_too_large will not reduce batch size below 1', () => {
|
||||
const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_READ'> = Either.left({
|
||||
type: 'es_response_too_large',
|
||||
contentLength: 2345,
|
||||
});
|
||||
const newState = model({ ...state, batchSize: 1.5 }, res) as ReindexSourceToTempRead;
|
||||
expect(newState.controlState).toBe('REINDEX_SOURCE_TO_TEMP_READ');
|
||||
expect(newState.lastHitSortValue).toBe(undefined); // lastHitSortValue should not be set
|
||||
expect(newState.progress.processed).toBe(undefined); // don't increment progress
|
||||
expect(newState.batchSize).toBe(1); // don't halve the batch size or go below 1
|
||||
expect(newState.maxBatchSize).toBe(1000); // leaves maxBatchSize unchanged
|
||||
expect(newState.logs).toMatchInlineSnapshot(`
|
||||
Array [
|
||||
Object {
|
||||
"level": "warning",
|
||||
"message": "Read a batch with a response content length of 2345 bytes which exceeds migrations.maxReadBatchSizeBytes, retrying by reducing the batch size in half to 1.",
|
||||
},
|
||||
]
|
||||
`);
|
||||
});
|
||||
|
||||
it('REINDEX_SOURCE_TO_TEMP_READ -> FATAL if left es_response_too_large and batchSize already 1', () => {
|
||||
const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_READ'> = Either.left({
|
||||
type: 'es_response_too_large',
|
||||
contentLength: 2345,
|
||||
});
|
||||
const newState = model({ ...state, batchSize: 1 }, res) as FatalState;
|
||||
expect(newState.controlState).toBe('FATAL');
|
||||
expect(newState.batchSize).toBe(1); // don't halve the batch size or go below 1
|
||||
expect(newState.maxBatchSize).toBe(1000); // leaves maxBatchSize unchanged
|
||||
expect(newState.reason).toMatchInlineSnapshot(
|
||||
`"After reducing the read batch size to a single document, the Elasticsearch response content length was 2345bytes which still exceeded migrations.maxReadBatchSizeBytes. Increase migrations.maxReadBatchSizeBytes and try again."`
|
||||
);
|
||||
});
|
||||
|
||||
it('REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_CLOSE_PIT if no outdated documents to reindex', () => {
|
||||
const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_READ'> = Either.right({
|
||||
outdatedDocuments: [],
|
||||
|
@ -2304,6 +2385,8 @@ describe('migrations v2 model', () => {
|
|||
expect(newState.lastHitSortValue).toBe(lastHitSortValue);
|
||||
expect(newState.progress.processed).toBe(undefined);
|
||||
expect(newState.progress.total).toBe(10);
|
||||
expect(newState.maxBatchSize).toBe(1000);
|
||||
expect(newState.batchSize).toBe(1000); // don't increase batchsize above default
|
||||
expect(newState.logs).toMatchInlineSnapshot(`
|
||||
Array [
|
||||
Object {
|
||||
|
@ -2345,6 +2428,83 @@ describe('migrations v2 model', () => {
|
|||
`);
|
||||
});
|
||||
|
||||
it('OUTDATED_DOCUMENTS_SEARCH_READ -> OUTDATED_DOCUMENTS_TRANSFORM increases batchSize up to maxBatchSize', () => {
|
||||
const outdatedDocuments = [{ _id: '1', _source: { type: 'vis' } }];
|
||||
const lastHitSortValue = [123456];
|
||||
const res: ResponseType<'OUTDATED_DOCUMENTS_SEARCH_READ'> = Either.right({
|
||||
outdatedDocuments,
|
||||
lastHitSortValue,
|
||||
totalHits: 1,
|
||||
processedDocs: [],
|
||||
});
|
||||
let newState = model({ ...state, batchSize: 500 }, res) as ReindexSourceToTempTransform;
|
||||
expect(newState.batchSize).toBe(600);
|
||||
newState = model({ ...state, batchSize: 600 }, res) as ReindexSourceToTempTransform;
|
||||
expect(newState.batchSize).toBe(720);
|
||||
newState = model({ ...state, batchSize: 720 }, res) as ReindexSourceToTempTransform;
|
||||
expect(newState.batchSize).toBe(864);
|
||||
newState = model({ ...state, batchSize: 864 }, res) as ReindexSourceToTempTransform;
|
||||
expect(newState.batchSize).toBe(1000); // + 20% would have been 1036
|
||||
expect(newState.controlState).toBe('OUTDATED_DOCUMENTS_TRANSFORM');
|
||||
expect(newState.maxBatchSize).toBe(1000);
|
||||
});
|
||||
|
||||
it('OUTDATED_DOCUMENTS_SEARCH_READ -> OUTDATED_DOCUMENTS_SEARCH_READ if left es_response_too_large', () => {
|
||||
const res: ResponseType<'OUTDATED_DOCUMENTS_SEARCH_READ'> = Either.left({
|
||||
type: 'es_response_too_large',
|
||||
contentLength: 3456,
|
||||
});
|
||||
const newState = model(state, res) as ReindexSourceToTempRead;
|
||||
expect(newState.controlState).toBe('OUTDATED_DOCUMENTS_SEARCH_READ');
|
||||
expect(newState.lastHitSortValue).toBe(undefined); // lastHitSortValue should not be set
|
||||
expect(newState.progress.processed).toBe(undefined); // don't increment progress
|
||||
expect(newState.batchSize).toBe(500); // halves the batch size
|
||||
expect(newState.maxBatchSize).toBe(1000); // leaves maxBatchSize unchanged
|
||||
expect(newState.logs).toMatchInlineSnapshot(`
|
||||
Array [
|
||||
Object {
|
||||
"level": "warning",
|
||||
"message": "Read a batch with a response content length of 3456 bytes which exceeds migrations.maxReadBatchSizeBytes, retrying by reducing the batch size in half to 500.",
|
||||
},
|
||||
]
|
||||
`);
|
||||
});
|
||||
|
||||
it('OUTDATED_DOCUMENTS_SEARCH_READ -> OUTDATED_DOCUMENTS_SEARCH_READ if left es_response_too_large will not reduce batch size below 1', () => {
|
||||
const res: ResponseType<'OUTDATED_DOCUMENTS_SEARCH_READ'> = Either.left({
|
||||
type: 'es_response_too_large',
|
||||
contentLength: 2345,
|
||||
});
|
||||
const newState = model({ ...state, batchSize: 1.5 }, res) as ReindexSourceToTempRead;
|
||||
expect(newState.controlState).toBe('OUTDATED_DOCUMENTS_SEARCH_READ');
|
||||
expect(newState.lastHitSortValue).toBe(undefined); // lastHitSortValue should not be set
|
||||
expect(newState.progress.processed).toBe(undefined); // don't increment progress
|
||||
expect(newState.batchSize).toBe(1); // don't halve the batch size or go below 1
|
||||
expect(newState.maxBatchSize).toBe(1000); // leaves maxBatchSize unchanged
|
||||
expect(newState.logs).toMatchInlineSnapshot(`
|
||||
Array [
|
||||
Object {
|
||||
"level": "warning",
|
||||
"message": "Read a batch with a response content length of 2345 bytes which exceeds migrations.maxReadBatchSizeBytes, retrying by reducing the batch size in half to 1.",
|
||||
},
|
||||
]
|
||||
`);
|
||||
});
|
||||
|
||||
it('OUTDATED_DOCUMENTS_SEARCH_READ -> FATAL if left es_response_too_large and batchSize already 1', () => {
|
||||
const res: ResponseType<'OUTDATED_DOCUMENTS_SEARCH_READ'> = Either.left({
|
||||
type: 'es_response_too_large',
|
||||
contentLength: 2345,
|
||||
});
|
||||
const newState = model({ ...state, batchSize: 1 }, res) as FatalState;
|
||||
expect(newState.controlState).toBe('FATAL');
|
||||
expect(newState.batchSize).toBe(1); // don't halve the batch size or go below 1
|
||||
expect(newState.maxBatchSize).toBe(1000); // leaves maxBatchSize unchanged
|
||||
expect(newState.reason).toMatchInlineSnapshot(
|
||||
`"After reducing the read batch size to a single document, the response content length was 2345 bytes which still exceeded migrations.maxReadBatchSizeBytes. Increase migrations.maxReadBatchSizeBytes and try again."`
|
||||
);
|
||||
});
|
||||
|
||||
it('OUTDATED_DOCUMENTS_SEARCH_READ -> OUTDATED_DOCUMENTS_SEARCH_CLOSE_PIT if no outdated documents to transform', () => {
|
||||
const res: ResponseType<'OUTDATED_DOCUMENTS_SEARCH_READ'> = Either.right({
|
||||
outdatedDocuments: [],
|
||||
|
|
|
@ -43,6 +43,7 @@ import {
|
|||
versionMigrationCompleted,
|
||||
buildRemoveAliasActions,
|
||||
MigrationType,
|
||||
increaseBatchSize,
|
||||
} from './helpers';
|
||||
import { buildTempIndexMap, createBatches } from './create_batches';
|
||||
import type { MigrationLog } from '../types';
|
||||
|
@ -833,6 +834,8 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
|
|||
lastHitSortValue: res.right.lastHitSortValue,
|
||||
progress,
|
||||
logs,
|
||||
// We succeeded in reading this batch, so increase the batch size for the next request.
|
||||
batchSize: increaseBatchSize(stateP),
|
||||
};
|
||||
} else {
|
||||
// we don't have any more outdated documents and need to either fail or move on to updating the target mappings.
|
||||
|
@@ -875,7 +878,32 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
        };
      }
    } else {
      throwBadResponse(stateP, res);
      const left = res.left;
      if (isTypeof(left, 'es_response_too_large')) {
        if (stateP.batchSize === 1) {
          return {
            ...stateP,
            controlState: 'FATAL',
            reason: `After reducing the read batch size to a single document, the Elasticsearch response content length was ${left.contentLength}bytes which still exceeded migrations.maxReadBatchSizeBytes. Increase migrations.maxReadBatchSizeBytes and try again.`,
          };
        } else {
          const batchSize = Math.max(Math.floor(stateP.batchSize / 2), 1);
          return {
            ...stateP,
            batchSize,
            controlState: 'REINDEX_SOURCE_TO_TEMP_READ',
            logs: [
              ...stateP.logs,
              {
                level: 'warning',
                message: `Read a batch with a response content length of ${left.contentLength} bytes which exceeds migrations.maxReadBatchSizeBytes, retrying by reducing the batch size in half to ${batchSize}.`,
              },
            ],
          };
        }
      } else {
        throwBadResponse(stateP, left);
      }
    }
  } else if (stateP.controlState === 'REINDEX_SOURCE_TO_TEMP_CLOSE_PIT') {
    const res = resW as ExcludeRetryableEsError<ResponseType<typeof stateP.controlState>>;
@ -1139,6 +1167,8 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
|
|||
lastHitSortValue: res.right.lastHitSortValue,
|
||||
progress,
|
||||
logs,
|
||||
// We succeeded in reading this batch, so increase the batch size for the next request.
|
||||
batchSize: increaseBatchSize(stateP),
|
||||
};
|
||||
} else {
|
||||
// we don't have any more outdated documents and need to either fail or move on to updating the target mappings.
|
||||
|
@@ -1179,7 +1209,32 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
        };
      }
    } else {
      throwBadResponse(stateP, res);
      const left = res.left;
      if (isTypeof(left, 'es_response_too_large')) {
        if (stateP.batchSize === 1) {
          return {
            ...stateP,
            controlState: 'FATAL',
            reason: `After reducing the read batch size to a single document, the response content length was ${left.contentLength} bytes which still exceeded migrations.maxReadBatchSizeBytes. Increase migrations.maxReadBatchSizeBytes and try again.`,
          };
        } else {
          const batchSize = Math.max(Math.floor(stateP.batchSize / 2), 1);
          return {
            ...stateP,
            batchSize,
            controlState: 'OUTDATED_DOCUMENTS_SEARCH_READ',
            logs: [
              ...stateP.logs,
              {
                level: 'warning',
                message: `Read a batch with a response content length of ${left.contentLength} bytes which exceeds migrations.maxReadBatchSizeBytes, retrying by reducing the batch size in half to ${batchSize}.`,
              },
            ],
          };
        }
      } else {
        throwBadResponse(stateP, left);
      }
    }
  } else if (stateP.controlState === 'OUTDATED_DOCUMENTS_TRANSFORM') {
    const res = resW as ExcludeRetryableEsError<ResponseType<typeof stateP.controlState>>;
@@ -219,6 +219,7 @@ export const nextActionMap = (
      query: state.outdatedDocumentsQuery,
      batchSize: state.batchSize,
      searchAfter: state.lastHitSortValue,
      maxResponseSizeBytes: state.maxReadBatchSizeBytes,
    }),
  OUTDATED_DOCUMENTS_SEARCH_CLOSE_PIT: (state: OutdatedDocumentsSearchClosePit) =>
    Actions.closePit({ client, pitId: state.pitId }),
@@ -63,7 +63,6 @@ export interface BaseState extends ControlState {
   * max_retry_time = 11.7 minutes
   */
  readonly retryAttempts: number;

  /**
   * The number of documents to process in each batch. This determines the
   * maximum number of documents that will be read and written in a single
@@ -83,6 +82,12 @@ export interface BaseState extends ControlState {
   * When writing batches, we limit the number of documents in a batch
   * (batchSize) as well as the size of the batch in bytes (maxBatchSizeBytes).
   */
  readonly maxBatchSize: number;
  /**
   * The number of documents to process in each batch. Under most circumstances
   * batchSize == maxBatchSize. But if we fail to read a batch because of a
   * Nodejs `RangeError` we'll temporarily half `batchSize` and retry.
   */
  readonly batchSize: number;
  /**
   * When writing batches, limits the batch size in bytes to ensure that we
@@ -90,6 +95,12 @@ export interface BaseState extends ControlState {
   * http.max_content_length which defaults to 100mb.
   */
  readonly maxBatchSizeBytes: number;
  /**
   * If a read batch exceeds this limit we half the batchSize and retry. By
   * not JSON.parsing and transforming large batches we can avoid RangeErrors
   * or Kibana OOMing.
   */
  readonly maxReadBatchSizeBytes: number;
  readonly logs: MigrationLog[];
  /**
   * If saved objects exist which have an unknown type they will cause
@@ -31,6 +31,7 @@ export const createMigrationConfigMock = (
  algorithm: 'zdt',
  batchSize: 1000,
  maxBatchSizeBytes: new ByteSizeValue(1e8),
  maxReadBatchSizeBytes: new ByteSizeValue(1e6),
  pollInterval: 0,
  scrollDuration: '0s',
  skip: false,
@ -22,6 +22,7 @@ import {
|
|||
type OpenPitResponse,
|
||||
reindex,
|
||||
readWithPit,
|
||||
type EsResponseTooLargeError,
|
||||
type ReadWithPit,
|
||||
setWriteBlock,
|
||||
updateAliases,
|
||||
|
@ -87,6 +88,7 @@ describe('migration actions', () => {
|
|||
{ _source: { title: 'doc 3' } },
|
||||
{ _source: { title: 'saved object 4', type: 'another_unused_type' } },
|
||||
{ _source: { title: 'f-agent-event 5', type: 'f_agent_event' } },
|
||||
{ _source: { title: new Array(1000).fill('a').join(), type: 'large' } }, // "large" saved object
|
||||
] as unknown as SavedObjectsRawDoc[];
|
||||
await bulkOverwriteTransformedDocuments({
|
||||
client,
|
||||
|
@ -727,6 +729,7 @@ describe('migration actions', () => {
|
|||
expect((results.hits?.hits as SavedObjectsRawDoc[]).map((doc) => doc._source.title).sort())
|
||||
.toMatchInlineSnapshot(`
|
||||
Array [
|
||||
"a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a",
|
||||
"doc 1",
|
||||
"doc 2",
|
||||
"doc 3",
|
||||
|
@ -763,6 +766,7 @@ describe('migration actions', () => {
|
|||
expect((results.hits?.hits as SavedObjectsRawDoc[]).map((doc) => doc._source.title).sort())
|
||||
.toMatchInlineSnapshot(`
|
||||
Array [
|
||||
"a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a",
|
||||
"doc 1",
|
||||
"doc 2",
|
||||
"doc 3",
|
||||
|
@ -792,6 +796,7 @@ describe('migration actions', () => {
|
|||
expect((results.hits?.hits as SavedObjectsRawDoc[]).map((doc) => doc._source.title).sort())
|
||||
.toMatchInlineSnapshot(`
|
||||
Array [
|
||||
"a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a_updated",
|
||||
"doc 1_updated",
|
||||
"doc 2_updated",
|
||||
"doc 3_updated",
|
||||
|
@ -843,6 +848,7 @@ describe('migration actions', () => {
|
|||
expect((results.hits?.hits as SavedObjectsRawDoc[]).map((doc) => doc._source.title).sort())
|
||||
.toMatchInlineSnapshot(`
|
||||
Array [
|
||||
"a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a_updated",
|
||||
"doc 1_updated",
|
||||
"doc 2_updated",
|
||||
"doc 3_updated",
|
||||
|
@ -893,6 +899,7 @@ describe('migration actions', () => {
|
|||
expect((results.hits?.hits as SavedObjectsRawDoc[]).map((doc) => doc._source.title).sort())
|
||||
.toMatchInlineSnapshot(`
|
||||
Array [
|
||||
"a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a_updated",
|
||||
"doc 1",
|
||||
"doc 2",
|
||||
"doc 3_updated",
|
||||
|
@ -1119,7 +1126,7 @@ describe('migration actions', () => {
|
|||
});
|
||||
const docsResponse = (await readWithPitTask()) as Either.Right<ReadWithPit>;
|
||||
|
||||
await expect(docsResponse.right.outdatedDocuments.length).toBe(5);
|
||||
await expect(docsResponse.right.outdatedDocuments.length).toBe(6);
|
||||
});
|
||||
|
||||
it('requests the batchSize of documents from an index', async () => {
|
||||
|
@ -1170,6 +1177,7 @@ describe('migration actions', () => {
|
|||
expect(docsResponse.right.outdatedDocuments.map((doc) => doc._source.title).sort())
|
||||
.toMatchInlineSnapshot(`
|
||||
Array [
|
||||
"a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a",
|
||||
"doc 1",
|
||||
"doc 2",
|
||||
"doc 3",
|
||||
|
@ -1256,6 +1264,36 @@ describe('migration actions', () => {
|
|||
);
|
||||
});
|
||||
|
||||
it('returns a left es_response_too_large error when a read batch exceeds the maxResponseSize', async () => {
|
||||
const openPitTask = openPit({ client, index: 'existing_index_with_docs' });
|
||||
const pitResponse = (await openPitTask()) as Either.Right<OpenPitResponse>;
|
||||
|
||||
let readWithPitTask = readWithPit({
|
||||
client,
|
||||
pitId: pitResponse.right.pitId,
|
||||
query: { match_all: {} },
|
||||
batchSize: 1, // small batch size so we don't exceed the maxResponseSize
|
||||
searchAfter: undefined,
|
||||
maxResponseSizeBytes: 500, // set a small size to force the error
|
||||
});
|
||||
const rightResponse = (await readWithPitTask()) as Either.Right<ReadWithPit>;
|
||||
|
||||
await expect(Either.isRight(rightResponse)).toBe(true);
|
||||
|
||||
readWithPitTask = readWithPit({
|
||||
client,
|
||||
pitId: pitResponse.right.pitId,
|
||||
query: { match_all: {} },
|
||||
batchSize: 10, // a bigger batch will exceed the maxResponseSize
|
||||
searchAfter: undefined,
|
||||
maxResponseSizeBytes: 500, // set a small size to force the error
|
||||
});
|
||||
const leftResponse = (await readWithPitTask()) as Either.Left<EsResponseTooLargeError>;
|
||||
|
||||
expect(leftResponse.left.type).toBe('es_response_too_large');
|
||||
expect(leftResponse.left.contentLength).toBe(3184);
|
||||
});
|
||||
|
||||
it('rejects if PIT does not exist', async () => {
|
||||
const readWithPitTask = readWithPit({
|
||||
client,
|
||||
|
|
|
@ -60,6 +60,7 @@ describe('split .kibana index into multiple system indices', () => {
|
|||
beforeAll(async () => {
|
||||
esServer = await startElasticsearch({
|
||||
dataArchive: Path.join(__dirname, '..', 'archives', '7.3.0_xpack_sample_saved_objects.zip'),
|
||||
timeout: 60000,
|
||||
});
|
||||
});
|
||||
|
||||
|
|
|
@ -0,0 +1,97 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||
* Side Public License, v 1.
|
||||
*/
|
||||
|
||||
import Path from 'path';
|
||||
import fs from 'fs/promises';
|
||||
import { Root } from '@kbn/core-root-server-internal';
|
||||
import {
|
||||
createRootWithCorePlugins,
|
||||
type TestElasticsearchUtils,
|
||||
} from '@kbn/core-test-helpers-kbn-server';
|
||||
import { delay } from '../test_utils';
|
||||
import { startElasticsearch } from '../kibana_migrator_test_kit';
|
||||
|
||||
const logFilePath = Path.join(__dirname, 'read_batch_size.log');
|
||||
|
||||
describe('migration v2 - read batch size', () => {
|
||||
let esServer: TestElasticsearchUtils;
|
||||
let root: Root;
|
||||
let logs: string;
|
||||
|
||||
beforeEach(async () => {
|
||||
esServer = await startElasticsearch({
|
||||
dataArchive: Path.join(__dirname, '..', 'archives', '8.4.0_with_sample_data_logs.zip'),
|
||||
});
|
||||
await fs.unlink(logFilePath).catch(() => {});
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await root?.shutdown();
|
||||
await esServer?.stop();
|
||||
await delay(10);
|
||||
});
|
||||
|
||||
it('reduces the read batchSize in half if a batch exceeds maxReadBatchSizeBytes', async () => {
|
||||
root = createRoot({ maxReadBatchSizeBytes: 15000 });
|
||||
await root.preboot();
|
||||
await root.setup();
|
||||
await root.start();
|
||||
|
||||
// Check for migration steps present in the logs
|
||||
logs = await fs.readFile(logFilePath, 'utf-8');
|
||||
|
||||
expect(logs).toMatch(
|
||||
/Read a batch with a response content length of \d+ bytes which exceeds migrations\.maxReadBatchSizeBytes, retrying by reducing the batch size in half to 15/
|
||||
);
|
||||
expect(logs).toMatch('[.kibana] Migration completed');
|
||||
});
|
||||
|
||||
it('does not reduce the read batchSize in half if no batches exceeded maxReadBatchSizeBytes', async () => {
|
||||
root = createRoot({ maxReadBatchSizeBytes: 50000 });
|
||||
await root.preboot();
|
||||
await root.setup();
|
||||
await root.start();
|
||||
|
||||
// Check for migration steps present in the logs
|
||||
logs = await fs.readFile(logFilePath, 'utf-8');
|
||||
|
||||
expect(logs).not.toMatch('retrying by reducing the batch size in half to');
|
||||
expect(logs).toMatch('[.kibana] Migration completed');
|
||||
});
|
||||
});
|
||||
|
||||
function createRoot({ maxReadBatchSizeBytes }: { maxReadBatchSizeBytes?: number }) {
|
||||
return createRootWithCorePlugins(
|
||||
{
|
||||
migrations: {
|
||||
maxReadBatchSizeBytes,
|
||||
},
|
||||
logging: {
|
||||
appenders: {
|
||||
file: {
|
||||
type: 'file',
|
||||
fileName: logFilePath,
|
||||
layout: {
|
||||
type: 'json',
|
||||
},
|
||||
},
|
||||
},
|
||||
loggers: [
|
||||
{
|
||||
name: 'root',
|
||||
level: 'info',
|
||||
appenders: ['file'],
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
{
|
||||
oss: false,
|
||||
}
|
||||
);
|
||||
}
|
|
@@ -87,12 +87,14 @@ export interface KibanaMigratorTestKit {
export const startElasticsearch = async ({
  basePath,
  dataArchive,
  timeout,
}: {
  basePath?: string;
  dataArchive?: string;
  timeout?: number;
} = {}) => {
  const { startES } = createTestServers({
    adjustTimeout: (t: number) => jest.setTimeout(t),
    adjustTimeout: (t: number) => jest.setTimeout(t + (timeout ?? 0)),
    settings: {
      es: {
        license: 'basic',