mirror of
https://github.com/elastic/kibana.git
synced 2025-04-23 09:19:04 -04:00
# Backport This will backport the following commits from `main` to `8.8`: - [Make updateAndPickupMappings batch size configurable (#153185)](https://github.com/elastic/kibana/pull/153185) <!--- Backport version: 8.9.7 --> ### Questions ? Please refer to the [Backport tool documentation](https://github.com/sqren/backport) <!--BACKPORT [{"author":{"name":"Rudolf Meijering","email":"skaapgif@gmail.com"},"sourceCommit":{"committedDate":"2023-05-22T12:08:02Z","message":"Make updateAndPickupMappings batch size configurable (#153185)\n\n## Summary\r\n\r\nUse migrations.batchSize config for the scroll_size in update_by_query /\r\nupdateAndPickupMappings. The default scroll_size=1000 can sometimes\r\ncause Elasticsearch to fail with `RecyclerBytesStreamOutput cannot hold\r\nmore than 2GB of data`\r\n\r\nOn CI our Elasticsearch cluster does not have enough memory to reproduce\r\n`RecyclerBytesStreamOutput` error it OOMs before it's able to load 2GB.\r\n\r\nHowever it's possible to test manually:\r\n1. Start Elasticsearch with 8GB heap `ES_JAVA_OPTS=' -Xms8g -Xmx8g' yarn\r\nes snapshot`\r\n2. Ingest > 2GB of saved objects distributed over batchSize documents (<\r\n1000)\r\n ```\r\ncurl -XPOST \"elastic:changeme@localhost:9200/_security/user/superuser\"\r\n-H \"kbn-xsrf: reporting\" -H \"Content-Type: application/json\" -d'\r\n {\r\n \"password\" : \"changeme\", \r\n \"roles\" : [ \"superuser\", \"grant_kibana_system_indices\" ]\r\n }'\r\n\r\ncurl -XPUT\r\n\"superuser:changeme@localhost:9200/.kibana_8.4.0_001/_mappings\" -H\r\n\"kbn-xsrf: reporting\" -H \"Content-Type: application/json\" -d'\r\n {\r\n\"dynamic\": false,\r\n \"properties\": {\r\n\r\n }\r\n\r\n }'\r\n\r\n set -B # enable brace expansion\r\n for i in {1..500}; do\r\ncurl -k --data-binary \"@/Users/rudolf/dev/kibana/body.json\" -X PUT\r\n\"http://superuser:changeme@localhost:9200/.kibana_8.4.0_001/_doc/cases-comments:\"{$i}\"?&pretty=true\"\r\n-H \"Content-Type: application/json\"\r\n done\r\n\r\ncurl -XPOST \"superuser:changeme@localhost:9200/_aliases\" -H \"kbn-xsrf:\r\nreporting\" -H \"Content-Type: application/json\" -d'\r\n {\r\n \"actions\": [\r\n {\r\n \"add\": {\r\n \"index\": \".kibana_8.4.0_001\",\r\n \"alias\": \".kibana_8.4.0\"\r\n }\r\n },\r\n {\r\n \"add\": {\r\n \"index\": \".kibana_8.4.0_001\",\r\n \"alias\": \".kibana\"\r\n }\r\n }\r\n ]\r\n }'\r\n ```\r\n body.json\r\n ```\r\n {\r\n \"cases-comments\": {\r\n \"comment\": \"...put lots of data here...\",\r\n \"type\": \"user\",\r\n \"owner\": \"cases\",\r\n \"created_at\": \"2023-05-09T08:07:50.121Z\",\r\n \"created_by\": {\r\n \"email\": null,\r\n \"full_name\": null,\r\n \"username\": \"elastic\"\r\n },\r\n \"pushed_at\": null,\r\n \"pushed_by\": null,\r\n \"updated_at\": null,\r\n \"updated_by\": null\r\n },\r\n \"type\": \"cases-comments\",\r\n \"references\": [\r\n {\r\n \"type\": \"cases\",\r\n \"name\": \"associated-cases\",\r\n \"id\": \"9563b290-ee40-11ed-8fcc-e975e7d47f63\"\r\n }\r\n ],\r\n \"namespaces\": [\r\n \"default\"\r\n ],\r\n \"migrationVersion\": {\r\n \"cases-comments\": \"8.6.0\"\r\n },\r\n \"coreMigrationVersion\": \"8.7.2\",\r\n \"updated_at\": \"2023-05-09T08:07:50.168Z\",\r\n \"created_at\": \"2023-05-09T08:07:50.168Z\"\r\n }\r\n ```\r\n3. Run Kibana with default and smaller migrations.batchSize\r\n\r\n\r\n### Checklist\r\n\r\nDelete any items that are not applicable to this PR.\r\n\r\n- [ ] [Unit or functional\r\ntests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)\r\nwere updated or added to match the most common scenarios\r\n\r\n### For maintainers\r\n\r\n- [x] This was checked for breaking API changes and was [labeled\r\nappropriately](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)","sha":"c79c09c3d0edeb31e2b1a9b8b6181fcb7bf9c587","branchLabelMapping":{"^v8.9.0$":"main","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["Team:Core","release_note:skip","backport:skip","Feature:Migrations","backport:prev-minor","Epic:KBNA-7838","v8.9.0"],"number":153185,"url":"https://github.com/elastic/kibana/pull/153185","mergeCommit":{"message":"Make updateAndPickupMappings batch size configurable (#153185)\n\n## Summary\r\n\r\nUse migrations.batchSize config for the scroll_size in update_by_query /\r\nupdateAndPickupMappings. The default scroll_size=1000 can sometimes\r\ncause Elasticsearch to fail with `RecyclerBytesStreamOutput cannot hold\r\nmore than 2GB of data`\r\n\r\nOn CI our Elasticsearch cluster does not have enough memory to reproduce\r\n`RecyclerBytesStreamOutput` error it OOMs before it's able to load 2GB.\r\n\r\nHowever it's possible to test manually:\r\n1. Start Elasticsearch with 8GB heap `ES_JAVA_OPTS=' -Xms8g -Xmx8g' yarn\r\nes snapshot`\r\n2. Ingest > 2GB of saved objects distributed over batchSize documents (<\r\n1000)\r\n ```\r\ncurl -XPOST \"elastic:changeme@localhost:9200/_security/user/superuser\"\r\n-H \"kbn-xsrf: reporting\" -H \"Content-Type: application/json\" -d'\r\n {\r\n \"password\" : \"changeme\", \r\n \"roles\" : [ \"superuser\", \"grant_kibana_system_indices\" ]\r\n }'\r\n\r\ncurl -XPUT\r\n\"superuser:changeme@localhost:9200/.kibana_8.4.0_001/_mappings\" -H\r\n\"kbn-xsrf: reporting\" -H \"Content-Type: application/json\" -d'\r\n {\r\n\"dynamic\": false,\r\n \"properties\": {\r\n\r\n }\r\n\r\n }'\r\n\r\n set -B # enable brace expansion\r\n for i in {1..500}; do\r\ncurl -k --data-binary \"@/Users/rudolf/dev/kibana/body.json\" -X PUT\r\n\"http://superuser:changeme@localhost:9200/.kibana_8.4.0_001/_doc/cases-comments:\"{$i}\"?&pretty=true\"\r\n-H \"Content-Type: application/json\"\r\n done\r\n\r\ncurl -XPOST \"superuser:changeme@localhost:9200/_aliases\" -H \"kbn-xsrf:\r\nreporting\" -H \"Content-Type: application/json\" -d'\r\n {\r\n \"actions\": [\r\n {\r\n \"add\": {\r\n \"index\": \".kibana_8.4.0_001\",\r\n \"alias\": \".kibana_8.4.0\"\r\n }\r\n },\r\n {\r\n \"add\": {\r\n \"index\": \".kibana_8.4.0_001\",\r\n \"alias\": \".kibana\"\r\n }\r\n }\r\n ]\r\n }'\r\n ```\r\n body.json\r\n ```\r\n {\r\n \"cases-comments\": {\r\n \"comment\": \"...put lots of data here...\",\r\n \"type\": \"user\",\r\n \"owner\": \"cases\",\r\n \"created_at\": \"2023-05-09T08:07:50.121Z\",\r\n \"created_by\": {\r\n \"email\": null,\r\n \"full_name\": null,\r\n \"username\": \"elastic\"\r\n },\r\n \"pushed_at\": null,\r\n \"pushed_by\": null,\r\n \"updated_at\": null,\r\n \"updated_by\": null\r\n },\r\n \"type\": \"cases-comments\",\r\n \"references\": [\r\n {\r\n \"type\": \"cases\",\r\n \"name\": \"associated-cases\",\r\n \"id\": \"9563b290-ee40-11ed-8fcc-e975e7d47f63\"\r\n }\r\n ],\r\n \"namespaces\": [\r\n \"default\"\r\n ],\r\n \"migrationVersion\": {\r\n \"cases-comments\": \"8.6.0\"\r\n },\r\n \"coreMigrationVersion\": \"8.7.2\",\r\n \"updated_at\": \"2023-05-09T08:07:50.168Z\",\r\n \"created_at\": \"2023-05-09T08:07:50.168Z\"\r\n }\r\n ```\r\n3. Run Kibana with default and smaller migrations.batchSize\r\n\r\n\r\n### Checklist\r\n\r\nDelete any items that are not applicable to this PR.\r\n\r\n- [ ] [Unit or functional\r\ntests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)\r\nwere updated or added to match the most common scenarios\r\n\r\n### For maintainers\r\n\r\n- [x] This was checked for breaking API changes and was [labeled\r\nappropriately](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)","sha":"c79c09c3d0edeb31e2b1a9b8b6181fcb7bf9c587"}},"sourceBranch":"main","suggestedTargetBranches":[],"targetPullRequestStates":[{"branch":"main","label":"v8.9.0","labelRegex":"^v8.9.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/153185","number":153185,"mergeCommit":{"message":"Make updateAndPickupMappings batch size configurable (#153185)\n\n## Summary\r\n\r\nUse migrations.batchSize config for the scroll_size in update_by_query /\r\nupdateAndPickupMappings. The default scroll_size=1000 can sometimes\r\ncause Elasticsearch to fail with `RecyclerBytesStreamOutput cannot hold\r\nmore than 2GB of data`\r\n\r\nOn CI our Elasticsearch cluster does not have enough memory to reproduce\r\n`RecyclerBytesStreamOutput` error it OOMs before it's able to load 2GB.\r\n\r\nHowever it's possible to test manually:\r\n1. Start Elasticsearch with 8GB heap `ES_JAVA_OPTS=' -Xms8g -Xmx8g' yarn\r\nes snapshot`\r\n2. Ingest > 2GB of saved objects distributed over batchSize documents (<\r\n1000)\r\n ```\r\ncurl -XPOST \"elastic:changeme@localhost:9200/_security/user/superuser\"\r\n-H \"kbn-xsrf: reporting\" -H \"Content-Type: application/json\" -d'\r\n {\r\n \"password\" : \"changeme\", \r\n \"roles\" : [ \"superuser\", \"grant_kibana_system_indices\" ]\r\n }'\r\n\r\ncurl -XPUT\r\n\"superuser:changeme@localhost:9200/.kibana_8.4.0_001/_mappings\" -H\r\n\"kbn-xsrf: reporting\" -H \"Content-Type: application/json\" -d'\r\n {\r\n\"dynamic\": false,\r\n \"properties\": {\r\n\r\n }\r\n\r\n }'\r\n\r\n set -B # enable brace expansion\r\n for i in {1..500}; do\r\ncurl -k --data-binary \"@/Users/rudolf/dev/kibana/body.json\" -X PUT\r\n\"http://superuser:changeme@localhost:9200/.kibana_8.4.0_001/_doc/cases-comments:\"{$i}\"?&pretty=true\"\r\n-H \"Content-Type: application/json\"\r\n done\r\n\r\ncurl -XPOST \"superuser:changeme@localhost:9200/_aliases\" -H \"kbn-xsrf:\r\nreporting\" -H \"Content-Type: application/json\" -d'\r\n {\r\n \"actions\": [\r\n {\r\n \"add\": {\r\n \"index\": \".kibana_8.4.0_001\",\r\n \"alias\": \".kibana_8.4.0\"\r\n }\r\n },\r\n {\r\n \"add\": {\r\n \"index\": \".kibana_8.4.0_001\",\r\n \"alias\": \".kibana\"\r\n }\r\n }\r\n ]\r\n }'\r\n ```\r\n body.json\r\n ```\r\n {\r\n \"cases-comments\": {\r\n \"comment\": \"...put lots of data here...\",\r\n \"type\": \"user\",\r\n \"owner\": \"cases\",\r\n \"created_at\": \"2023-05-09T08:07:50.121Z\",\r\n \"created_by\": {\r\n \"email\": null,\r\n \"full_name\": null,\r\n \"username\": \"elastic\"\r\n },\r\n \"pushed_at\": null,\r\n \"pushed_by\": null,\r\n \"updated_at\": null,\r\n \"updated_by\": null\r\n },\r\n \"type\": \"cases-comments\",\r\n \"references\": [\r\n {\r\n \"type\": \"cases\",\r\n \"name\": \"associated-cases\",\r\n \"id\": \"9563b290-ee40-11ed-8fcc-e975e7d47f63\"\r\n }\r\n ],\r\n \"namespaces\": [\r\n \"default\"\r\n ],\r\n \"migrationVersion\": {\r\n \"cases-comments\": \"8.6.0\"\r\n },\r\n \"coreMigrationVersion\": \"8.7.2\",\r\n \"updated_at\": \"2023-05-09T08:07:50.168Z\",\r\n \"created_at\": \"2023-05-09T08:07:50.168Z\"\r\n }\r\n ```\r\n3. Run Kibana with default and smaller migrations.batchSize\r\n\r\n\r\n### Checklist\r\n\r\nDelete any items that are not applicable to this PR.\r\n\r\n- [ ] [Unit or functional\r\ntests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)\r\nwere updated or added to match the most common scenarios\r\n\r\n### For maintainers\r\n\r\n- [x] This was checked for breaking API changes and was [labeled\r\nappropriately](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)","sha":"c79c09c3d0edeb31e2b1a9b8b6181fcb7bf9c587"}}]}] BACKPORT-->
This commit is contained in:
parent
adec917694
commit
45c322aa65
14 changed files with 96 additions and 29 deletions
|
@ -6,11 +6,6 @@
|
|||
* Side Public License, v 1.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Batch size for updateByQuery and reindex operations.
|
||||
* Uses the default value of 1000 for Elasticsearch reindex operation.
|
||||
*/
|
||||
export const BATCH_SIZE = 1_000;
|
||||
/**
|
||||
* When a request takes a long time to complete and hits the timeout or the
|
||||
* client aborts that request due to the requestTimeout, our only course of
|
||||
|
|
|
@ -12,7 +12,6 @@ import type { RetryableEsClientError } from './catch_retryable_es_client_errors'
|
|||
import type { DocumentsTransformFailed } from '../core/migrate_raw_docs';
|
||||
|
||||
export {
|
||||
BATCH_SIZE,
|
||||
DEFAULT_TIMEOUT,
|
||||
INDEX_AUTO_EXPAND_REPLICAS,
|
||||
INDEX_NUMBER_OF_SHARDS,
|
||||
|
|
|
@ -29,7 +29,7 @@ describe('pickupUpdatedMappings', () => {
|
|||
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
|
||||
);
|
||||
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
|
||||
const task = pickupUpdatedMappings(client, 'my_index');
|
||||
const task = pickupUpdatedMappings(client, 'my_index', 1000);
|
||||
try {
|
||||
await task();
|
||||
} catch (e) {
|
||||
|
|
|
@ -13,7 +13,6 @@ import {
|
|||
catchRetryableEsClientErrors,
|
||||
type RetryableEsClientError,
|
||||
} from './catch_retryable_es_client_errors';
|
||||
import { BATCH_SIZE } from './constants';
|
||||
|
||||
export interface UpdateByQueryResponse {
|
||||
taskId: string;
|
||||
|
@ -35,7 +34,8 @@ export interface UpdateByQueryResponse {
|
|||
export const pickupUpdatedMappings =
|
||||
(
|
||||
client: ElasticsearchClient,
|
||||
index: string
|
||||
index: string,
|
||||
batchSize: number
|
||||
): TaskEither.TaskEither<RetryableEsClientError, UpdateByQueryResponse> =>
|
||||
() => {
|
||||
return client
|
||||
|
@ -46,7 +46,7 @@ export const pickupUpdatedMappings =
|
|||
allow_no_indices: false,
|
||||
index,
|
||||
// How many documents to update per batch
|
||||
scroll_size: BATCH_SIZE,
|
||||
scroll_size: batchSize,
|
||||
// force a refresh so that we can query the updated index immediately
|
||||
// after the operation completes
|
||||
refresh: true,
|
||||
|
|
|
@ -18,19 +18,18 @@ describe('reindex', () => {
|
|||
jest.clearAllMocks();
|
||||
});
|
||||
|
||||
// Create a mock client that rejects all methods with a 503 status code
|
||||
// response.
|
||||
const retryableError = new EsErrors.ResponseError(
|
||||
elasticsearchClientMock.createApiResponse({
|
||||
statusCode: 503,
|
||||
body: { error: { type: 'es_type', reason: 'es_reason' } },
|
||||
})
|
||||
);
|
||||
const client = elasticsearchClientMock.createInternalClient(
|
||||
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
|
||||
);
|
||||
|
||||
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
|
||||
// Create a mock client that rejects all methods with a 503 status code
|
||||
// response.
|
||||
const retryableError = new EsErrors.ResponseError(
|
||||
elasticsearchClientMock.createApiResponse({
|
||||
statusCode: 503,
|
||||
body: { error: { type: 'es_type', reason: 'es_reason' } },
|
||||
})
|
||||
);
|
||||
const client = elasticsearchClientMock.createInternalClient(
|
||||
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
|
||||
);
|
||||
const task = reindex({
|
||||
client,
|
||||
sourceIndex: 'my_source_index',
|
||||
|
@ -38,6 +37,7 @@ describe('reindex', () => {
|
|||
reindexScript: Option.none,
|
||||
requireAlias: false,
|
||||
excludeOnUpgradeQuery: {},
|
||||
batchSize: 1000,
|
||||
});
|
||||
try {
|
||||
await task();
|
||||
|
@ -46,4 +46,47 @@ describe('reindex', () => {
|
|||
}
|
||||
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
|
||||
});
|
||||
|
||||
it('passes options to Elasticsearch client', async () => {
|
||||
const client = elasticsearchClientMock.createInternalClient(
|
||||
elasticsearchClientMock.createSuccessTransportRequestPromise({
|
||||
hits: {
|
||||
total: 0,
|
||||
hits: [],
|
||||
},
|
||||
})
|
||||
);
|
||||
const task = reindex({
|
||||
client,
|
||||
sourceIndex: 'my_source_index',
|
||||
targetIndex: 'my_target_index',
|
||||
reindexScript: Option.some('my script'),
|
||||
requireAlias: false,
|
||||
excludeOnUpgradeQuery: { match_all: {} },
|
||||
batchSize: 99,
|
||||
});
|
||||
try {
|
||||
await task();
|
||||
} catch (e) {
|
||||
/** ignore */
|
||||
}
|
||||
expect(client.reindex).toHaveBeenCalledTimes(1);
|
||||
expect(client.reindex).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
body: {
|
||||
conflicts: 'proceed',
|
||||
source: {
|
||||
index: 'my_source_index',
|
||||
size: 99,
|
||||
query: { match_all: {} },
|
||||
},
|
||||
dest: {
|
||||
index: 'my_target_index',
|
||||
op_type: 'create',
|
||||
},
|
||||
script: { lang: 'painless', source: 'my script' },
|
||||
},
|
||||
})
|
||||
);
|
||||
});
|
||||
});
|
||||
|
|
|
@ -15,7 +15,6 @@ import {
|
|||
catchRetryableEsClientErrors,
|
||||
type RetryableEsClientError,
|
||||
} from './catch_retryable_es_client_errors';
|
||||
import { BATCH_SIZE } from './constants';
|
||||
|
||||
/** @internal */
|
||||
export interface ReindexResponse {
|
||||
|
@ -34,6 +33,8 @@ export interface ReindexParams {
|
|||
* index for backup purposes, but won't be available in the upgraded index.
|
||||
*/
|
||||
excludeOnUpgradeQuery: QueryDslQueryContainer;
|
||||
/** Number of documents Elasticsearch will read/write in each batch */
|
||||
batchSize: number;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -52,6 +53,7 @@ export const reindex =
|
|||
reindexScript,
|
||||
requireAlias,
|
||||
excludeOnUpgradeQuery,
|
||||
batchSize,
|
||||
}: ReindexParams): TaskEither.TaskEither<RetryableEsClientError, ReindexResponse> =>
|
||||
() => {
|
||||
return client
|
||||
|
@ -65,7 +67,7 @@ export const reindex =
|
|||
source: {
|
||||
index: sourceIndex,
|
||||
// Set reindex batch size
|
||||
size: BATCH_SIZE,
|
||||
size: batchSize,
|
||||
// Exclude saved object types
|
||||
query: excludeOnUpgradeQuery,
|
||||
},
|
||||
|
|
|
@ -36,6 +36,7 @@ describe('updateAndPickupMappings', () => {
|
|||
client,
|
||||
index: 'new_index',
|
||||
mappings: { properties: {} },
|
||||
batchSize: 1000,
|
||||
});
|
||||
try {
|
||||
await task();
|
||||
|
@ -65,6 +66,7 @@ describe('updateAndPickupMappings', () => {
|
|||
},
|
||||
},
|
||||
},
|
||||
batchSize: 1000,
|
||||
});
|
||||
try {
|
||||
await task();
|
||||
|
|
|
@ -28,6 +28,7 @@ export interface UpdateAndPickupMappingsParams {
|
|||
client: ElasticsearchClient;
|
||||
index: string;
|
||||
mappings: IndexMapping;
|
||||
batchSize: number;
|
||||
}
|
||||
/**
|
||||
* Updates an index's mappings and runs an pickupUpdatedMappings task so that the mapping
|
||||
|
@ -37,6 +38,7 @@ export const updateAndPickupMappings = ({
|
|||
client,
|
||||
index,
|
||||
mappings,
|
||||
batchSize,
|
||||
}: UpdateAndPickupMappingsParams): TaskEither.TaskEither<
|
||||
RetryableEsClientError,
|
||||
UpdateAndPickupMappingsResponse
|
||||
|
@ -74,7 +76,7 @@ export const updateAndPickupMappings = ({
|
|||
return pipe(
|
||||
putMappingTask,
|
||||
TaskEither.chain((res) => {
|
||||
return pickupUpdatedMappings(client, index);
|
||||
return pickupUpdatedMappings(client, index, batchSize);
|
||||
})
|
||||
);
|
||||
};
|
||||
|
|
|
@ -191,6 +191,7 @@ export const nextActionMap = (
|
|||
client,
|
||||
index: state.targetIndex,
|
||||
mappings: omit(state.targetIndexMappings, ['_meta']), // ._meta property will be updated on a later step
|
||||
batchSize: state.batchSize,
|
||||
}),
|
||||
UPDATE_TARGET_MAPPINGS_PROPERTIES_WAIT_FOR_TASK: (
|
||||
state: UpdateTargetMappingsPropertiesWaitForTaskState
|
||||
|
@ -258,6 +259,7 @@ export const nextActionMap = (
|
|||
reindexScript: state.preMigrationScript,
|
||||
requireAlias: false,
|
||||
excludeOnUpgradeQuery: state.excludeOnUpgradeQuery,
|
||||
batchSize: state.batchSize,
|
||||
}),
|
||||
LEGACY_REINDEX_WAIT_FOR_TASK: (state: LegacyReindexWaitForTaskState) =>
|
||||
Actions.waitForReindexTask({ client, taskId: state.legacyReindexTaskId, timeout: '60s' }),
|
||||
|
|
|
@ -40,6 +40,7 @@ export const createContext = ({
|
|||
maxRetryAttempts: migrationConfig.retryAttempts,
|
||||
migrationDocLinks: docLinks.links.kibanaUpgradeSavedObjects,
|
||||
deletedTypes: REMOVED_TYPES,
|
||||
batchSize: migrationConfig.batchSize,
|
||||
discardCorruptObjects: Boolean(migrationConfig.discardCorruptObjects),
|
||||
};
|
||||
};
|
||||
|
|
|
@ -46,6 +46,8 @@ export interface MigratorContext {
|
|||
readonly typeRegistry: ISavedObjectTypeRegistry;
|
||||
/** List of types that are no longer registered */
|
||||
readonly deletedTypes: string[];
|
||||
/** The number of documents to process at a time */
|
||||
readonly batchSize: number;
|
||||
/** If true, corrupted objects will be discarded instead of failing the migration */
|
||||
readonly discardCorruptObjects: boolean;
|
||||
}
|
||||
|
|
|
@ -71,6 +71,7 @@ export const nextActionMap = (context: MigratorContext) => {
|
|||
client,
|
||||
index: state.currentIndex,
|
||||
mappings: { properties: state.additiveMappingChanges },
|
||||
batchSize: context.batchSize,
|
||||
}),
|
||||
UPDATE_INDEX_MAPPINGS_WAIT_FOR_TASK: (state: UpdateIndexMappingsWaitForTaskState) =>
|
||||
Actions.waitForPickupUpdatedMappingsTask({
|
||||
|
|
|
@ -54,6 +54,7 @@ export const createContextMock = (
|
|||
typeRegistry,
|
||||
serializer: serializerMock.create(),
|
||||
deletedTypes: ['deleted-type'],
|
||||
batchSize: 1000,
|
||||
discardCorruptObjects: false,
|
||||
...parts,
|
||||
};
|
||||
|
|
|
@ -713,6 +713,7 @@ describe('migration actions', () => {
|
|||
reindexScript: Option.none,
|
||||
requireAlias: false,
|
||||
excludeOnUpgradeQuery: { match_all: {} },
|
||||
batchSize: 1000,
|
||||
})()) as Either.Right<ReindexResponse>;
|
||||
const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' });
|
||||
await expect(task()).resolves.toMatchInlineSnapshot(`
|
||||
|
@ -748,6 +749,7 @@ describe('migration actions', () => {
|
|||
})),
|
||||
},
|
||||
},
|
||||
batchSize: 1000,
|
||||
})()) as Either.Right<ReindexResponse>;
|
||||
const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' });
|
||||
await expect(task()).resolves.toMatchInlineSnapshot(`
|
||||
|
@ -776,6 +778,7 @@ describe('migration actions', () => {
|
|||
reindexScript: Option.some(`ctx._source.title = ctx._source.title + '_updated'`),
|
||||
requireAlias: false,
|
||||
excludeOnUpgradeQuery: { match_all: {} },
|
||||
batchSize: 1000,
|
||||
})()) as Either.Right<ReindexResponse>;
|
||||
const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' });
|
||||
await expect(task()).resolves.toMatchInlineSnapshot(`
|
||||
|
@ -807,6 +810,7 @@ describe('migration actions', () => {
|
|||
reindexScript: Option.some(`ctx._source.title = ctx._source.title + '_updated'`),
|
||||
requireAlias: false,
|
||||
excludeOnUpgradeQuery: { match_all: {} },
|
||||
batchSize: 1000,
|
||||
})()) as Either.Right<ReindexResponse>;
|
||||
let task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' });
|
||||
await expect(task()).resolves.toMatchInlineSnapshot(`
|
||||
|
@ -824,6 +828,7 @@ describe('migration actions', () => {
|
|||
reindexScript: Option.none,
|
||||
requireAlias: false,
|
||||
excludeOnUpgradeQuery: { match_all: {} },
|
||||
batchSize: 1000,
|
||||
})()) as Either.Right<ReindexResponse>;
|
||||
task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' });
|
||||
await expect(task()).resolves.toMatchInlineSnapshot(`
|
||||
|
@ -873,6 +878,7 @@ describe('migration actions', () => {
|
|||
reindexScript: Option.some(`ctx._source.title = ctx._source.title + '_updated'`),
|
||||
requireAlias: false,
|
||||
excludeOnUpgradeQuery: { match_all: {} },
|
||||
batchSize: 1000,
|
||||
})()) as Either.Right<ReindexResponse>;
|
||||
const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' });
|
||||
await expect(task()).resolves.toMatchInlineSnapshot(`
|
||||
|
@ -924,6 +930,7 @@ describe('migration actions', () => {
|
|||
reindexScript: Option.none,
|
||||
requireAlias: false,
|
||||
excludeOnUpgradeQuery: { match_all: {} },
|
||||
batchSize: 1000,
|
||||
})()) as Either.Right<ReindexResponse>;
|
||||
const task = waitForReindexTask({ client, taskId: reindexTaskId, timeout: '10s' });
|
||||
|
||||
|
@ -963,6 +970,7 @@ describe('migration actions', () => {
|
|||
reindexScript: Option.none,
|
||||
requireAlias: false,
|
||||
excludeOnUpgradeQuery: { match_all: {} },
|
||||
batchSize: 1000,
|
||||
})()) as Either.Right<ReindexResponse>;
|
||||
const task = waitForReindexTask({ client, taskId: reindexTaskId, timeout: '10s' });
|
||||
|
||||
|
@ -986,6 +994,7 @@ describe('migration actions', () => {
|
|||
excludeOnUpgradeQuery: {
|
||||
match_all: {},
|
||||
},
|
||||
batchSize: 1000,
|
||||
})()) as Either.Right<ReindexResponse>;
|
||||
const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' });
|
||||
await expect(task()).resolves.toMatchInlineSnapshot(`
|
||||
|
@ -1007,6 +1016,7 @@ describe('migration actions', () => {
|
|||
reindexScript: Option.none,
|
||||
requireAlias: false,
|
||||
excludeOnUpgradeQuery: { match_all: {} },
|
||||
batchSize: 1000,
|
||||
})()) as Either.Right<ReindexResponse>;
|
||||
|
||||
const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' });
|
||||
|
@ -1029,6 +1039,7 @@ describe('migration actions', () => {
|
|||
reindexScript: Option.none,
|
||||
requireAlias: true,
|
||||
excludeOnUpgradeQuery: { match_all: {} },
|
||||
batchSize: 1000,
|
||||
})()) as Either.Right<ReindexResponse>;
|
||||
|
||||
const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' });
|
||||
|
@ -1057,6 +1068,7 @@ describe('migration actions', () => {
|
|||
reindexScript: Option.none,
|
||||
requireAlias: false,
|
||||
excludeOnUpgradeQuery: { match_all: {} },
|
||||
batchSize: 1000,
|
||||
})()) as Either.Right<ReindexResponse>;
|
||||
|
||||
const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '0s' });
|
||||
|
@ -1319,7 +1331,8 @@ describe('migration actions', () => {
|
|||
it('rejects if there are failures', async () => {
|
||||
const res = (await pickupUpdatedMappings(
|
||||
client,
|
||||
'existing_index_with_write_block'
|
||||
'existing_index_with_write_block',
|
||||
1000
|
||||
)()) as Either.Right<UpdateByQueryResponse>;
|
||||
|
||||
const task = waitForPickupUpdatedMappingsTask({
|
||||
|
@ -1338,7 +1351,8 @@ describe('migration actions', () => {
|
|||
it('rejects if there is an error', async () => {
|
||||
const res = (await pickupUpdatedMappings(
|
||||
client,
|
||||
'no_such_index'
|
||||
'no_such_index',
|
||||
1000
|
||||
)()) as Either.Right<UpdateByQueryResponse>;
|
||||
|
||||
const task = waitForPickupUpdatedMappingsTask({
|
||||
|
@ -1353,7 +1367,8 @@ describe('migration actions', () => {
|
|||
it('resolves left wait_for_task_completion_timeout when the task does not complete within the timeout', async () => {
|
||||
const res = (await pickupUpdatedMappings(
|
||||
client,
|
||||
'.kibana_1'
|
||||
'.kibana_1',
|
||||
1000
|
||||
)()) as Either.Right<UpdateByQueryResponse>;
|
||||
|
||||
const task = waitForPickupUpdatedMappingsTask({
|
||||
|
@ -1376,7 +1391,8 @@ describe('migration actions', () => {
|
|||
it('resolves right when successful', async () => {
|
||||
const res = (await pickupUpdatedMappings(
|
||||
client,
|
||||
'existing_index_with_docs'
|
||||
'existing_index_with_docs',
|
||||
1000
|
||||
)()) as Either.Right<UpdateByQueryResponse>;
|
||||
|
||||
const task = waitForPickupUpdatedMappingsTask({
|
||||
|
@ -1438,6 +1454,7 @@ describe('migration actions', () => {
|
|||
title: { type: 'text' },
|
||||
},
|
||||
},
|
||||
batchSize: 1000,
|
||||
})();
|
||||
expect(Either.isRight(res)).toBe(true);
|
||||
const taskId = (res as Either.Right<UpdateAndPickupMappingsResponse>).right.taskId;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue