Ensure requestTimeout > timeout for waitForIndexStatus & updateAndPickupMappings

This commit is contained in:
Rudolf Meijering 2022-07-19 15:36:41 +02:00
parent c8c48cf732
commit 91df1788ce
10 changed files with 59 additions and 41 deletions

View file

@ -33,7 +33,7 @@ export interface CloneIndexParams {
source: string;
target: string;
/** only used for testing */
timeout?: string;
timeout?: number;
}
/**
* Makes a clone of the source index into the target.
@ -75,7 +75,7 @@ export const cloneIndex = ({
// Set an explicit refresh interval so that we don't inherit the
// value from incorrectly configured index templates (not required
// after we adopt system indices)
refresh_interval: '1s',
refresh_interval: '1000',
// Bump priority so that recovery happens before newer indices
priority: 10,
},

View file

@ -11,7 +11,7 @@
* Uses the default value of 1000 for Elasticsearch reindex operation.
*/
export const BATCH_SIZE = 1_000;
export const DEFAULT_TIMEOUT = '60s';
export const DEFAULT_TIMEOUT = 50_000;
/** Allocate 1 replica if there are enough data nodes, otherwise continue with 0 */
export const INDEX_AUTO_EXPAND_REPLICAS = '0-1';
/** ES rule of thumb: shards should be several GB to 10's of GB, so Kibana is unlikely to cross that limit */

View file

@ -408,7 +408,7 @@ describe('migration actions', () => {
const task = waitForIndexStatus({
client,
index: 'red_index',
timeout: '1s',
timeout: 1_000,
status: 'yellow',
});
await expect(task()).resolves.toMatchInlineSnapshot(`
@ -441,7 +441,7 @@ describe('migration actions', () => {
const task = waitForIndexStatus({
client,
index: 'red_index',
timeout: '1s',
timeout: 1_000,
status: 'green',
});
await expect(task()).resolves.toMatchInlineSnapshot(`
@ -561,7 +561,7 @@ describe('migration actions', () => {
client,
source: 'existing_index_with_write_block',
target: 'clone_red_index',
timeout: '1s',
timeout: 1_000,
})();
await expect(cloneIndexPromise).resolves.toMatchInlineSnapshot(`
@ -589,7 +589,7 @@ describe('migration actions', () => {
client,
source: 'existing_index_with_write_block',
target: 'clone_red_index',
timeout: '1s',
timeout: 1_000,
})();
await expect(cloneIndexPromise).resolves.toMatchInlineSnapshot(`
@ -617,7 +617,7 @@ describe('migration actions', () => {
client,
source: 'existing_index_with_write_block',
target: 'clone_red_index',
timeout: '30s',
timeout: 30_000,
})();
await expect(cloneIndexPromise).resolves.toMatchInlineSnapshot(`
@ -674,7 +674,7 @@ describe('migration actions', () => {
requireAlias: false,
excludeOnUpgradeQuery: { match_all: {} },
})()) as Either.Right<ReindexResponse>;
const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' });
const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: 10_000 });
await expect(task()).resolves.toMatchInlineSnapshot(`
Object {
"_tag": "Right",
@ -714,7 +714,7 @@ describe('migration actions', () => {
},
},
})()) as Either.Right<ReindexResponse>;
const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' });
const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: 10_000 });
await expect(task()).resolves.toMatchInlineSnapshot(`
Object {
"_tag": "Right",
@ -747,7 +747,7 @@ describe('migration actions', () => {
requireAlias: false,
excludeOnUpgradeQuery: { match_all: {} },
})()) as Either.Right<ReindexResponse>;
const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' });
const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: 10_000 });
await expect(task()).resolves.toMatchInlineSnapshot(`
Object {
"_tag": "Right",
@ -782,7 +782,7 @@ describe('migration actions', () => {
requireAlias: false,
excludeOnUpgradeQuery: { match_all: {} },
})()) as Either.Right<ReindexResponse>;
let task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' });
let task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: 10_000 });
await expect(task()).resolves.toMatchInlineSnapshot(`
Object {
"_tag": "Right",
@ -799,7 +799,7 @@ describe('migration actions', () => {
requireAlias: false,
excludeOnUpgradeQuery: { match_all: {} },
})()) as Either.Right<ReindexResponse>;
task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' });
task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: 10_000 });
await expect(task()).resolves.toMatchInlineSnapshot(`
Object {
"_tag": "Right",
@ -858,7 +858,7 @@ describe('migration actions', () => {
requireAlias: false,
excludeOnUpgradeQuery: { match_all: {} },
})()) as Either.Right<ReindexResponse>;
const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' });
const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: 10_000 });
await expect(task()).resolves.toMatchInlineSnapshot(`
Object {
"_tag": "Right",
@ -914,7 +914,7 @@ describe('migration actions', () => {
requireAlias: false,
excludeOnUpgradeQuery: { match_all: {} },
})()) as Either.Right<ReindexResponse>;
const task = waitForReindexTask({ client, taskId: reindexTaskId, timeout: '10s' });
const task = waitForReindexTask({ client, taskId: reindexTaskId, timeout: 10_000 });
await expect(task()).resolves.toMatchInlineSnapshot(`
Object {
@ -953,7 +953,7 @@ describe('migration actions', () => {
requireAlias: false,
excludeOnUpgradeQuery: { match_all: {} },
})()) as Either.Right<ReindexResponse>;
const task = waitForReindexTask({ client, taskId: reindexTaskId, timeout: '10s' });
const task = waitForReindexTask({ client, taskId: reindexTaskId, timeout: 10_000 });
await expect(task()).resolves.toMatchInlineSnapshot(`
Object {
@ -976,7 +976,7 @@ describe('migration actions', () => {
match_all: {},
},
})()) as Either.Right<ReindexResponse>;
const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' });
const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: 10_000 });
await expect(task()).resolves.toMatchInlineSnapshot(`
Object {
"_tag": "Left",
@ -998,7 +998,7 @@ describe('migration actions', () => {
excludeOnUpgradeQuery: { match_all: {} },
})()) as Either.Right<ReindexResponse>;
const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' });
const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: 10_000 });
await expect(task()).resolves.toMatchInlineSnapshot(`
Object {
@ -1020,7 +1020,7 @@ describe('migration actions', () => {
excludeOnUpgradeQuery: { match_all: {} },
})()) as Either.Right<ReindexResponse>;
const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' });
const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: 10_000 });
await expect(task()).resolves.toMatchInlineSnapshot(`
Object {
@ -1049,7 +1049,7 @@ describe('migration actions', () => {
excludeOnUpgradeQuery: { match_all: {} },
})()) as Either.Right<ReindexResponse>;
const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '0s' });
const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: 0 });
await expect(task()).resolves.toMatchObject({
_tag: 'Left',
@ -1315,7 +1315,7 @@ describe('migration actions', () => {
const task = waitForPickupUpdatedMappingsTask({
client,
taskId: res.right.taskId,
timeout: '10s',
timeout: 10_000,
});
// We can't do a snapshot match because the response includes an index
@ -1334,7 +1334,7 @@ describe('migration actions', () => {
const task = waitForPickupUpdatedMappingsTask({
client,
taskId: res.right.taskId,
timeout: '10s',
timeout: 10_000,
});
await expect(task()).rejects.toThrow('index_not_found_exception');
@ -1349,7 +1349,7 @@ describe('migration actions', () => {
const task = waitForPickupUpdatedMappingsTask({
client,
taskId: res.right.taskId,
timeout: '0s',
timeout: 0,
});
await expect(task()).resolves.toMatchObject({
@ -1372,7 +1372,7 @@ describe('migration actions', () => {
const task = waitForPickupUpdatedMappingsTask({
client,
taskId: res.right.taskId,
timeout: '10s',
timeout: 10_000,
});
await expect(task()).resolves.toMatchInlineSnapshot(`
@ -1432,7 +1432,7 @@ describe('migration actions', () => {
})();
expect(Either.isRight(res)).toBe(true);
const taskId = (res as Either.Right<UpdateAndPickupMappingsResponse>).right.taskId;
await waitForPickupUpdatedMappingsTask({ client, taskId, timeout: '60s' })();
await waitForPickupUpdatedMappingsTask({ client, taskId, timeout: 60_000 })();
// Repeat the search expecting to be able to find the existing documents
const pickedUpSearchResults = (

View file

@ -46,11 +46,20 @@ export const updateAndPickupMappings = ({
'update_mappings_succeeded'
> = () => {
return client.indices
.putMapping({
index,
timeout: DEFAULT_TIMEOUT,
body: mappings,
})
.putMapping(
{
index,
timeout: DEFAULT_TIMEOUT,
body: mappings,
},
{
// The client defaults to a 30s request timeout whereas we're
// specifying a 50s timeout to Elasticsearch so we need to ensure
// the client doesn't timeout the connection before the ES server
// responds
requestTimeout: DEFAULT_TIMEOUT + 5,
}
)
.then(() => {
// Ignore `acknowledged: false`. When the coordinating node accepts
// the new cluster state update but not all nodes have applied the

View file

@ -19,7 +19,7 @@ import { DEFAULT_TIMEOUT } from './constants';
export interface WaitForIndexStatusParams {
client: ElasticsearchClient;
index: string;
timeout?: string;
timeout?: number;
status: 'yellow' | 'green';
}
@ -81,9 +81,15 @@ export function waitForIndexStatus({
wait_for_status: status,
timeout,
},
// Don't reject on status code 408 so that we can handle the timeout
// explicitly with a custom response type and provide more context in the error message
{ ignore: [408] }
{
// Don't reject on status code 408 so that we can handle the timeout
// explicitly with a custom response type and provide more context in the error message
ignore: [408],
// The client defaults to a 30s timeout whereas we use a default wait_for_status timeout
// of 50s so we need to make sure the client doesn't timeout the connection before the
// ES server responds
requestTimeout: timeout + 5,
}
)
.then((res) => {
if (res.timed_out === true) {
@ -97,9 +103,12 @@ export function waitForIndexStatus({
type: 'index_not_yellow_timeout' as const,
message: `[index_not_yellow_timeout] Timeout waiting for the status of the [${index}] index to become '${status}'`,
});
} else {
throw new Error(`Invalid index status: ${status}. Must be one of 'green' or 'yellow'`);
}
} else {
return Either.right({});
}
return Either.right({});
})
.catch(catchRetryableEsClientErrors);
};

View file

@ -38,7 +38,7 @@ describe('waitForPickupUpdatedMappingsTask', () => {
const task = waitForPickupUpdatedMappingsTask({
client,
taskId: 'my task id',
timeout: '60s',
timeout: 60000,
});
try {
await task();

View file

@ -36,7 +36,7 @@ describe('waitForReindexTask', () => {
);
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = waitForReindexTask({ client, taskId: 'my task id', timeout: '60s' });
const task = waitForReindexTask({ client, taskId: 'my task id', timeout: 60_000 });
try {
await task();
} catch (e) {

View file

@ -33,7 +33,7 @@ describe('waitForTask', () => {
const task = waitForTask({
client,
taskId: 'my task id',
timeout: '60s',
timeout: 60_000,
});
try {
await task();

View file

@ -59,7 +59,7 @@ const catchWaitForTaskCompletionTimeout = (
export interface WaitForTaskParams {
client: ElasticsearchClient;
taskId: string;
timeout: string;
timeout: number;
}
/**
* Blocks for up to 60s or until a task completes.

View file

@ -136,7 +136,7 @@ export const nextActionMap = (client: ElasticsearchClient, transformRawDocs: Tra
Actions.waitForPickupUpdatedMappingsTask({
client,
taskId: state.updateTargetMappingsTaskId,
timeout: '60s',
timeout: 60_000,
}),
OUTDATED_DOCUMENTS_SEARCH_OPEN_PIT: (state: OutdatedDocumentsSearchOpenPit) =>
Actions.openPit({ client, index: state.targetIndex }),
@ -190,7 +190,7 @@ export const nextActionMap = (client: ElasticsearchClient, transformRawDocs: Tra
excludeOnUpgradeQuery: state.excludeOnUpgradeQuery,
}),
LEGACY_REINDEX_WAIT_FOR_TASK: (state: LegacyReindexWaitForTaskState) =>
Actions.waitForReindexTask({ client, taskId: state.legacyReindexTaskId, timeout: '60s' }),
Actions.waitForReindexTask({ client, taskId: state.legacyReindexTaskId, timeout: 60_000 }),
LEGACY_DELETE: (state: LegacyDeleteState) =>
Actions.updateAliases({ client, aliasActions: state.legacyPreMigrationDoneActions }),
};