dynamically reduce batch size before it reaches MAX_STRING_LENGTH
parent 6594a2456b
commit 2a42ae1015
7 changed files with 289 additions and 80 deletions
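
In short, the migrator now tunes its read batch size from the content length of each Elasticsearch response instead of only reacting once a response is rejected. A minimal sketch of the policy this commit introduces (illustrative names, not the actual implementation, which lives in the helpers.ts hunks below):

// Sketch only: the batch size grows by 20% after a successful read (capped at
// maxBatchSize) and is halved when the response body exceeds
// migrations.maxReadBatchSizeBytes; a single-document batch that is still too
// large is a fatal condition.
let batchSize = 1000;
const maxBatchSize = 1000;
const maxReadBatchSizeBytes = 100 * 1024 * 1024; // illustrative 100mb

const grow = () => (batchSize = Math.min(Math.floor(batchSize * 1.2), maxBatchSize));
const halve = () => (batchSize = Math.max(Math.floor(batchSize / 2), 1));

const onReadResponse = (contentLength: number) => {
  if (contentLength <= maxReadBatchSizeBytes) return grow();
  if (batchSize === 1) throw new Error('FATAL: increase migrations.maxReadBatchSizeBytes');
  return halve();
};
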
@@ -18,7 +18,7 @@ const migrationSchema = schema.object({
  batchSize: schema.number({ defaultValue: 1_000 }),
  maxBatchSizeBytes: schema.byteSize({ defaultValue: '100mb' }), // 100mb is the default http.max_content_length Elasticsearch config value
  maxReadBatchSizeBytes: schema.byteSize({
    defaultValue: buffer.constants.MAX_STRING_LENGTH,
    defaultValue: '100mb',
    max: buffer.constants.MAX_STRING_LENGTH,
  }),
  discardUnknownObjects: schema.maybe(
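
For context, buffer.constants.MAX_STRING_LENGTH is Node's hard upper bound on the length of a single string: a response body larger than that cannot be deserialized by the client at all, which is why the schema caps maxReadBatchSizeBytes at it. A quick way to inspect the limit (the exact value varies by Node/V8 version):

import { constants } from 'buffer';

// Largest string Node can allocate; an Elasticsearch JSON response above this
// size cannot be parsed into a single string by the client.
console.log(constants.MAX_STRING_LENGTH);
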
@@ -24,6 +24,7 @@ export interface ReadWithPit {
  outdatedDocuments: SavedObjectsRawDoc[];
  readonly lastHitSortValue: number[] | undefined;
  readonly totalHits: number | undefined;
  readonly contentLength: number;
}

/** @internal */
@@ -79,9 +80,9 @@ export const readWithPit =
        track_total_hits: typeof searchAfter === 'undefined',
        query,
      },
      { maxResponseSize: maxResponseSizeBytes }
      { maxResponseSize: maxResponseSizeBytes, meta: true }
    )
    .then((body) => {
    .then(({ body, headers }) => {
      const totalHits =
        typeof body.hits.total === 'number'
          ? body.hits.total // This format is to be removed in 8.0
@@ -94,6 +95,7 @@ export const readWithPit =
        outdatedDocuments: hits as SavedObjectsRawDoc[],
        lastHitSortValue: hits[hits.length - 1].sort as number[],
        totalHits,
        contentLength: Number.parseInt(headers['content-length'] ?? '0', 10),
      });
    }
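
The meta: true transport option is what makes the response headers available here: with it the Elasticsearch JS client resolves with the full transport result ({ body, headers, statusCode, ... }) rather than just the parsed body, so readWithPit can record the content-length of each batch. A small self-contained sketch of the same pattern (esClient, index name, and query are illustrative):

import type { Client } from '@elastic/elasticsearch';

// Sketch: request with meta: true, then read the size of the raw response
// from the content-length header, defaulting to 0 when the header is absent.
async function readBatchContentLength(esClient: Client, batchSize: number): Promise<number> {
  const { headers } = await esClient.search(
    { index: '.kibana', size: batchSize, query: { match_all: {} } },
    { meta: true }
  );
  return Number.parseInt(headers?.['content-length'] ?? '0', 10);
}
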
@@ -5,8 +5,10 @@
 * in compliance with, at your election, the Elastic License 2.0 or the Server
 * Side Public License, v 1.
 */

import * as Either from 'fp-ts/lib/Either';
import * as Option from 'fp-ts/lib/Option';
import { FetchIndexResponse } from '../actions/fetch_indices';
import { BaseState } from '../state';
import {
  addExcludedTypesToBoolQuery,
  addMustClausesToBoolQuery,
@@ -20,6 +22,7 @@ import {
  createBulkIndexOperationTuple,
  hasLaterVersionAlias,
  aliasVersion,
  adjustBatchSize,
} from './helpers';

describe('addExcludedTypesToBoolQuery', () => {
@@ -444,3 +447,116 @@ describe('getTempIndexName', () => {
    expect(getTempIndexName('.kibana_cases', '8.8.0')).toEqual('.kibana_cases_8.8.0_reindex_temp');
  });
});

describe('adjustBatchSize', () => {
  const baseState: BaseState = {
    controlState: '',
    legacyIndex: '.kibana',
    kibanaVersion: '7.11.0',
    logs: [],
    retryCount: 0,
    retryDelay: 0,
    retryAttempts: 15,
    batchSize: 1000,
    maxBatchSize: 1000,
    maxBatchSizeBytes: 1e8,
    maxReadBatchSizeBytes: 1234,
    discardUnknownObjects: false,
    discardCorruptObjects: false,
    indexPrefix: '.kibana',
    outdatedDocumentsQuery: {},
    targetIndexMappings: { properties: {} },
    tempIndexMappings: { properties: {} },
    preMigrationScript: Option.none,
    currentAlias: '.kibana',
    versionAlias: '.kibana_7.11.0',
    versionIndex: '.kibana_7.11.0_001',
    tempIndex: '.kibana_7.11.0_reindex_temp',
    tempIndexAlias: '.kibana_7.11.0_reindex_temp_alias',
    excludeOnUpgradeQuery: {
      bool: {
        must_not: [
          {
            term: {
              type: 'unused-fleet-agent-events',
            },
          },
        ],
      },
    },
    knownTypes: ['dashboard', 'config'],
    excludeFromUpgradeFilterHooks: {},
    migrationDocLinks: {
      resolveMigrationFailures: 'https://someurl.co/',
      repeatedTimeoutRequests: 'repeatedTimeoutRequests',
      routingAllocationDisabled: 'routingAllocationDisabled',
      clusterShardLimitExceeded: 'clusterShardLimitExceeded',
    },
    waitForMigrationCompletion: false,
    mustRelocateDocuments: false,
    indexTypesMap: { '.kibana': ['action'] },
  };

  test('should increase batchSize by 20% up to maxBatchSize', () => {
    const state: BaseState = {
      ...baseState,
      batchSize: 10,
      maxBatchSize: 20,
      maxReadBatchSizeBytes: 100,
    };
    const newState = adjustBatchSize(state, 50);
    expect(newState).toEqual(Either.right({ ...state, batchSize: 12 }));
  });

  test('should not increase batchSize above maxBatchSize', () => {
    const state: BaseState = {
      ...baseState,
      batchSize: 19,
      maxBatchSize: 20,
      maxReadBatchSizeBytes: 100,
    };
    const newState = adjustBatchSize(state, 50);
    expect(newState).toEqual(Either.right({ ...state, batchSize: 20 }));
  });

  test('should halve batchSize if contentLength exceeds maxReadBatchSizeBytes', () => {
    const state: BaseState = {
      ...baseState,
      batchSize: 20,
      maxBatchSize: 40,
      maxReadBatchSizeBytes: 100,
    };
    const newState = adjustBatchSize(state, 150);
    expect(newState).toEqual(
      Either.right({
        ...state,
        batchSize: 10,
        logs: [
          {
            level: 'warning',
            message:
              'Read a batch with a response content length of 150 bytes which exceeds migrations.maxReadBatchSizeBytes, halving the batch size to 10.',
          },
        ],
      })
    );
  });

  test('should set controlState to FATAL if batchSize is 1 and contentLength still exceeds maxReadBatchSizeBytes', () => {
    const state: BaseState = {
      ...baseState,
      batchSize: 1,
      maxBatchSize: 5,
      maxReadBatchSizeBytes: 10,
    };
    const newState = adjustBatchSize(state, 20);
    expect(newState).toEqual(
      Either.left({
        ...state,
        controlState: 'FATAL' as const,
        reason:
          'After reducing the read batch size to a single document, the Elasticsearch response content length was 20 bytes which still exceeded migrations.maxReadBatchSizeBytes. Increase migrations.maxReadBatchSizeBytes and try again.',
      })
    );
  });
});
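
As the tests above show, growth is floor(batchSize * 1.2) capped at maxBatchSize, and shrinking is floor(batchSize / 2) with a floor of 1, so the backoff converges quickly: from the default batchSize of 1000 it takes floor(log2(1000)) = 9 consecutive oversized reads to reach a single-document batch, after which the next oversized read is fatal. A tiny check of that arithmetic:

// floor-halving from n reaches 1 in floor(log2(n)) steps
const halvingsToOne = (n: number): number => {
  let steps = 0;
  while (n > 1) {
    n = Math.max(Math.floor(n / 2), 1);
    steps++;
  }
  return steps;
};
console.log(halvingsToOne(1000)); // 9
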
@@ -17,7 +17,7 @@ import type { SavedObjectsRawDoc } from '@kbn/core-saved-objects-server';
import type { IndexMapping } from '@kbn/core-saved-objects-base-server-internal';
import type { AliasAction, FetchIndexResponse } from '../actions';
import type { BulkIndexOperationTuple } from './create_batches';
import { OutdatedDocumentsSearchRead, ReindexSourceToTempRead } from '../state';
import { BaseState, FatalState } from '../state';

/** @internal */
export const REINDEX_TEMP_SUFFIX = '_reindex_temp';
@@ -317,9 +317,48 @@ export const getTempIndexName = (indexPrefix: string, kibanaVersion: string): st
  `${indexPrefix}_${kibanaVersion}${REINDEX_TEMP_SUFFIX}`;

/** Increase batchSize by 20% until a maximum of maxBatchSize */
export const increaseBatchSize = (
  stateP: OutdatedDocumentsSearchRead | ReindexSourceToTempRead
) => {
const increaseBatchSize = (stateP: BaseState) => {
  const increasedBatchSize = Math.floor(stateP.batchSize * 1.2);
  return increasedBatchSize > stateP.maxBatchSize ? stateP.maxBatchSize : increasedBatchSize;
};

/**
 * Adjusts the batch size according to the contentLength of the last received batch
 * - If the contentLength is less than or equal to the maxReadBatchSizeBytes,
 *   the batchSize is increased by 20% up to the maxBatchSize
 * - If the batchSize is already 1, the controlState is set to 'FATAL' and the
 *   reason is set to a message indicating that the maxReadBatchSizeBytes should be increased
 * - If the contentLength is greater than the maxReadBatchSizeBytes, the
 *   batchSize is reduced by half
 *
 * @param state current state
 * @param contentLength content length of the last received batch
 * @returns updated state
 */
export const adjustBatchSize = (
  state: BaseState,
  contentLength: number
): Either.Either<FatalState, BaseState> => {
  if (contentLength <= state.maxReadBatchSizeBytes) {
    return Either.right({ ...state, batchSize: increaseBatchSize(state) });
  } else if (state.batchSize === 1) {
    return Either.left({
      ...state,
      controlState: 'FATAL' as const,
      reason: `After reducing the read batch size to a single document, the Elasticsearch response content length was ${contentLength} bytes which still exceeded migrations.maxReadBatchSizeBytes. Increase migrations.maxReadBatchSizeBytes and try again.`,
    });
  } else {
    const batchSize = Math.max(Math.floor(state.batchSize / 2), 1);
    return Either.right({
      ...state,
      batchSize,
      logs: [
        ...state.logs,
        {
          level: 'warning',
          message: `Read a batch with a response content length of ${contentLength} bytes which exceeds migrations.maxReadBatchSizeBytes, halving the batch size to ${batchSize}.`,
        },
      ],
    });
  }
};
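
adjustBatchSize returns an fp-ts Either so callers cannot ignore the fatal case: Left carries a FatalState, Right carries the state with the adjusted batchSize. The model (see the model.ts hunks below) consumes it with pipe and Either.fold; roughly, as a fragment mirroring that usage (stateP, contentLength, and the controlState value come from the surrounding reducer and are illustrative here):

// Sketch of how a READ state consumes the Either returned by adjustBatchSize.
const next = pipe(
  adjustBatchSize(stateP, contentLength),
  Either.fold(
    (fatalState) => fatalState, // batchSize was already 1 and the response was still too large
    (adjusted) => ({ ...adjusted, controlState: 'REINDEX_SOURCE_TO_TEMP_READ' })
  )
);
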
@@ -1847,6 +1847,7 @@ describe('migrations v2 model', () => {
        outdatedDocuments,
        lastHitSortValue,
        totalHits: 1,
        contentLength: 1, // dummy value
      });
      const newState = model(state, res) as ReindexSourceToTempTransform;
      expect(newState.controlState).toBe('REINDEX_SOURCE_TO_TEMP_TRANSFORM');
@@ -1874,6 +1875,7 @@ describe('migrations v2 model', () => {
        lastHitSortValue,
        totalHits: 1,
        processedDocs: 1,
        contentLength: 1, // dummy value
      });
      let newState = model({ ...state, batchSize: 500 }, res) as ReindexSourceToTempTransform;
      expect(newState.batchSize).toBe(600);
@@ -1887,6 +1889,26 @@ describe('migrations v2 model', () => {
      expect(newState.maxBatchSize).toBe(1000);
    });

    it('REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_TRANSFORM halves batchSize if content length exceeds maxReadBatchSizeBytes', () => {
      const outdatedDocuments = [{ _id: '1', _source: { type: 'vis' } }];
      const lastHitSortValue = [123456];
      const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_READ'> = Either.right({
        outdatedDocuments,
        lastHitSortValue,
        totalHits: 1,
        processedDocs: 1,
        contentLength: 3456, // dummy value
      });
      let newState = model({ ...state, batchSize: 600 }, res) as ReindexSourceToTempTransform;
      expect(newState.batchSize).toBe(300);
      newState = model({ ...state, batchSize: 300 }, res) as ReindexSourceToTempTransform;
      expect(newState.batchSize).toBe(150);
      newState = model({ ...state, batchSize: 150 }, res) as ReindexSourceToTempTransform;
      expect(newState.batchSize).toBe(75);
      expect(newState.controlState).toBe('REINDEX_SOURCE_TO_TEMP_TRANSFORM');
      expect(newState.maxBatchSize).toBe(1000);
    });

    it('REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_READ if left es_response_too_large', () => {
      const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_READ'> = Either.left({
        type: 'es_response_too_large',
@@ -1902,7 +1924,7 @@ describe('migrations v2 model', () => {
        Array [
          Object {
            "level": "warning",
            "message": "Read a batch with a response content length of 4567 bytes which exceeds migrations.maxReadBatchSizeBytes, retrying by reducing the batch size in half to 500.",
            "message": "Read a batch with a response content length of 4567 bytes which exceeds migrations.maxReadBatchSizeBytes, halving the batch size to 500.",
          },
        ]
      `);
@@ -1923,7 +1945,7 @@ describe('migrations v2 model', () => {
        Array [
          Object {
            "level": "warning",
            "message": "Read a batch with a response content length of 2345 bytes which exceeds migrations.maxReadBatchSizeBytes, retrying by reducing the batch size in half to 1.",
            "message": "Read a batch with a response content length of 2345 bytes which exceeds migrations.maxReadBatchSizeBytes, halving the batch size to 1.",
          },
        ]
      `);
@@ -1939,7 +1961,7 @@ describe('migrations v2 model', () => {
      expect(newState.batchSize).toBe(1); // don't halve the batch size or go below 1
      expect(newState.maxBatchSize).toBe(1000); // leaves maxBatchSize unchanged
      expect(newState.reason).toMatchInlineSnapshot(
        `"After reducing the read batch size to a single document, the Elasticsearch response content length was 2345bytes which still exceeded migrations.maxReadBatchSizeBytes. Increase migrations.maxReadBatchSizeBytes and try again."`
        `"After reducing the read batch size to a single document, the Elasticsearch response content length was 2345 bytes which still exceeded migrations.maxReadBatchSizeBytes. Increase migrations.maxReadBatchSizeBytes and try again."`
      );
    });

@@ -1948,6 +1970,7 @@ describe('migrations v2 model', () => {
        outdatedDocuments: [],
        lastHitSortValue: undefined,
        totalHits: undefined,
        contentLength: 0, // dummy value
      });
      const newState = model(state, res) as ReindexSourceToTempClosePit;
      expect(newState.controlState).toBe('REINDEX_SOURCE_TO_TEMP_CLOSE_PIT');
@@ -1966,6 +1989,7 @@ describe('migrations v2 model', () => {
        outdatedDocuments: [],
        lastHitSortValue: undefined,
        totalHits: undefined,
        contentLength: 1, // dummy value
      });
      const newState = model(testState, res) as FatalState;
      expect(newState.controlState).toBe('FATAL');
@@ -1990,6 +2014,7 @@ describe('migrations v2 model', () => {
        outdatedDocuments: [],
        lastHitSortValue: undefined,
        totalHits: undefined,
        contentLength: 1, // dummy value
      });
      const newState = model(testState, res) as ReindexSourceToTempClosePit;
      expect(newState.controlState).toBe('REINDEX_SOURCE_TO_TEMP_CLOSE_PIT');
@@ -2399,6 +2424,7 @@ describe('migrations v2 model', () => {
        outdatedDocuments,
        lastHitSortValue,
        totalHits: 10,
        contentLength: 1, // dummy value
      });
      const newState = model(state, res) as OutdatedDocumentsTransform;
      expect(newState.controlState).toBe('OUTDATED_DOCUMENTS_TRANSFORM');
@@ -2425,6 +2451,7 @@ describe('migrations v2 model', () => {
        outdatedDocuments,
        lastHitSortValue,
        totalHits: undefined,
        contentLength: 1, // dummy value
      });
      const testState = {
        ...state,
@@ -2457,6 +2484,7 @@ describe('migrations v2 model', () => {
        lastHitSortValue,
        totalHits: 1,
        processedDocs: [],
        contentLength: 1, // dummy value
      });
      let newState = model({ ...state, batchSize: 500 }, res) as ReindexSourceToTempTransform;
      expect(newState.batchSize).toBe(600);
@@ -2470,6 +2498,26 @@ describe('migrations v2 model', () => {
      expect(newState.maxBatchSize).toBe(1000);
    });

    it('OUTDATED_DOCUMENTS_SEARCH_READ -> OUTDATED_DOCUMENTS_TRANSFORM halves batchSize if content length exceeds maxReadBatchSizeBytes', () => {
      const outdatedDocuments = [{ _id: '1', _source: { type: 'vis' } }];
      const lastHitSortValue = [123456];
      const res: ResponseType<'OUTDATED_DOCUMENTS_SEARCH_READ'> = Either.right({
        outdatedDocuments,
        lastHitSortValue,
        totalHits: 1,
        processedDocs: [],
        contentLength: 3456, // dummy value
      });
      let newState = model({ ...state, batchSize: 600 }, res) as ReindexSourceToTempTransform;
      expect(newState.batchSize).toBe(300);
      newState = model({ ...state, batchSize: 300 }, res) as ReindexSourceToTempTransform;
      expect(newState.batchSize).toBe(150);
      newState = model({ ...state, batchSize: 150 }, res) as ReindexSourceToTempTransform;
      expect(newState.batchSize).toBe(75);
      expect(newState.controlState).toBe('OUTDATED_DOCUMENTS_TRANSFORM');
      expect(newState.maxBatchSize).toBe(1000);
    });

    it('OUTDATED_DOCUMENTS_SEARCH_READ -> OUTDATED_DOCUMENTS_SEARCH_READ if left es_response_too_large', () => {
      const res: ResponseType<'OUTDATED_DOCUMENTS_SEARCH_READ'> = Either.left({
        type: 'es_response_too_large',
@@ -2485,7 +2533,7 @@ describe('migrations v2 model', () => {
        Array [
          Object {
            "level": "warning",
            "message": "Read a batch with a response content length of 3456 bytes which exceeds migrations.maxReadBatchSizeBytes, retrying by reducing the batch size in half to 500.",
            "message": "Read a batch with a response content length of 3456 bytes which exceeds migrations.maxReadBatchSizeBytes, halving the batch size to 500.",
          },
        ]
      `);
@@ -2506,7 +2554,7 @@ describe('migrations v2 model', () => {
        Array [
          Object {
            "level": "warning",
            "message": "Read a batch with a response content length of 2345 bytes which exceeds migrations.maxReadBatchSizeBytes, retrying by reducing the batch size in half to 1.",
            "message": "Read a batch with a response content length of 2345 bytes which exceeds migrations.maxReadBatchSizeBytes, halving the batch size to 1.",
          },
        ]
      `);
@@ -2522,7 +2570,7 @@ describe('migrations v2 model', () => {
      expect(newState.batchSize).toBe(1); // don't halve the batch size or go below 1
      expect(newState.maxBatchSize).toBe(1000); // leaves maxBatchSize unchanged
      expect(newState.reason).toMatchInlineSnapshot(
        `"After reducing the read batch size to a single document, the response content length was 2345 bytes which still exceeded migrations.maxReadBatchSizeBytes. Increase migrations.maxReadBatchSizeBytes and try again."`
        `"After reducing the read batch size to a single document, the Elasticsearch response content length was 2345 bytes which still exceeded migrations.maxReadBatchSizeBytes. Increase migrations.maxReadBatchSizeBytes and try again."`
      );
    });

@@ -2531,6 +2579,7 @@ describe('migrations v2 model', () => {
        outdatedDocuments: [],
        lastHitSortValue: undefined,
        totalHits: undefined,
        contentLength: 0, // dummy value
      });
      const newState = model(state, res) as OutdatedDocumentsSearchClosePit;
      expect(newState.controlState).toBe('OUTDATED_DOCUMENTS_SEARCH_CLOSE_PIT');
@@ -2552,6 +2601,7 @@ describe('migrations v2 model', () => {
        outdatedDocuments: [],
        lastHitSortValue: undefined,
        totalHits: undefined,
        contentLength: 0, // dummy value
      });
      const transformErrorsState: OutdatedDocumentsSearchRead = {
        ...state,
@@ -10,9 +10,10 @@ import * as Either from 'fp-ts/lib/Either';
import * as Option from 'fp-ts/lib/Option';
import type { IndexMapping } from '@kbn/core-saved-objects-base-server-internal';

import { pipe } from 'fp-ts/lib/function';
import { isTypeof } from '../actions';
import type { AliasAction } from '../actions';
import type { AllActionStates, State } from '../state';
import type { AllActionStates, BaseState, State, FatalState } from '../state';
import type { ResponseType } from '../next';
import {
  createInitialProgress,
@@ -43,10 +44,10 @@ import {
  versionMigrationCompleted,
  buildRemoveAliasActions,
  MigrationType,
  increaseBatchSize,
  hasLaterVersionAlias,
  aliasVersion,
  REINDEX_TEMP_SUFFIX,
  adjustBatchSize,
} from './helpers';
import { buildTempIndexMap, createBatches } from './create_batches';
import type { MigrationLog } from '../types';
@@ -860,19 +861,29 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
    // we carry through any failures we've seen with transforming documents on state
    const res = resW as ExcludeRetryableEsError<ResponseType<typeof stateP.controlState>>;
    if (Either.isRight(res)) {
      // Update and log progress from the previously processed batch
      const progress = setProgressTotal(stateP.progress, res.right.totalHits);
      logs = logProgress(stateP.logs, progress);
      stateP = {
        ...stateP,
        progress,
        logs: [...stateP.logs, ...logProgress(stateP.logs, progress)],
      };

      if (res.right.outdatedDocuments.length > 0) {
        return {
          ...stateP,
          controlState: 'REINDEX_SOURCE_TO_TEMP_TRANSFORM',
          outdatedDocuments: res.right.outdatedDocuments,
          lastHitSortValue: res.right.lastHitSortValue,
          progress,
          logs,
          // We succeeded in reading this batch, so increase the batch size for the next request.
          batchSize: increaseBatchSize(stateP),
        };
        return pipe(
          adjustBatchSize({ ...stateP, logs: [...stateP.logs, ...logs] }, res.right.contentLength),
          Either.fold(
            (fatalState: FatalState) => fatalState,
            (state: BaseState) => {
              return {
                ...state,
                controlState: 'REINDEX_SOURCE_TO_TEMP_TRANSFORM',
                outdatedDocuments: res.right.outdatedDocuments,
                lastHitSortValue: res.right.lastHitSortValue,
              } as State;
            }
          )
        );
      } else {
        // we don't have any more outdated documents and need to either fail or move on to updating the target mappings.
        if (stateP.corruptDocumentIds.length > 0 || stateP.transformErrors.length > 0) {
@@ -916,27 +927,18 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
    } else {
      const left = res.left;
      if (isTypeof(left, 'es_response_too_large')) {
        if (stateP.batchSize === 1) {
          return {
            ...stateP,
            controlState: 'FATAL',
            reason: `After reducing the read batch size to a single document, the Elasticsearch response content length was ${left.contentLength}bytes which still exceeded migrations.maxReadBatchSizeBytes. Increase migrations.maxReadBatchSizeBytes and try again.`,
          };
        } else {
          const batchSize = Math.max(Math.floor(stateP.batchSize / 2), 1);
          return {
            ...stateP,
            batchSize,
            controlState: 'REINDEX_SOURCE_TO_TEMP_READ',
            logs: [
              ...stateP.logs,
              {
                level: 'warning',
                message: `Read a batch with a response content length of ${left.contentLength} bytes which exceeds migrations.maxReadBatchSizeBytes, retrying by reducing the batch size in half to ${batchSize}.`,
              },
            ],
          };
        }
        return pipe(
          adjustBatchSize(stateP, left.contentLength),
          Either.fold(
            (fatalState: FatalState) => fatalState,
            (state: BaseState) => {
              return {
                ...state,
                controlState: 'REINDEX_SOURCE_TO_TEMP_READ',
              } as State;
            }
          )
        );
      } else {
        throwBadResponse(stateP, left);
      }
@@ -1199,19 +1201,28 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
    const res = resW as ExcludeRetryableEsError<ResponseType<typeof stateP.controlState>>;
    if (Either.isRight(res)) {
      if (res.right.outdatedDocuments.length > 0) {
        // Update and log progress
        const progress = setProgressTotal(stateP.progress, res.right.totalHits);
        logs = logProgress(stateP.logs, progress);

        return {
        stateP = {
          ...stateP,
          controlState: 'OUTDATED_DOCUMENTS_TRANSFORM',
          outdatedDocuments: res.right.outdatedDocuments,
          lastHitSortValue: res.right.lastHitSortValue,
          progress,
          logs,
          // We succeeded in reading this batch, so increase the batch size for the next request.
          batchSize: increaseBatchSize(stateP),
          logs: [...stateP.logs, ...logProgress(stateP.logs, progress)],
        };

        return pipe(
          adjustBatchSize(stateP, res.right.contentLength),
          Either.fold(
            (fatalState: FatalState) => fatalState,
            (state: BaseState) => {
              return {
                ...state,
                controlState: 'OUTDATED_DOCUMENTS_TRANSFORM',
                outdatedDocuments: res.right.outdatedDocuments,
                lastHitSortValue: res.right.lastHitSortValue,
              } as State;
            }
          )
        );
      } else {
        // we don't have any more outdated documents and need to either fail or move on to updating the target mappings.
        if (stateP.corruptDocumentIds.length > 0 || stateP.transformErrors.length > 0) {
@@ -1253,27 +1264,18 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
    } else {
      const left = res.left;
      if (isTypeof(left, 'es_response_too_large')) {
        if (stateP.batchSize === 1) {
          return {
            ...stateP,
            controlState: 'FATAL',
            reason: `After reducing the read batch size to a single document, the response content length was ${left.contentLength} bytes which still exceeded migrations.maxReadBatchSizeBytes. Increase migrations.maxReadBatchSizeBytes and try again.`,
          };
        } else {
          const batchSize = Math.max(Math.floor(stateP.batchSize / 2), 1);
          return {
            ...stateP,
            batchSize,
            controlState: 'OUTDATED_DOCUMENTS_SEARCH_READ',
            logs: [
              ...stateP.logs,
              {
                level: 'warning',
                message: `Read a batch with a response content length of ${left.contentLength} bytes which exceeds migrations.maxReadBatchSizeBytes, retrying by reducing the batch size in half to ${batchSize}.`,
              },
            ],
          };
        }
        return pipe(
          adjustBatchSize(stateP, left.contentLength),
          Either.fold(
            (fatalState: FatalState) => fatalState,
            (state: BaseState) => {
              return {
                ...state,
                controlState: 'OUTDATED_DOCUMENTS_SEARCH_READ',
              } as State;
            }
          )
        );
      } else {
        throwBadResponse(stateP, left);
      }
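
For the Left branch handled above, the read actions report an es_response_too_large error that carries the observed response size, which is what adjustBatchSize consumes. Its shape, as constructed in the unit tests (the byte count is the tests' dummy value):

// Error shape consumed by the READ states when a response is too large.
const res = Either.left({
  type: 'es_response_too_large' as const,
  contentLength: 4567,
});
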
@@ -46,7 +46,7 @@ describe('migration v2 - read batch size', () => {
    logs = await fs.readFile(logFilePath, 'utf-8');

    expect(logs).toMatch(
      /Read a batch with a response content length of \d+ bytes which exceeds migrations\.maxReadBatchSizeBytes, retrying by reducing the batch size in half to 15/
      /Read a batch with a response content length of \d+ bytes which exceeds migrations\.maxReadBatchSizeBytes, halving the batch size to 15/
    );
    expect(logs).toMatch('[.kibana] Migration completed');
  });
@@ -60,7 +60,7 @@ describe('migration v2 - read batch size', () => {
    // Check for migration steps present in the logs
    logs = await fs.readFile(logFilePath, 'utf-8');

    expect(logs).not.toMatch('retrying by reducing the batch size in half to');
    expect(logs).not.toMatch('halving the batch size');
    expect(logs).toMatch('[.kibana] Migration completed');
  });
});
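
The integration test above relies on migrations.maxReadBatchSizeBytes being small enough for a fixture read to exceed it: starting from a batch size of 30, one oversized read halves it to 15, which is the log line asserted on. A hypothetical setup sketch only (createRoot and the exact values are illustrative, not the test's actual configuration):

// Hypothetical configuration, not the test's real setup: a tiny read limit
// forces one halving from 30 to 15 during the migration.
const root = createRoot({
  migrations: { batchSize: 30, maxReadBatchSizeBytes: '1kb' },
});
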