mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 01:38:56 -04:00
# Backport This will backport the following commits from `main` to `8.8`: - [Migrations: dynamically adjust batchSize when reading (#157494)](https://github.com/elastic/kibana/pull/157494) <!--- Backport version: 8.9.7 --> ### Questions ? Please refer to the [Backport tool documentation](https://github.com/sqren/backport) <!--BACKPORT [{"author":{"name":"Rudolf Meijering","email":"skaapgif@gmail.com"},"sourceCommit":{"committedDate":"2023-05-30T13:25:07Z","message":"Migrations: dynamically adjust batchSize when reading (#157494)\n\n## Summary\r\n\r\nMigrations read 1000 documents by default which works well for most\r\ndeployments. But if any batch happens to be > ~512MB we hit NodeJS' max\r\nstring length limit and cannot process that batch. This forces users to\r\nreduce the batch size to a smaller number which could severely slow down\r\nmigrations.\r\n\r\nThis PR reduces the impact of large batches by catching\r\nelasticsearch-js' `RequestAbortedError` and reducing the batch size in\r\nhalf. When subsequent batches are successful the batchSize increases by\r\n20%. This means we'll have a sequence like:\r\n\r\n1. Read 1000 docs ✅ (small batch)\r\n2. Read 1000 docs 🔴 (too large batch)\r\n3. Read 500 docs ✅ \r\n4. Read 600 docs ✅ \r\n5. Read 720 docs ✅\r\n6. Read 864 docs ✅\r\n7. Read 1000 docs ✅ (small batch)\r\n\r\nThis assumes that most clusters just have a few large batches exceeding\r\nthe limit. If all batches exceed the limit we'd have 1 failure for every\r\n4 successful reads so we pay a 20% throughput penalty. In such a case it\r\nwould be better to configure a lower `migrations.batchSize`.\r\n\r\nTested this manually:\r\n1. Start ES with more heap than the default, otherwise reading large\r\nbatches will cause it to run out of memory\r\n`ES_JAVA_OPTS=' -Xms6g -Xmx6g' yarn es snapshot\r\n--data-archive=/Users/rudolf/dev/kibana/src/core/server/integration_tests/saved_objects/migrations/archives/8.4.0_with_sample_data_logs.zip`\r\n2. Ingest lots of large documents of ~5mb\r\n ```\r\ncurl -XPUT\r\n\"elastic:changeme@localhost:9200/_security/role/grant_kibana_system_indices\"\r\n-H \"kbn-xsrf: reporting\" -H \"Content-Type: application/json\" -d'\r\n { \r\n \"indices\": [ \r\n {\r\n \"names\": [\r\n \".kibana*\"\r\n ],\r\n \"privileges\": [\r\n \"all\"\r\n ],\r\n \"allow_restricted_indices\": true\r\n }\r\n ]\r\n }'\r\n\r\ncurl -XPOST \"elastic:changeme@localhost:9200/_security/user/superuser\"\r\n-H \"kbn-xsrf: reporting\" -H \"Content-Type: application/json\" -d'\r\n {\r\n \"password\" : \"changeme\", \r\n \"roles\" : [ \"superuser\", \"grant_kibana_system_indices\" ]\r\n }'\r\n\r\ncurl -XPUT\r\n\"superuser:changeme@localhost:9200/.kibana_8.4.0_001/_mappings\" -H\r\n\"kbn-xsrf: reporting\" -H \"Content-Type: application/json\" -d'\r\n {\r\n\"dynamic\": false,\r\n \"properties\": {\r\n\r\n }\r\n\r\n }'\r\n\r\n set -B # enable brace expansion\r\n for i in {1..400}; do\r\ncurl -k --data-binary\r\n\"@/Users/rudolf/dev/kibana/src/core/server/integration_tests/saved_objects/migrations/group3/body.json\"\r\n-X PUT\r\n\"http://superuser:changeme@localhost:9200/.kibana_8.4.0_001/_doc/cases-comments:\"{$i}\"?&pretty=true\"\r\n-H \"Content-Type: application/json\"\r\n done\r\n ```\r\n3. Start Kibana with a modest batchSize otherwise we could OOM ES `node\r\nscripts/kibana --dev --migrations.batchSize=120`\r\n\r\n\r\n\r\n<details><summary>Example logs. Note the \"Processed x documents\" only\r\nlogs when the next batch is successfull read, so the order seems wrong.\r\nTo improve it we'd need to log progress after a batch is successfully\r\nwritten instead 🤷 </summary>\r\n```\r\n[.kibana] Processed 120 documents out of 542.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_TRANSFORM. took: 3667ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_TRANSFORM -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1740ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1376ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1402ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1311ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1388ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_READ. took: 900ms.\r\n[.kibana] Read a batch that exceeded the NodeJS maximum string length, retrying by reducing the batch size in half to 60.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_READ. took: 1538ms.\r\n[.kibana] Processed 240 documents out of 542.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_TRANSFORM. took: 2054ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_TRANSFORM -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1042ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1310ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1388ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_READ. took: 1130ms.\r\n[.kibana] Processed 300 documents out of 542.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_TRANSFORM. took: 2610ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_TRANSFORM -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1262ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1299ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1363ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1341ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_READ. took: 572ms.\r\n[.kibana] Processed 372 documents out of 542.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_TRANSFORM. took: 3330ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_TRANSFORM -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1488ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1349ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1312ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1380ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1310ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_READ. took: 139ms.\r\n[.kibana] Processed 458 documents out of 542.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_TRANSFORM. took: 3278ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_TRANSFORM -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1460ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1370ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1303ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1384ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_READ. took: 1298ms.\r\n[.kibana] Processed 542 documents out of 542.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_CLOSE_PIT. took: 4ms.\r\n```\r\n</details>\r\n### Checklist\r\n\r\nDelete any items that are not applicable to this PR.\r\n\r\n- [ ] Any text added follows [EUI's writing\r\nguidelines](https://elastic.github.io/eui/#/guidelines/writing), uses\r\nsentence case text and includes [i18n\r\nsupport](https://github.com/elastic/kibana/blob/main/packages/kbn-i18n/README.md)\r\n- [ ]\r\n[Documentation](https://www.elastic.co/guide/en/kibana/master/development-documentation.html)\r\nwas added for features that require explanation or tutorials\r\n- [ ] [Unit or functional\r\ntests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)\r\nwere updated or added to match the most common scenarios\r\n- [ ] Any UI touched in this PR is usable by keyboard only (learn more\r\nabout [keyboard accessibility](https://webaim.org/techniques/keyboard/))\r\n- [ ] Any UI touched in this PR does not create any new axe failures\r\n(run axe in browser:\r\n[FF](https://addons.mozilla.org/en-US/firefox/addon/axe-devtools/),\r\n[Chrome](https://chrome.google.com/webstore/detail/axe-web-accessibility-tes/lhdoppojpmngadmnindnejefpokejbdd?hl=en-US))\r\n- [ ] If a plugin configuration key changed, check if it needs to be\r\nallowlisted in the cloud and added to the [docker\r\nlist](https://github.com/elastic/kibana/blob/main/src/dev/build/tasks/os_packages/docker_generator/resources/base/bin/kibana-docker)\r\n- [ ] This renders correctly on smaller devices using a responsive\r\nlayout. (You can test this [in your\r\nbrowser](https://www.browserstack.com/guide/responsive-testing-on-local-server))\r\n- [ ] This was checked for [cross-browser\r\ncompatibility](https://www.elastic.co/support/matrix#matrix_browsers)\r\n\r\n\r\n### Risks\r\n\r\n\r\n### For maintainers\r\n\r\n- [ ] This was checked for breaking API changes and was [labeled\r\nappropriately](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)\r\n\r\n---------\r\n\r\nCo-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>\r\nCo-authored-by: Gerard Soldevila <gerard.soldevila@elastic.co>","sha":"094b62a6d6afd30914584e03bb6616e7c2eaec4a","branchLabelMapping":{"^v8.9.0$":"main","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["bug","Team:Core","release_note:fix","Feature:Migrations","backport:prev-minor","v8.9.0","v8.8.1"],"number":157494,"url":"https://github.com/elastic/kibana/pull/157494","mergeCommit":{"message":"Migrations: dynamically adjust batchSize when reading (#157494)\n\n## Summary\r\n\r\nMigrations read 1000 documents by default which works well for most\r\ndeployments. But if any batch happens to be > ~512MB we hit NodeJS' max\r\nstring length limit and cannot process that batch. This forces users to\r\nreduce the batch size to a smaller number which could severely slow down\r\nmigrations.\r\n\r\nThis PR reduces the impact of large batches by catching\r\nelasticsearch-js' `RequestAbortedError` and reducing the batch size in\r\nhalf. When subsequent batches are successful the batchSize increases by\r\n20%. This means we'll have a sequence like:\r\n\r\n1. Read 1000 docs ✅ (small batch)\r\n2. Read 1000 docs 🔴 (too large batch)\r\n3. Read 500 docs ✅ \r\n4. Read 600 docs ✅ \r\n5. Read 720 docs ✅\r\n6. Read 864 docs ✅\r\n7. Read 1000 docs ✅ (small batch)\r\n\r\nThis assumes that most clusters just have a few large batches exceeding\r\nthe limit. If all batches exceed the limit we'd have 1 failure for every\r\n4 successful reads so we pay a 20% throughput penalty. In such a case it\r\nwould be better to configure a lower `migrations.batchSize`.\r\n\r\nTested this manually:\r\n1. Start ES with more heap than the default, otherwise reading large\r\nbatches will cause it to run out of memory\r\n`ES_JAVA_OPTS=' -Xms6g -Xmx6g' yarn es snapshot\r\n--data-archive=/Users/rudolf/dev/kibana/src/core/server/integration_tests/saved_objects/migrations/archives/8.4.0_with_sample_data_logs.zip`\r\n2. Ingest lots of large documents of ~5mb\r\n ```\r\ncurl -XPUT\r\n\"elastic:changeme@localhost:9200/_security/role/grant_kibana_system_indices\"\r\n-H \"kbn-xsrf: reporting\" -H \"Content-Type: application/json\" -d'\r\n { \r\n \"indices\": [ \r\n {\r\n \"names\": [\r\n \".kibana*\"\r\n ],\r\n \"privileges\": [\r\n \"all\"\r\n ],\r\n \"allow_restricted_indices\": true\r\n }\r\n ]\r\n }'\r\n\r\ncurl -XPOST \"elastic:changeme@localhost:9200/_security/user/superuser\"\r\n-H \"kbn-xsrf: reporting\" -H \"Content-Type: application/json\" -d'\r\n {\r\n \"password\" : \"changeme\", \r\n \"roles\" : [ \"superuser\", \"grant_kibana_system_indices\" ]\r\n }'\r\n\r\ncurl -XPUT\r\n\"superuser:changeme@localhost:9200/.kibana_8.4.0_001/_mappings\" -H\r\n\"kbn-xsrf: reporting\" -H \"Content-Type: application/json\" -d'\r\n {\r\n\"dynamic\": false,\r\n \"properties\": {\r\n\r\n }\r\n\r\n }'\r\n\r\n set -B # enable brace expansion\r\n for i in {1..400}; do\r\ncurl -k --data-binary\r\n\"@/Users/rudolf/dev/kibana/src/core/server/integration_tests/saved_objects/migrations/group3/body.json\"\r\n-X PUT\r\n\"http://superuser:changeme@localhost:9200/.kibana_8.4.0_001/_doc/cases-comments:\"{$i}\"?&pretty=true\"\r\n-H \"Content-Type: application/json\"\r\n done\r\n ```\r\n3. Start Kibana with a modest batchSize otherwise we could OOM ES `node\r\nscripts/kibana --dev --migrations.batchSize=120`\r\n\r\n\r\n\r\n<details><summary>Example logs. Note the \"Processed x documents\" only\r\nlogs when the next batch is successfull read, so the order seems wrong.\r\nTo improve it we'd need to log progress after a batch is successfully\r\nwritten instead 🤷 </summary>\r\n```\r\n[.kibana] Processed 120 documents out of 542.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_TRANSFORM. took: 3667ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_TRANSFORM -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1740ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1376ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1402ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1311ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1388ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_READ. took: 900ms.\r\n[.kibana] Read a batch that exceeded the NodeJS maximum string length, retrying by reducing the batch size in half to 60.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_READ. took: 1538ms.\r\n[.kibana] Processed 240 documents out of 542.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_TRANSFORM. took: 2054ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_TRANSFORM -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1042ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1310ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1388ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_READ. took: 1130ms.\r\n[.kibana] Processed 300 documents out of 542.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_TRANSFORM. took: 2610ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_TRANSFORM -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1262ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1299ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1363ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1341ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_READ. took: 572ms.\r\n[.kibana] Processed 372 documents out of 542.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_TRANSFORM. took: 3330ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_TRANSFORM -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1488ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1349ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1312ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1380ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1310ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_READ. took: 139ms.\r\n[.kibana] Processed 458 documents out of 542.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_TRANSFORM. took: 3278ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_TRANSFORM -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1460ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1370ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1303ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1384ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_READ. took: 1298ms.\r\n[.kibana] Processed 542 documents out of 542.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_CLOSE_PIT. took: 4ms.\r\n```\r\n</details>\r\n### Checklist\r\n\r\nDelete any items that are not applicable to this PR.\r\n\r\n- [ ] Any text added follows [EUI's writing\r\nguidelines](https://elastic.github.io/eui/#/guidelines/writing), uses\r\nsentence case text and includes [i18n\r\nsupport](https://github.com/elastic/kibana/blob/main/packages/kbn-i18n/README.md)\r\n- [ ]\r\n[Documentation](https://www.elastic.co/guide/en/kibana/master/development-documentation.html)\r\nwas added for features that require explanation or tutorials\r\n- [ ] [Unit or functional\r\ntests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)\r\nwere updated or added to match the most common scenarios\r\n- [ ] Any UI touched in this PR is usable by keyboard only (learn more\r\nabout [keyboard accessibility](https://webaim.org/techniques/keyboard/))\r\n- [ ] Any UI touched in this PR does not create any new axe failures\r\n(run axe in browser:\r\n[FF](https://addons.mozilla.org/en-US/firefox/addon/axe-devtools/),\r\n[Chrome](https://chrome.google.com/webstore/detail/axe-web-accessibility-tes/lhdoppojpmngadmnindnejefpokejbdd?hl=en-US))\r\n- [ ] If a plugin configuration key changed, check if it needs to be\r\nallowlisted in the cloud and added to the [docker\r\nlist](https://github.com/elastic/kibana/blob/main/src/dev/build/tasks/os_packages/docker_generator/resources/base/bin/kibana-docker)\r\n- [ ] This renders correctly on smaller devices using a responsive\r\nlayout. (You can test this [in your\r\nbrowser](https://www.browserstack.com/guide/responsive-testing-on-local-server))\r\n- [ ] This was checked for [cross-browser\r\ncompatibility](https://www.elastic.co/support/matrix#matrix_browsers)\r\n\r\n\r\n### Risks\r\n\r\n\r\n### For maintainers\r\n\r\n- [ ] This was checked for breaking API changes and was [labeled\r\nappropriately](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)\r\n\r\n---------\r\n\r\nCo-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>\r\nCo-authored-by: Gerard Soldevila <gerard.soldevila@elastic.co>","sha":"094b62a6d6afd30914584e03bb6616e7c2eaec4a"}},"sourceBranch":"main","suggestedTargetBranches":["8.8"],"targetPullRequestStates":[{"branch":"main","label":"v8.9.0","labelRegex":"^v8.9.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/157494","number":157494,"mergeCommit":{"message":"Migrations: dynamically adjust batchSize when reading (#157494)\n\n## Summary\r\n\r\nMigrations read 1000 documents by default which works well for most\r\ndeployments. But if any batch happens to be > ~512MB we hit NodeJS' max\r\nstring length limit and cannot process that batch. This forces users to\r\nreduce the batch size to a smaller number which could severely slow down\r\nmigrations.\r\n\r\nThis PR reduces the impact of large batches by catching\r\nelasticsearch-js' `RequestAbortedError` and reducing the batch size in\r\nhalf. When subsequent batches are successful the batchSize increases by\r\n20%. This means we'll have a sequence like:\r\n\r\n1. Read 1000 docs ✅ (small batch)\r\n2. Read 1000 docs 🔴 (too large batch)\r\n3. Read 500 docs ✅ \r\n4. Read 600 docs ✅ \r\n5. Read 720 docs ✅\r\n6. Read 864 docs ✅\r\n7. Read 1000 docs ✅ (small batch)\r\n\r\nThis assumes that most clusters just have a few large batches exceeding\r\nthe limit. If all batches exceed the limit we'd have 1 failure for every\r\n4 successful reads so we pay a 20% throughput penalty. In such a case it\r\nwould be better to configure a lower `migrations.batchSize`.\r\n\r\nTested this manually:\r\n1. Start ES with more heap than the default, otherwise reading large\r\nbatches will cause it to run out of memory\r\n`ES_JAVA_OPTS=' -Xms6g -Xmx6g' yarn es snapshot\r\n--data-archive=/Users/rudolf/dev/kibana/src/core/server/integration_tests/saved_objects/migrations/archives/8.4.0_with_sample_data_logs.zip`\r\n2. Ingest lots of large documents of ~5mb\r\n ```\r\ncurl -XPUT\r\n\"elastic:changeme@localhost:9200/_security/role/grant_kibana_system_indices\"\r\n-H \"kbn-xsrf: reporting\" -H \"Content-Type: application/json\" -d'\r\n { \r\n \"indices\": [ \r\n {\r\n \"names\": [\r\n \".kibana*\"\r\n ],\r\n \"privileges\": [\r\n \"all\"\r\n ],\r\n \"allow_restricted_indices\": true\r\n }\r\n ]\r\n }'\r\n\r\ncurl -XPOST \"elastic:changeme@localhost:9200/_security/user/superuser\"\r\n-H \"kbn-xsrf: reporting\" -H \"Content-Type: application/json\" -d'\r\n {\r\n \"password\" : \"changeme\", \r\n \"roles\" : [ \"superuser\", \"grant_kibana_system_indices\" ]\r\n }'\r\n\r\ncurl -XPUT\r\n\"superuser:changeme@localhost:9200/.kibana_8.4.0_001/_mappings\" -H\r\n\"kbn-xsrf: reporting\" -H \"Content-Type: application/json\" -d'\r\n {\r\n\"dynamic\": false,\r\n \"properties\": {\r\n\r\n }\r\n\r\n }'\r\n\r\n set -B # enable brace expansion\r\n for i in {1..400}; do\r\ncurl -k --data-binary\r\n\"@/Users/rudolf/dev/kibana/src/core/server/integration_tests/saved_objects/migrations/group3/body.json\"\r\n-X PUT\r\n\"http://superuser:changeme@localhost:9200/.kibana_8.4.0_001/_doc/cases-comments:\"{$i}\"?&pretty=true\"\r\n-H \"Content-Type: application/json\"\r\n done\r\n ```\r\n3. Start Kibana with a modest batchSize otherwise we could OOM ES `node\r\nscripts/kibana --dev --migrations.batchSize=120`\r\n\r\n\r\n\r\n<details><summary>Example logs. Note the \"Processed x documents\" only\r\nlogs when the next batch is successfull read, so the order seems wrong.\r\nTo improve it we'd need to log progress after a batch is successfully\r\nwritten instead 🤷 </summary>\r\n```\r\n[.kibana] Processed 120 documents out of 542.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_TRANSFORM. took: 3667ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_TRANSFORM -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1740ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1376ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1402ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1311ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1388ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_READ. took: 900ms.\r\n[.kibana] Read a batch that exceeded the NodeJS maximum string length, retrying by reducing the batch size in half to 60.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_READ. took: 1538ms.\r\n[.kibana] Processed 240 documents out of 542.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_TRANSFORM. took: 2054ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_TRANSFORM -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1042ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1310ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1388ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_READ. took: 1130ms.\r\n[.kibana] Processed 300 documents out of 542.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_TRANSFORM. took: 2610ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_TRANSFORM -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1262ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1299ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1363ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1341ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_READ. took: 572ms.\r\n[.kibana] Processed 372 documents out of 542.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_TRANSFORM. took: 3330ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_TRANSFORM -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1488ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1349ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1312ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1380ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1310ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_READ. took: 139ms.\r\n[.kibana] Processed 458 documents out of 542.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_TRANSFORM. took: 3278ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_TRANSFORM -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1460ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1370ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1303ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1384ms.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_READ. took: 1298ms.\r\n[.kibana] Processed 542 documents out of 542.\r\n[.kibana] REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_CLOSE_PIT. took: 4ms.\r\n```\r\n</details>\r\n### Checklist\r\n\r\nDelete any items that are not applicable to this PR.\r\n\r\n- [ ] Any text added follows [EUI's writing\r\nguidelines](https://elastic.github.io/eui/#/guidelines/writing), uses\r\nsentence case text and includes [i18n\r\nsupport](https://github.com/elastic/kibana/blob/main/packages/kbn-i18n/README.md)\r\n- [ ]\r\n[Documentation](https://www.elastic.co/guide/en/kibana/master/development-documentation.html)\r\nwas added for features that require explanation or tutorials\r\n- [ ] [Unit or functional\r\ntests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)\r\nwere updated or added to match the most common scenarios\r\n- [ ] Any UI touched in this PR is usable by keyboard only (learn more\r\nabout [keyboard accessibility](https://webaim.org/techniques/keyboard/))\r\n- [ ] Any UI touched in this PR does not create any new axe failures\r\n(run axe in browser:\r\n[FF](https://addons.mozilla.org/en-US/firefox/addon/axe-devtools/),\r\n[Chrome](https://chrome.google.com/webstore/detail/axe-web-accessibility-tes/lhdoppojpmngadmnindnejefpokejbdd?hl=en-US))\r\n- [ ] If a plugin configuration key changed, check if it needs to be\r\nallowlisted in the cloud and added to the [docker\r\nlist](https://github.com/elastic/kibana/blob/main/src/dev/build/tasks/os_packages/docker_generator/resources/base/bin/kibana-docker)\r\n- [ ] This renders correctly on smaller devices using a responsive\r\nlayout. (You can test this [in your\r\nbrowser](https://www.browserstack.com/guide/responsive-testing-on-local-server))\r\n- [ ] This was checked for [cross-browser\r\ncompatibility](https://www.elastic.co/support/matrix#matrix_browsers)\r\n\r\n\r\n### Risks\r\n\r\n\r\n### For maintainers\r\n\r\n- [ ] This was checked for breaking API changes and was [labeled\r\nappropriately](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)\r\n\r\n---------\r\n\r\nCo-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>\r\nCo-authored-by: Gerard Soldevila <gerard.soldevila@elastic.co>","sha":"094b62a6d6afd30914584e03bb6616e7c2eaec4a"}},{"branch":"8.8","label":"v8.8.1","labelRegex":"^v(\\d+).(\\d+).\\d+$","isSourceBranch":false,"state":"NOT_CREATED"}]}] BACKPORT--> --------- Co-authored-by: Rudolf Meijering <skaapgif@gmail.com>
This commit is contained in:
parent
a4a3562091
commit
c26fcf79cd
22 changed files with 511 additions and 46 deletions
|
@ -9,6 +9,7 @@
|
|||
import { valid } from 'semver';
|
||||
import { schema, TypeOf } from '@kbn/config-schema';
|
||||
import type { ServiceConfigDescriptor } from '@kbn/core-base-server-internal';
|
||||
import buffer from 'buffer';
|
||||
|
||||
const migrationSchema = schema.object({
|
||||
algorithm: schema.oneOf([schema.literal('v2'), schema.literal('zdt')], {
|
||||
|
@ -16,6 +17,10 @@ const migrationSchema = schema.object({
|
|||
}),
|
||||
batchSize: schema.number({ defaultValue: 1_000 }),
|
||||
maxBatchSizeBytes: schema.byteSize({ defaultValue: '100mb' }), // 100mb is the default http.max_content_length Elasticsearch config value
|
||||
maxReadBatchSizeBytes: schema.byteSize({
|
||||
defaultValue: buffer.constants.MAX_STRING_LENGTH,
|
||||
max: buffer.constants.MAX_STRING_LENGTH,
|
||||
}),
|
||||
discardUnknownObjects: schema.maybe(
|
||||
schema.string({
|
||||
validate: (value: string) =>
|
||||
|
|
|
@ -47,6 +47,7 @@ export type {
|
|||
ReindexResponse,
|
||||
UpdateByQueryResponse,
|
||||
UpdateAndPickupMappingsResponse,
|
||||
EsResponseTooLargeError,
|
||||
} from './src/actions';
|
||||
export {
|
||||
isClusterShardLimitExceeded,
|
||||
|
|
|
@ -166,7 +166,9 @@ Object {
|
|||
"message": "Log from LEGACY_REINDEX control state",
|
||||
},
|
||||
],
|
||||
"maxBatchSize": 1000,
|
||||
"maxBatchSizeBytes": 100000000,
|
||||
"maxReadBatchSizeBytes": 536870888,
|
||||
"migrationDocLinks": Object {
|
||||
"clusterShardLimitExceeded": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#cluster-shard-limit-exceeded",
|
||||
"repeatedTimeoutRequests": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#_repeated_time_out_requests_that_eventually_fail",
|
||||
|
@ -392,7 +394,9 @@ Object {
|
|||
"message": "Log from LEGACY_DELETE control state",
|
||||
},
|
||||
],
|
||||
"maxBatchSize": 1000,
|
||||
"maxBatchSizeBytes": 100000000,
|
||||
"maxReadBatchSizeBytes": 536870888,
|
||||
"migrationDocLinks": Object {
|
||||
"clusterShardLimitExceeded": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#cluster-shard-limit-exceeded",
|
||||
"repeatedTimeoutRequests": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#_repeated_time_out_requests_that_eventually_fail",
|
||||
|
@ -622,7 +626,9 @@ Object {
|
|||
"message": "Log from LEGACY_DELETE control state",
|
||||
},
|
||||
],
|
||||
"maxBatchSize": 1000,
|
||||
"maxBatchSizeBytes": 100000000,
|
||||
"maxReadBatchSizeBytes": 536870888,
|
||||
"migrationDocLinks": Object {
|
||||
"clusterShardLimitExceeded": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#cluster-shard-limit-exceeded",
|
||||
"repeatedTimeoutRequests": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#_repeated_time_out_requests_that_eventually_fail",
|
||||
|
@ -856,7 +862,9 @@ Object {
|
|||
"message": "Log from DONE control state",
|
||||
},
|
||||
],
|
||||
"maxBatchSize": 1000,
|
||||
"maxBatchSizeBytes": 100000000,
|
||||
"maxReadBatchSizeBytes": 536870888,
|
||||
"migrationDocLinks": Object {
|
||||
"clusterShardLimitExceeded": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#cluster-shard-limit-exceeded",
|
||||
"repeatedTimeoutRequests": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#_repeated_time_out_requests_that_eventually_fail",
|
||||
|
@ -1122,7 +1130,9 @@ Object {
|
|||
"message": "Log from LEGACY_DELETE control state",
|
||||
},
|
||||
],
|
||||
"maxBatchSize": 1000,
|
||||
"maxBatchSizeBytes": 100000000,
|
||||
"maxReadBatchSizeBytes": 536870888,
|
||||
"migrationDocLinks": Object {
|
||||
"clusterShardLimitExceeded": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#cluster-shard-limit-exceeded",
|
||||
"repeatedTimeoutRequests": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#_repeated_time_out_requests_that_eventually_fail",
|
||||
|
@ -1359,7 +1369,9 @@ Object {
|
|||
"message": "Log from FATAL control state",
|
||||
},
|
||||
],
|
||||
"maxBatchSize": 1000,
|
||||
"maxBatchSizeBytes": 100000000,
|
||||
"maxReadBatchSizeBytes": 536870888,
|
||||
"migrationDocLinks": Object {
|
||||
"clusterShardLimitExceeded": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#cluster-shard-limit-exceeded",
|
||||
"repeatedTimeoutRequests": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#_repeated_time_out_requests_that_eventually_fail",
|
||||
|
|
|
@ -72,7 +72,7 @@ describe('catchRetryableEsClientErrors', () => {
|
|||
});
|
||||
});
|
||||
it('ResponseError with retryable status code', async () => {
|
||||
const statusCodes = [503, 401, 403, 408, 410];
|
||||
const statusCodes = [503, 401, 403, 408, 410, 429];
|
||||
return Promise.all(
|
||||
statusCodes.map(async (status) => {
|
||||
const error = new esErrors.ResponseError(
|
||||
|
|
|
@ -15,6 +15,7 @@ const retryResponseStatuses = [
|
|||
403, // AuthenticationException
|
||||
408, // RequestTimeout
|
||||
410, // Gone
|
||||
429, // TooManyRequests -> ES circuit breaker
|
||||
];
|
||||
|
||||
export interface RetryableEsClientError {
|
||||
|
|
|
@ -146,6 +146,11 @@ export interface RequestEntityTooLargeException {
|
|||
type: 'request_entity_too_large_exception';
|
||||
}
|
||||
|
||||
export interface EsResponseTooLargeError {
|
||||
type: 'es_response_too_large';
|
||||
contentLength: number;
|
||||
}
|
||||
|
||||
/** @internal */
|
||||
export interface AcknowledgeResponse {
|
||||
acknowledged: boolean;
|
||||
|
@ -168,6 +173,7 @@ export interface ActionErrorTypeMap {
|
|||
index_not_green_timeout: IndexNotGreenTimeout;
|
||||
index_not_yellow_timeout: IndexNotYellowTimeout;
|
||||
cluster_shard_limit_exceeded: ClusterShardLimitExceeded;
|
||||
es_response_too_large: EsResponseTooLargeError;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -32,23 +32,54 @@ describe('readWithPit', () => {
|
|||
pitId: 'pitId',
|
||||
query: { match_all: {} },
|
||||
batchSize: 10_000,
|
||||
maxResponseSizeBytes: 100_000,
|
||||
})();
|
||||
|
||||
expect(client.search).toHaveBeenCalledTimes(1);
|
||||
expect(client.search).toHaveBeenCalledWith({
|
||||
allow_partial_search_results: false,
|
||||
pit: {
|
||||
id: 'pitId',
|
||||
keep_alive: '10m',
|
||||
expect(client.search).toHaveBeenCalledWith(
|
||||
{
|
||||
allow_partial_search_results: false,
|
||||
pit: {
|
||||
id: 'pitId',
|
||||
keep_alive: '10m',
|
||||
},
|
||||
query: {
|
||||
match_all: {},
|
||||
},
|
||||
search_after: undefined,
|
||||
seq_no_primary_term: undefined,
|
||||
size: 10000,
|
||||
sort: '_shard_doc:asc',
|
||||
track_total_hits: true,
|
||||
},
|
||||
query: {
|
||||
match_all: {},
|
||||
},
|
||||
search_after: undefined,
|
||||
seq_no_primary_term: undefined,
|
||||
size: 10000,
|
||||
sort: '_shard_doc:asc',
|
||||
track_total_hits: true,
|
||||
{ maxResponseSize: 100_000 }
|
||||
);
|
||||
});
|
||||
|
||||
it('returns left es_response_too_large when client throws RequestAbortedError', async () => {
|
||||
// Create a mock client that rejects all methods with a RequestAbortedError
|
||||
// response.
|
||||
const retryableError = new EsErrors.RequestAbortedError(
|
||||
'The content length (536870889) is bigger than the maximum allow string (536870888)'
|
||||
);
|
||||
const client = elasticsearchClientMock.createInternalClient(
|
||||
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
|
||||
);
|
||||
|
||||
const task = readWithPit({
|
||||
client,
|
||||
pitId: 'pitId',
|
||||
query: { match_all: {} },
|
||||
batchSize: 10_000,
|
||||
});
|
||||
try {
|
||||
await task();
|
||||
} catch (e) {
|
||||
/** ignore */
|
||||
}
|
||||
await expect(task()).resolves.toEqual({
|
||||
_tag: 'Left',
|
||||
left: { contentLength: 536870889, type: 'es_response_too_large' },
|
||||
});
|
||||
});
|
||||
|
||||
|
|
|
@ -9,6 +9,7 @@
|
|||
import * as Either from 'fp-ts/lib/Either';
|
||||
import * as TaskEither from 'fp-ts/lib/TaskEither';
|
||||
import * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
|
||||
import { errors as EsErrors } from '@elastic/elasticsearch';
|
||||
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
|
||||
import type { SavedObjectsRawDoc } from '@kbn/core-saved-objects-server';
|
||||
import {
|
||||
|
@ -16,6 +17,7 @@ import {
|
|||
type RetryableEsClientError,
|
||||
} from './catch_retryable_es_client_errors';
|
||||
import { DEFAULT_PIT_KEEP_ALIVE } from './open_pit';
|
||||
import { EsResponseTooLargeError } from '.';
|
||||
|
||||
/** @internal */
|
||||
export interface ReadWithPit {
|
||||
|
@ -32,6 +34,7 @@ export interface ReadWithPitParams {
|
|||
batchSize: number;
|
||||
searchAfter?: number[];
|
||||
seqNoPrimaryTerm?: boolean;
|
||||
maxResponseSizeBytes?: number;
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -45,32 +48,39 @@ export const readWithPit =
|
|||
batchSize,
|
||||
searchAfter,
|
||||
seqNoPrimaryTerm,
|
||||
}: ReadWithPitParams): TaskEither.TaskEither<RetryableEsClientError, ReadWithPit> =>
|
||||
maxResponseSizeBytes,
|
||||
}: ReadWithPitParams): TaskEither.TaskEither<
|
||||
RetryableEsClientError | EsResponseTooLargeError,
|
||||
ReadWithPit
|
||||
> =>
|
||||
() => {
|
||||
return client
|
||||
.search<SavedObjectsRawDoc>({
|
||||
seq_no_primary_term: seqNoPrimaryTerm,
|
||||
// Fail if the index being searched doesn't exist or is closed
|
||||
// allow_no_indices: false,
|
||||
// By default ES returns a 200 with partial results if there are shard
|
||||
// request timeouts or shard failures which can lead to data loss for
|
||||
// migrations
|
||||
allow_partial_search_results: false,
|
||||
// Sort fields are required to use searchAfter so we sort by the
|
||||
// natural order of the index which is the most efficient option
|
||||
// as order is not important for the migration
|
||||
sort: '_shard_doc:asc',
|
||||
pit: { id: pitId, keep_alive: DEFAULT_PIT_KEEP_ALIVE },
|
||||
size: batchSize,
|
||||
search_after: searchAfter,
|
||||
/**
|
||||
* We want to know how many documents we need to process so we can log the progress.
|
||||
* But we also want to increase the performance of these requests,
|
||||
* so we ask ES to report the total count only on the first request (when searchAfter does not exist)
|
||||
*/
|
||||
track_total_hits: typeof searchAfter === 'undefined',
|
||||
query,
|
||||
})
|
||||
.search<SavedObjectsRawDoc>(
|
||||
{
|
||||
seq_no_primary_term: seqNoPrimaryTerm,
|
||||
// Fail if the index being searched doesn't exist or is closed
|
||||
// allow_no_indices: false,
|
||||
// By default ES returns a 200 with partial results if there are shard
|
||||
// request timeouts or shard failures which can lead to data loss for
|
||||
// migrations
|
||||
allow_partial_search_results: false,
|
||||
// Sort fields are required to use searchAfter so we sort by the
|
||||
// natural order of the index which is the most efficient option
|
||||
// as order is not important for the migration
|
||||
sort: '_shard_doc:asc',
|
||||
pit: { id: pitId, keep_alive: DEFAULT_PIT_KEEP_ALIVE },
|
||||
size: batchSize,
|
||||
search_after: searchAfter,
|
||||
/**
|
||||
* We want to know how many documents we need to process so we can log the progress.
|
||||
* But we also want to increase the performance of these requests,
|
||||
* so we ask ES to report the total count only on the first request (when searchAfter does not exist)
|
||||
*/
|
||||
track_total_hits: typeof searchAfter === 'undefined',
|
||||
query,
|
||||
},
|
||||
{ maxResponseSize: maxResponseSizeBytes }
|
||||
)
|
||||
.then((body) => {
|
||||
const totalHits =
|
||||
typeof body.hits.total === 'number'
|
||||
|
@ -93,5 +103,22 @@ export const readWithPit =
|
|||
totalHits,
|
||||
});
|
||||
})
|
||||
.catch((e) => {
|
||||
if (
|
||||
e instanceof EsErrors.RequestAbortedError &&
|
||||
/The content length \(\d+\) is bigger than the maximum/.test(e.message)
|
||||
) {
|
||||
return Either.left({
|
||||
type: 'es_response_too_large' as const,
|
||||
contentLength: Number.parseInt(
|
||||
e.message.match(/The content length \((\d+)\) is bigger than the maximum/)?.[1] ??
|
||||
'-1',
|
||||
10
|
||||
),
|
||||
});
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
})
|
||||
.catch(catchRetryableEsClientErrors);
|
||||
};
|
||||
|
|
|
@ -25,6 +25,7 @@ const migrationsConfig = {
|
|||
retryAttempts: 15,
|
||||
batchSize: 1000,
|
||||
maxBatchSizeBytes: ByteSizeValue.parse('100mb'),
|
||||
maxReadBatchSizeBytes: ByteSizeValue.parse('500mb'),
|
||||
} as unknown as SavedObjectsMigrationConfigType;
|
||||
|
||||
const createInitialStateCommonParams = {
|
||||
|
@ -214,7 +215,9 @@ describe('createInitialState', () => {
|
|||
"knownTypes": Array [],
|
||||
"legacyIndex": ".kibana_task_manager",
|
||||
"logs": Array [],
|
||||
"maxBatchSize": 1000,
|
||||
"maxBatchSizeBytes": 104857600,
|
||||
"maxReadBatchSizeBytes": 524288000,
|
||||
"migrationDocLinks": Object {
|
||||
"clusterShardLimitExceeded": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#cluster-shard-limit-exceeded",
|
||||
"repeatedTimeoutRequests": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#_repeated_time_out_requests_that_eventually_fail",
|
||||
|
|
|
@ -151,7 +151,9 @@ export const createInitialState = ({
|
|||
retryDelay: 0,
|
||||
retryAttempts: migrationsConfig.retryAttempts,
|
||||
batchSize: migrationsConfig.batchSize,
|
||||
maxBatchSize: migrationsConfig.batchSize,
|
||||
maxBatchSizeBytes: migrationsConfig.maxBatchSizeBytes.getValueInBytes(),
|
||||
maxReadBatchSizeBytes: migrationsConfig.maxReadBatchSizeBytes.getValueInBytes(),
|
||||
discardUnknownObjects: migrationsConfig.discardUnknownObjects === kibanaVersion,
|
||||
discardCorruptObjects: migrationsConfig.discardCorruptObjects === kibanaVersion,
|
||||
logs: [],
|
||||
|
|
|
@ -262,9 +262,9 @@ describe('KibanaMigrator', () => {
|
|||
const migrator = new KibanaMigrator(options);
|
||||
migrator.prepareMigrations();
|
||||
await expect(migrator.runMigrations()).rejects.toMatchInlineSnapshot(`
|
||||
[Error: Unable to complete saved object migrations for the [.my-index] index. Error: Reindex failed with the following error:
|
||||
{"_tag":"Some","value":{"type":"elasticsearch_exception","reason":"task failed with an error"}}]
|
||||
`);
|
||||
[Error: Unable to complete saved object migrations for the [.my-index] index. Error: Reindex failed with the following error:
|
||||
{"_tag":"Some","value":{"type":"elasticsearch_exception","reason":"task failed with an error"}}]
|
||||
`);
|
||||
expect(loggingSystemMock.collect(options.logger).error[0][0]).toMatchInlineSnapshot(`
|
||||
[Error: Reindex failed with the following error:
|
||||
{"_tag":"Some","value":{"type":"elasticsearch_exception","reason":"task failed with an error"}}]
|
||||
|
@ -552,6 +552,7 @@ const mockOptions = () => {
|
|||
algorithm: 'v2',
|
||||
batchSize: 20,
|
||||
maxBatchSizeBytes: ByteSizeValue.parse('20mb'),
|
||||
maxReadBatchSizeBytes: new ByteSizeValue(536870888),
|
||||
pollInterval: 20000,
|
||||
scrollDuration: '10m',
|
||||
skip: false,
|
||||
|
|
|
@ -50,6 +50,7 @@ describe('migrationsStateActionMachine', () => {
|
|||
algorithm: 'v2',
|
||||
batchSize: 1000,
|
||||
maxBatchSizeBytes: new ByteSizeValue(1e8),
|
||||
maxReadBatchSizeBytes: new ByteSizeValue(536870888),
|
||||
pollInterval: 0,
|
||||
scrollDuration: '0s',
|
||||
skip: false,
|
||||
|
|
|
@ -17,6 +17,7 @@ import type { SavedObjectsRawDoc } from '@kbn/core-saved-objects-server';
|
|||
import type { IndexMapping } from '@kbn/core-saved-objects-base-server-internal';
|
||||
import type { AliasAction, FetchIndexResponse } from '../actions';
|
||||
import type { BulkIndexOperationTuple } from './create_batches';
|
||||
import { OutdatedDocumentsSearchRead, ReindexSourceToTempRead } from '../state';
|
||||
|
||||
/** @internal */
|
||||
export type Aliases = Partial<Record<string, string>>;
|
||||
|
@ -285,3 +286,11 @@ export function getMigrationType({
|
|||
*/
|
||||
export const getTempIndexName = (indexPrefix: string, kibanaVersion: string): string =>
|
||||
`${indexPrefix}_${kibanaVersion}_reindex_temp`;
|
||||
|
||||
/** Increase batchSize by 20% until a maximum of maxBatchSize */
|
||||
export const increaseBatchSize = (
|
||||
stateP: OutdatedDocumentsSearchRead | ReindexSourceToTempRead
|
||||
) => {
|
||||
const increasedBatchSize = Math.floor(stateP.batchSize * 1.2);
|
||||
return increasedBatchSize > stateP.maxBatchSize ? stateP.maxBatchSize : increasedBatchSize;
|
||||
};
|
||||
|
|
|
@ -86,7 +86,9 @@ describe('migrations v2 model', () => {
|
|||
retryDelay: 0,
|
||||
retryAttempts: 15,
|
||||
batchSize: 1000,
|
||||
maxBatchSize: 1000,
|
||||
maxBatchSizeBytes: 1e8,
|
||||
maxReadBatchSizeBytes: 1234,
|
||||
discardUnknownObjects: false,
|
||||
discardCorruptObjects: false,
|
||||
indexPrefix: '.kibana',
|
||||
|
@ -1832,6 +1834,8 @@ describe('migrations v2 model', () => {
|
|||
expect(newState.lastHitSortValue).toBe(lastHitSortValue);
|
||||
expect(newState.progress.processed).toBe(undefined);
|
||||
expect(newState.progress.total).toBe(1);
|
||||
expect(newState.maxBatchSize).toBe(1000);
|
||||
expect(newState.batchSize).toBe(1000); // don't increase batchsize above default
|
||||
expect(newState.logs).toMatchInlineSnapshot(`
|
||||
Array [
|
||||
Object {
|
||||
|
@ -1842,6 +1846,83 @@ describe('migrations v2 model', () => {
|
|||
`);
|
||||
});
|
||||
|
||||
it('REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_TRANSFORM increases batchSize if < maxBatchSize', () => {
|
||||
const outdatedDocuments = [{ _id: '1', _source: { type: 'vis' } }];
|
||||
const lastHitSortValue = [123456];
|
||||
const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_READ'> = Either.right({
|
||||
outdatedDocuments,
|
||||
lastHitSortValue,
|
||||
totalHits: 1,
|
||||
processedDocs: 1,
|
||||
});
|
||||
let newState = model({ ...state, batchSize: 500 }, res) as ReindexSourceToTempTransform;
|
||||
expect(newState.batchSize).toBe(600);
|
||||
newState = model({ ...state, batchSize: 600 }, res) as ReindexSourceToTempTransform;
|
||||
expect(newState.batchSize).toBe(720);
|
||||
newState = model({ ...state, batchSize: 720 }, res) as ReindexSourceToTempTransform;
|
||||
expect(newState.batchSize).toBe(864);
|
||||
newState = model({ ...state, batchSize: 864 }, res) as ReindexSourceToTempTransform;
|
||||
expect(newState.batchSize).toBe(1000); // + 20% would have been 1036
|
||||
expect(newState.controlState).toBe('REINDEX_SOURCE_TO_TEMP_TRANSFORM');
|
||||
expect(newState.maxBatchSize).toBe(1000);
|
||||
});
|
||||
|
||||
it('REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_READ if left es_response_too_large', () => {
|
||||
const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_READ'> = Either.left({
|
||||
type: 'es_response_too_large',
|
||||
contentLength: 4567,
|
||||
});
|
||||
const newState = model(state, res) as ReindexSourceToTempRead;
|
||||
expect(newState.controlState).toBe('REINDEX_SOURCE_TO_TEMP_READ');
|
||||
expect(newState.lastHitSortValue).toBe(undefined); // lastHitSortValue should not be set
|
||||
expect(newState.progress.processed).toBe(undefined); // don't increment progress
|
||||
expect(newState.batchSize).toBe(500); // halves the batch size
|
||||
expect(newState.maxBatchSize).toBe(1000); // leaves maxBatchSize unchanged
|
||||
expect(newState.logs).toMatchInlineSnapshot(`
|
||||
Array [
|
||||
Object {
|
||||
"level": "warning",
|
||||
"message": "Read a batch with a response content length of 4567 bytes which exceeds migrations.maxReadBatchSizeBytes, retrying by reducing the batch size in half to 500.",
|
||||
},
|
||||
]
|
||||
`);
|
||||
});
|
||||
|
||||
it('REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_READ if left es_response_too_large will not reduce batch size below 1', () => {
|
||||
const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_READ'> = Either.left({
|
||||
type: 'es_response_too_large',
|
||||
contentLength: 2345,
|
||||
});
|
||||
const newState = model({ ...state, batchSize: 1.5 }, res) as ReindexSourceToTempRead;
|
||||
expect(newState.controlState).toBe('REINDEX_SOURCE_TO_TEMP_READ');
|
||||
expect(newState.lastHitSortValue).toBe(undefined); // lastHitSortValue should not be set
|
||||
expect(newState.progress.processed).toBe(undefined); // don't increment progress
|
||||
expect(newState.batchSize).toBe(1); // don't halve the batch size or go below 1
|
||||
expect(newState.maxBatchSize).toBe(1000); // leaves maxBatchSize unchanged
|
||||
expect(newState.logs).toMatchInlineSnapshot(`
|
||||
Array [
|
||||
Object {
|
||||
"level": "warning",
|
||||
"message": "Read a batch with a response content length of 2345 bytes which exceeds migrations.maxReadBatchSizeBytes, retrying by reducing the batch size in half to 1.",
|
||||
},
|
||||
]
|
||||
`);
|
||||
});
|
||||
|
||||
it('REINDEX_SOURCE_TO_TEMP_READ -> FATAL if left es_response_too_large and batchSize already 1', () => {
|
||||
const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_READ'> = Either.left({
|
||||
type: 'es_response_too_large',
|
||||
contentLength: 2345,
|
||||
});
|
||||
const newState = model({ ...state, batchSize: 1 }, res) as FatalState;
|
||||
expect(newState.controlState).toBe('FATAL');
|
||||
expect(newState.batchSize).toBe(1); // don't halve the batch size or go below 1
|
||||
expect(newState.maxBatchSize).toBe(1000); // leaves maxBatchSize unchanged
|
||||
expect(newState.reason).toMatchInlineSnapshot(
|
||||
`"After reducing the read batch size to a single document, the Elasticsearch response content length was 2345bytes which still exceeded migrations.maxReadBatchSizeBytes. Increase migrations.maxReadBatchSizeBytes and try again."`
|
||||
);
|
||||
});
|
||||
|
||||
it('REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_CLOSE_PIT if no outdated documents to reindex', () => {
|
||||
const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_READ'> = Either.right({
|
||||
outdatedDocuments: [],
|
||||
|
@ -2304,6 +2385,8 @@ describe('migrations v2 model', () => {
|
|||
expect(newState.lastHitSortValue).toBe(lastHitSortValue);
|
||||
expect(newState.progress.processed).toBe(undefined);
|
||||
expect(newState.progress.total).toBe(10);
|
||||
expect(newState.maxBatchSize).toBe(1000);
|
||||
expect(newState.batchSize).toBe(1000); // don't increase batchsize above default
|
||||
expect(newState.logs).toMatchInlineSnapshot(`
|
||||
Array [
|
||||
Object {
|
||||
|
@ -2345,6 +2428,83 @@ describe('migrations v2 model', () => {
|
|||
`);
|
||||
});
|
||||
|
||||
it('OUTDATED_DOCUMENTS_SEARCH_READ -> OUTDATED_DOCUMENTS_TRANSFORM increases batchSize up to maxBatchSize', () => {
|
||||
const outdatedDocuments = [{ _id: '1', _source: { type: 'vis' } }];
|
||||
const lastHitSortValue = [123456];
|
||||
const res: ResponseType<'OUTDATED_DOCUMENTS_SEARCH_READ'> = Either.right({
|
||||
outdatedDocuments,
|
||||
lastHitSortValue,
|
||||
totalHits: 1,
|
||||
processedDocs: [],
|
||||
});
|
||||
let newState = model({ ...state, batchSize: 500 }, res) as ReindexSourceToTempTransform;
|
||||
expect(newState.batchSize).toBe(600);
|
||||
newState = model({ ...state, batchSize: 600 }, res) as ReindexSourceToTempTransform;
|
||||
expect(newState.batchSize).toBe(720);
|
||||
newState = model({ ...state, batchSize: 720 }, res) as ReindexSourceToTempTransform;
|
||||
expect(newState.batchSize).toBe(864);
|
||||
newState = model({ ...state, batchSize: 864 }, res) as ReindexSourceToTempTransform;
|
||||
expect(newState.batchSize).toBe(1000); // + 20% would have been 1036
|
||||
expect(newState.controlState).toBe('OUTDATED_DOCUMENTS_TRANSFORM');
|
||||
expect(newState.maxBatchSize).toBe(1000);
|
||||
});
|
||||
|
||||
it('OUTDATED_DOCUMENTS_SEARCH_READ -> OUTDATED_DOCUMENTS_SEARCH_READ if left es_response_too_large', () => {
|
||||
const res: ResponseType<'OUTDATED_DOCUMENTS_SEARCH_READ'> = Either.left({
|
||||
type: 'es_response_too_large',
|
||||
contentLength: 3456,
|
||||
});
|
||||
const newState = model(state, res) as ReindexSourceToTempRead;
|
||||
expect(newState.controlState).toBe('OUTDATED_DOCUMENTS_SEARCH_READ');
|
||||
expect(newState.lastHitSortValue).toBe(undefined); // lastHitSortValue should not be set
|
||||
expect(newState.progress.processed).toBe(undefined); // don't increment progress
|
||||
expect(newState.batchSize).toBe(500); // halves the batch size
|
||||
expect(newState.maxBatchSize).toBe(1000); // leaves maxBatchSize unchanged
|
||||
expect(newState.logs).toMatchInlineSnapshot(`
|
||||
Array [
|
||||
Object {
|
||||
"level": "warning",
|
||||
"message": "Read a batch with a response content length of 3456 bytes which exceeds migrations.maxReadBatchSizeBytes, retrying by reducing the batch size in half to 500.",
|
||||
},
|
||||
]
|
||||
`);
|
||||
});
|
||||
|
||||
it('OUTDATED_DOCUMENTS_SEARCH_READ -> OUTDATED_DOCUMENTS_SEARCH_READ if left es_response_too_large will not reduce batch size below 1', () => {
|
||||
const res: ResponseType<'OUTDATED_DOCUMENTS_SEARCH_READ'> = Either.left({
|
||||
type: 'es_response_too_large',
|
||||
contentLength: 2345,
|
||||
});
|
||||
const newState = model({ ...state, batchSize: 1.5 }, res) as ReindexSourceToTempRead;
|
||||
expect(newState.controlState).toBe('OUTDATED_DOCUMENTS_SEARCH_READ');
|
||||
expect(newState.lastHitSortValue).toBe(undefined); // lastHitSortValue should not be set
|
||||
expect(newState.progress.processed).toBe(undefined); // don't increment progress
|
||||
expect(newState.batchSize).toBe(1); // don't halve the batch size or go below 1
|
||||
expect(newState.maxBatchSize).toBe(1000); // leaves maxBatchSize unchanged
|
||||
expect(newState.logs).toMatchInlineSnapshot(`
|
||||
Array [
|
||||
Object {
|
||||
"level": "warning",
|
||||
"message": "Read a batch with a response content length of 2345 bytes which exceeds migrations.maxReadBatchSizeBytes, retrying by reducing the batch size in half to 1.",
|
||||
},
|
||||
]
|
||||
`);
|
||||
});
|
||||
|
||||
it('OUTDATED_DOCUMENTS_SEARCH_READ -> FATAL if left es_response_too_large and batchSize already 1', () => {
|
||||
const res: ResponseType<'OUTDATED_DOCUMENTS_SEARCH_READ'> = Either.left({
|
||||
type: 'es_response_too_large',
|
||||
contentLength: 2345,
|
||||
});
|
||||
const newState = model({ ...state, batchSize: 1 }, res) as FatalState;
|
||||
expect(newState.controlState).toBe('FATAL');
|
||||
expect(newState.batchSize).toBe(1); // don't halve the batch size or go below 1
|
||||
expect(newState.maxBatchSize).toBe(1000); // leaves maxBatchSize unchanged
|
||||
expect(newState.reason).toMatchInlineSnapshot(
|
||||
`"After reducing the read batch size to a single document, the response content length was 2345 bytes which still exceeded migrations.maxReadBatchSizeBytes. Increase migrations.maxReadBatchSizeBytes and try again."`
|
||||
);
|
||||
});
|
||||
|
||||
it('OUTDATED_DOCUMENTS_SEARCH_READ -> OUTDATED_DOCUMENTS_SEARCH_CLOSE_PIT if no outdated documents to transform', () => {
|
||||
const res: ResponseType<'OUTDATED_DOCUMENTS_SEARCH_READ'> = Either.right({
|
||||
outdatedDocuments: [],
|
||||
|
|
|
@ -43,6 +43,7 @@ import {
|
|||
versionMigrationCompleted,
|
||||
buildRemoveAliasActions,
|
||||
MigrationType,
|
||||
increaseBatchSize,
|
||||
} from './helpers';
|
||||
import { buildTempIndexMap, createBatches } from './create_batches';
|
||||
import type { MigrationLog } from '../types';
|
||||
|
@ -833,6 +834,8 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
|
|||
lastHitSortValue: res.right.lastHitSortValue,
|
||||
progress,
|
||||
logs,
|
||||
// We succeeded in reading this batch, so increase the batch size for the next request.
|
||||
batchSize: increaseBatchSize(stateP),
|
||||
};
|
||||
} else {
|
||||
// we don't have any more outdated documents and need to either fail or move on to updating the target mappings.
|
||||
|
@ -875,7 +878,32 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
|
|||
};
|
||||
}
|
||||
} else {
|
||||
throwBadResponse(stateP, res);
|
||||
const left = res.left;
|
||||
if (isTypeof(left, 'es_response_too_large')) {
|
||||
if (stateP.batchSize === 1) {
|
||||
return {
|
||||
...stateP,
|
||||
controlState: 'FATAL',
|
||||
reason: `After reducing the read batch size to a single document, the Elasticsearch response content length was ${left.contentLength}bytes which still exceeded migrations.maxReadBatchSizeBytes. Increase migrations.maxReadBatchSizeBytes and try again.`,
|
||||
};
|
||||
} else {
|
||||
const batchSize = Math.max(Math.floor(stateP.batchSize / 2), 1);
|
||||
return {
|
||||
...stateP,
|
||||
batchSize,
|
||||
controlState: 'REINDEX_SOURCE_TO_TEMP_READ',
|
||||
logs: [
|
||||
...stateP.logs,
|
||||
{
|
||||
level: 'warning',
|
||||
message: `Read a batch with a response content length of ${left.contentLength} bytes which exceeds migrations.maxReadBatchSizeBytes, retrying by reducing the batch size in half to ${batchSize}.`,
|
||||
},
|
||||
],
|
||||
};
|
||||
}
|
||||
} else {
|
||||
throwBadResponse(stateP, left);
|
||||
}
|
||||
}
|
||||
} else if (stateP.controlState === 'REINDEX_SOURCE_TO_TEMP_CLOSE_PIT') {
|
||||
const res = resW as ExcludeRetryableEsError<ResponseType<typeof stateP.controlState>>;
|
||||
|
@ -1139,6 +1167,8 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
|
|||
lastHitSortValue: res.right.lastHitSortValue,
|
||||
progress,
|
||||
logs,
|
||||
// We succeeded in reading this batch, so increase the batch size for the next request.
|
||||
batchSize: increaseBatchSize(stateP),
|
||||
};
|
||||
} else {
|
||||
// we don't have any more outdated documents and need to either fail or move on to updating the target mappings.
|
||||
|
@ -1179,7 +1209,32 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
|
|||
};
|
||||
}
|
||||
} else {
|
||||
throwBadResponse(stateP, res);
|
||||
const left = res.left;
|
||||
if (isTypeof(left, 'es_response_too_large')) {
|
||||
if (stateP.batchSize === 1) {
|
||||
return {
|
||||
...stateP,
|
||||
controlState: 'FATAL',
|
||||
reason: `After reducing the read batch size to a single document, the response content length was ${left.contentLength} bytes which still exceeded migrations.maxReadBatchSizeBytes. Increase migrations.maxReadBatchSizeBytes and try again.`,
|
||||
};
|
||||
} else {
|
||||
const batchSize = Math.max(Math.floor(stateP.batchSize / 2), 1);
|
||||
return {
|
||||
...stateP,
|
||||
batchSize,
|
||||
controlState: 'OUTDATED_DOCUMENTS_SEARCH_READ',
|
||||
logs: [
|
||||
...stateP.logs,
|
||||
{
|
||||
level: 'warning',
|
||||
message: `Read a batch with a response content length of ${left.contentLength} bytes which exceeds migrations.maxReadBatchSizeBytes, retrying by reducing the batch size in half to ${batchSize}.`,
|
||||
},
|
||||
],
|
||||
};
|
||||
}
|
||||
} else {
|
||||
throwBadResponse(stateP, left);
|
||||
}
|
||||
}
|
||||
} else if (stateP.controlState === 'OUTDATED_DOCUMENTS_TRANSFORM') {
|
||||
const res = resW as ExcludeRetryableEsError<ResponseType<typeof stateP.controlState>>;
|
||||
|
|
|
@ -219,6 +219,7 @@ export const nextActionMap = (
|
|||
query: state.outdatedDocumentsQuery,
|
||||
batchSize: state.batchSize,
|
||||
searchAfter: state.lastHitSortValue,
|
||||
maxResponseSizeBytes: state.maxReadBatchSizeBytes,
|
||||
}),
|
||||
OUTDATED_DOCUMENTS_SEARCH_CLOSE_PIT: (state: OutdatedDocumentsSearchClosePit) =>
|
||||
Actions.closePit({ client, pitId: state.pitId }),
|
||||
|
|
|
@ -63,7 +63,6 @@ export interface BaseState extends ControlState {
|
|||
* max_retry_time = 11.7 minutes
|
||||
*/
|
||||
readonly retryAttempts: number;
|
||||
|
||||
/**
|
||||
* The number of documents to process in each batch. This determines the
|
||||
* maximum number of documents that will be read and written in a single
|
||||
|
@ -83,6 +82,12 @@ export interface BaseState extends ControlState {
|
|||
* When writing batches, we limit the number of documents in a batch
|
||||
* (batchSize) as well as the size of the batch in bytes (maxBatchSizeBytes).
|
||||
*/
|
||||
readonly maxBatchSize: number;
|
||||
/**
|
||||
* The number of documents to process in each batch. Under most circumstances
|
||||
* batchSize == maxBatchSize. But if we fail to read a batch because of a
|
||||
* Nodejs `RangeError` we'll temporarily half `batchSize` and retry.
|
||||
*/
|
||||
readonly batchSize: number;
|
||||
/**
|
||||
* When writing batches, limits the batch size in bytes to ensure that we
|
||||
|
@ -90,6 +95,12 @@ export interface BaseState extends ControlState {
|
|||
* http.max_content_length which defaults to 100mb.
|
||||
*/
|
||||
readonly maxBatchSizeBytes: number;
|
||||
/**
|
||||
* If a read batch exceeds this limit we half the batchSize and retry. By
|
||||
* not JSON.parsing and transforming large batches we can avoid RangeErrors
|
||||
* or Kibana OOMing.
|
||||
*/
|
||||
readonly maxReadBatchSizeBytes: number;
|
||||
readonly logs: MigrationLog[];
|
||||
/**
|
||||
* If saved objects exist which have an unknown type they will cause
|
||||
|
|
|
@ -40,6 +40,7 @@ export const createContextMock = (
|
|||
algorithm: 'zdt',
|
||||
batchSize: 1000,
|
||||
maxBatchSizeBytes: new ByteSizeValue(1e8),
|
||||
maxReadBatchSizeBytes: new ByteSizeValue(1e6),
|
||||
pollInterval: 0,
|
||||
scrollDuration: '0s',
|
||||
skip: false,
|
||||
|
|
|
@ -22,6 +22,7 @@ import {
|
|||
type OpenPitResponse,
|
||||
reindex,
|
||||
readWithPit,
|
||||
type EsResponseTooLargeError,
|
||||
type ReadWithPit,
|
||||
setWriteBlock,
|
||||
updateAliases,
|
||||
|
@ -87,6 +88,7 @@ describe('migration actions', () => {
|
|||
{ _source: { title: 'doc 3' } },
|
||||
{ _source: { title: 'saved object 4', type: 'another_unused_type' } },
|
||||
{ _source: { title: 'f-agent-event 5', type: 'f_agent_event' } },
|
||||
{ _source: { title: new Array(1000).fill('a').join(), type: 'large' } }, // "large" saved object
|
||||
] as unknown as SavedObjectsRawDoc[];
|
||||
await bulkOverwriteTransformedDocuments({
|
||||
client,
|
||||
|
@ -727,6 +729,7 @@ describe('migration actions', () => {
|
|||
expect((results.hits?.hits as SavedObjectsRawDoc[]).map((doc) => doc._source.title).sort())
|
||||
.toMatchInlineSnapshot(`
|
||||
Array [
|
||||
"a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a",
|
||||
"doc 1",
|
||||
"doc 2",
|
||||
"doc 3",
|
||||
|
@ -763,6 +766,7 @@ describe('migration actions', () => {
|
|||
expect((results.hits?.hits as SavedObjectsRawDoc[]).map((doc) => doc._source.title).sort())
|
||||
.toMatchInlineSnapshot(`
|
||||
Array [
|
||||
"a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a",
|
||||
"doc 1",
|
||||
"doc 2",
|
||||
"doc 3",
|
||||
|
@ -792,6 +796,7 @@ describe('migration actions', () => {
|
|||
expect((results.hits?.hits as SavedObjectsRawDoc[]).map((doc) => doc._source.title).sort())
|
||||
.toMatchInlineSnapshot(`
|
||||
Array [
|
||||
"a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a_updated",
|
||||
"doc 1_updated",
|
||||
"doc 2_updated",
|
||||
"doc 3_updated",
|
||||
|
@ -843,6 +848,7 @@ describe('migration actions', () => {
|
|||
expect((results.hits?.hits as SavedObjectsRawDoc[]).map((doc) => doc._source.title).sort())
|
||||
.toMatchInlineSnapshot(`
|
||||
Array [
|
||||
"a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a_updated",
|
||||
"doc 1_updated",
|
||||
"doc 2_updated",
|
||||
"doc 3_updated",
|
||||
|
@ -893,6 +899,7 @@ describe('migration actions', () => {
|
|||
expect((results.hits?.hits as SavedObjectsRawDoc[]).map((doc) => doc._source.title).sort())
|
||||
.toMatchInlineSnapshot(`
|
||||
Array [
|
||||
"a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a_updated",
|
||||
"doc 1",
|
||||
"doc 2",
|
||||
"doc 3_updated",
|
||||
|
@ -1121,7 +1128,7 @@ describe('migration actions', () => {
|
|||
});
|
||||
const docsResponse = (await readWithPitTask()) as Either.Right<ReadWithPit>;
|
||||
|
||||
await expect(docsResponse.right.outdatedDocuments.length).toBe(5);
|
||||
await expect(docsResponse.right.outdatedDocuments.length).toBe(6);
|
||||
});
|
||||
|
||||
it('requests the batchSize of documents from an index', async () => {
|
||||
|
@ -1172,6 +1179,7 @@ describe('migration actions', () => {
|
|||
expect(docsResponse.right.outdatedDocuments.map((doc) => doc._source.title).sort())
|
||||
.toMatchInlineSnapshot(`
|
||||
Array [
|
||||
"a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a",
|
||||
"doc 1",
|
||||
"doc 2",
|
||||
"doc 3",
|
||||
|
@ -1258,6 +1266,36 @@ describe('migration actions', () => {
|
|||
);
|
||||
});
|
||||
|
||||
it('returns a left es_response_too_large error when a read batch exceeds the maxResponseSize', async () => {
|
||||
const openPitTask = openPit({ client, index: 'existing_index_with_docs' });
|
||||
const pitResponse = (await openPitTask()) as Either.Right<OpenPitResponse>;
|
||||
|
||||
let readWithPitTask = readWithPit({
|
||||
client,
|
||||
pitId: pitResponse.right.pitId,
|
||||
query: { match_all: {} },
|
||||
batchSize: 1, // small batch size so we don't exceed the maxResponseSize
|
||||
searchAfter: undefined,
|
||||
maxResponseSizeBytes: 500, // set a small size to force the error
|
||||
});
|
||||
const rightResponse = (await readWithPitTask()) as Either.Right<ReadWithPit>;
|
||||
|
||||
await expect(Either.isRight(rightResponse)).toBe(true);
|
||||
|
||||
readWithPitTask = readWithPit({
|
||||
client,
|
||||
pitId: pitResponse.right.pitId,
|
||||
query: { match_all: {} },
|
||||
batchSize: 10, // a bigger batch will exceed the maxResponseSize
|
||||
searchAfter: undefined,
|
||||
maxResponseSizeBytes: 500, // set a small size to force the error
|
||||
});
|
||||
const leftResponse = (await readWithPitTask()) as Either.Left<EsResponseTooLargeError>;
|
||||
|
||||
expect(leftResponse.left.type).toBe('es_response_too_large');
|
||||
expect(leftResponse.left.contentLength).toBe(3184);
|
||||
});
|
||||
|
||||
it('rejects if PIT does not exist', async () => {
|
||||
const readWithPitTask = readWithPit({
|
||||
client,
|
||||
|
|
|
@ -60,6 +60,7 @@ describe('split .kibana index into multiple system indices', () => {
|
|||
beforeAll(async () => {
|
||||
esServer = await startElasticsearch({
|
||||
dataArchive: Path.join(__dirname, '..', 'archives', '7.3.0_xpack_sample_saved_objects.zip'),
|
||||
timeout: 60000,
|
||||
});
|
||||
});
|
||||
|
||||
|
|
|
@ -0,0 +1,97 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||
* Side Public License, v 1.
|
||||
*/
|
||||
|
||||
import Path from 'path';
|
||||
import fs from 'fs/promises';
|
||||
import { Root } from '@kbn/core-root-server-internal';
|
||||
import {
|
||||
createRootWithCorePlugins,
|
||||
type TestElasticsearchUtils,
|
||||
} from '@kbn/core-test-helpers-kbn-server';
|
||||
import { delay } from '../test_utils';
|
||||
import { startElasticsearch } from '../kibana_migrator_test_kit';
|
||||
|
||||
const logFilePath = Path.join(__dirname, 'read_batch_size.log');
|
||||
|
||||
describe('migration v2 - read batch size', () => {
|
||||
let esServer: TestElasticsearchUtils;
|
||||
let root: Root;
|
||||
let logs: string;
|
||||
|
||||
beforeEach(async () => {
|
||||
esServer = await startElasticsearch({
|
||||
dataArchive: Path.join(__dirname, '..', 'archives', '8.4.0_with_sample_data_logs.zip'),
|
||||
});
|
||||
await fs.unlink(logFilePath).catch(() => {});
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await root?.shutdown();
|
||||
await esServer?.stop();
|
||||
await delay(10);
|
||||
});
|
||||
|
||||
it('reduces the read batchSize in half if a batch exceeds maxReadBatchSizeBytes', async () => {
|
||||
root = createRoot({ maxReadBatchSizeBytes: 15000 });
|
||||
await root.preboot();
|
||||
await root.setup();
|
||||
await root.start();
|
||||
|
||||
// Check for migration steps present in the logs
|
||||
logs = await fs.readFile(logFilePath, 'utf-8');
|
||||
|
||||
expect(logs).toMatch(
|
||||
/Read a batch with a response content length of \d+ bytes which exceeds migrations\.maxReadBatchSizeBytes, retrying by reducing the batch size in half to 15/
|
||||
);
|
||||
expect(logs).toMatch('[.kibana] Migration completed');
|
||||
});
|
||||
|
||||
it('does not reduce the read batchSize in half if no batches exceeded maxReadBatchSizeBytes', async () => {
|
||||
root = createRoot({ maxReadBatchSizeBytes: 50000 });
|
||||
await root.preboot();
|
||||
await root.setup();
|
||||
await root.start();
|
||||
|
||||
// Check for migration steps present in the logs
|
||||
logs = await fs.readFile(logFilePath, 'utf-8');
|
||||
|
||||
expect(logs).not.toMatch('retrying by reducing the batch size in half to');
|
||||
expect(logs).toMatch('[.kibana] Migration completed');
|
||||
});
|
||||
});
|
||||
|
||||
function createRoot({ maxReadBatchSizeBytes }: { maxReadBatchSizeBytes?: number }) {
|
||||
return createRootWithCorePlugins(
|
||||
{
|
||||
migrations: {
|
||||
maxReadBatchSizeBytes,
|
||||
},
|
||||
logging: {
|
||||
appenders: {
|
||||
file: {
|
||||
type: 'file',
|
||||
fileName: logFilePath,
|
||||
layout: {
|
||||
type: 'json',
|
||||
},
|
||||
},
|
||||
},
|
||||
loggers: [
|
||||
{
|
||||
name: 'root',
|
||||
level: 'info',
|
||||
appenders: ['file'],
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
{
|
||||
oss: false,
|
||||
}
|
||||
);
|
||||
}
|
|
@ -84,12 +84,14 @@ export interface KibanaMigratorTestKit {
|
|||
export const startElasticsearch = async ({
|
||||
basePath,
|
||||
dataArchive,
|
||||
timeout,
|
||||
}: {
|
||||
basePath?: string;
|
||||
dataArchive?: string;
|
||||
timeout?: number;
|
||||
} = {}) => {
|
||||
const { startES } = createTestServers({
|
||||
adjustTimeout: (t: number) => jest.setTimeout(t),
|
||||
adjustTimeout: (t: number) => jest.setTimeout(t + (timeout ?? 0)),
|
||||
settings: {
|
||||
es: {
|
||||
license: 'basic',
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue