[so-migrationsv2] Use named arguments in migrationsv2 actions (#100964)

* Use named arguments in migrationsv2 actions

* Addresses some optional review feedback
This commit is contained in:
Christiane (Tina) Heiligers 2021-05-31 16:04:46 -07:00 committed by GitHub
parent af4b8e6626
commit d6828a221b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 781 additions and 475 deletions

View file

@ -37,7 +37,7 @@ describe('actions', () => {
describe('fetchIndices', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.fetchIndices(client, ['my_index']);
const task = Actions.fetchIndices({ client, indices: ['my_index'] });
try {
await task();
} catch (e) {
@ -49,7 +49,7 @@ describe('actions', () => {
describe('setWriteBlock', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.setWriteBlock(client, 'my_index');
const task = Actions.setWriteBlock({ client, index: 'my_index' });
try {
await task();
} catch (e) {
@ -58,7 +58,10 @@ describe('actions', () => {
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});
it('re-throws non retry-able errors', async () => {
const task = Actions.setWriteBlock(clientWithNonRetryableError, 'my_index');
const task = Actions.setWriteBlock({
client: clientWithNonRetryableError,
index: 'my_index',
});
await task();
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(nonRetryableError);
});
@ -66,7 +69,11 @@ describe('actions', () => {
describe('cloneIndex', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.cloneIndex(client, 'my_source_index', 'my_target_index');
const task = Actions.cloneIndex({
client,
source: 'my_source_index',
target: 'my_target_index',
});
try {
await task();
} catch (e) {
@ -75,7 +82,10 @@ describe('actions', () => {
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});
it('re-throws non retry-able errors', async () => {
const task = Actions.setWriteBlock(clientWithNonRetryableError, 'my_index');
const task = Actions.setWriteBlock({
client: clientWithNonRetryableError,
index: 'my_index',
});
await task();
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(nonRetryableError);
});
@ -95,7 +105,7 @@ describe('actions', () => {
describe('openPit', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.openPit(client, 'my_index');
const task = Actions.openPit({ client, index: 'my_index' });
try {
await task();
} catch (e) {
@ -107,7 +117,12 @@ describe('actions', () => {
describe('readWithPit', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.readWithPit(client, 'pitId', { match_all: {} }, 10_000);
const task = Actions.readWithPit({
client,
pitId: 'pitId',
query: { match_all: {} },
batchSize: 10_000,
});
try {
await task();
} catch (e) {
@ -119,7 +134,7 @@ describe('actions', () => {
describe('closePit', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.closePit(client, 'pitId');
const task = Actions.closePit({ client, pitId: 'pitId' });
try {
await task();
} catch (e) {
@ -131,14 +146,14 @@ describe('actions', () => {
describe('reindex', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.reindex(
const task = Actions.reindex({
client,
'my_source_index',
'my_target_index',
Option.none,
false,
{}
);
sourceIndex: 'my_source_index',
targetIndex: 'my_target_index',
reindexScript: Option.none,
requireAlias: false,
unusedTypesQuery: {},
});
try {
await task();
} catch (e) {
@ -150,7 +165,7 @@ describe('actions', () => {
describe('waitForReindexTask', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.waitForReindexTask(client, 'my task id', '60s');
const task = Actions.waitForReindexTask({ client, taskId: 'my task id', timeout: '60s' });
try {
await task();
} catch (e) {
@ -160,7 +175,10 @@ describe('actions', () => {
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});
it('re-throws non retry-able errors', async () => {
const task = Actions.setWriteBlock(clientWithNonRetryableError, 'my_index');
const task = Actions.setWriteBlock({
client: clientWithNonRetryableError,
index: 'my_index',
});
await task();
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(nonRetryableError);
});
@ -168,7 +186,11 @@ describe('actions', () => {
describe('waitForPickupUpdatedMappingsTask', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.waitForPickupUpdatedMappingsTask(client, 'my task id', '60s');
const task = Actions.waitForPickupUpdatedMappingsTask({
client,
taskId: 'my task id',
timeout: '60s',
});
try {
await task();
} catch (e) {
@ -178,7 +200,10 @@ describe('actions', () => {
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});
it('re-throws non retry-able errors', async () => {
const task = Actions.setWriteBlock(clientWithNonRetryableError, 'my_index');
const task = Actions.setWriteBlock({
client: clientWithNonRetryableError,
index: 'my_index',
});
await task();
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(nonRetryableError);
});
@ -186,7 +211,7 @@ describe('actions', () => {
describe('updateAliases', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.updateAliases(client, []);
const task = Actions.updateAliases({ client, aliasActions: [] });
try {
await task();
} catch (e) {
@ -196,7 +221,10 @@ describe('actions', () => {
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});
it('re-throws non retry-able errors', async () => {
const task = Actions.setWriteBlock(clientWithNonRetryableError, 'my_index');
const task = Actions.setWriteBlock({
client: clientWithNonRetryableError,
index: 'my_index',
});
await task();
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(nonRetryableError);
});
@ -204,7 +232,11 @@ describe('actions', () => {
describe('createIndex', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.createIndex(client, 'new_index', { properties: {} });
const task = Actions.createIndex({
client,
indexName: 'new_index',
mappings: { properties: {} },
});
try {
await task();
} catch (e) {
@ -214,7 +246,10 @@ describe('actions', () => {
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});
it('re-throws non retry-able errors', async () => {
const task = Actions.setWriteBlock(clientWithNonRetryableError, 'my_index');
const task = Actions.setWriteBlock({
client: clientWithNonRetryableError,
index: 'my_index',
});
await task();
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(nonRetryableError);
});
@ -222,7 +257,11 @@ describe('actions', () => {
describe('updateAndPickupMappings', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.updateAndPickupMappings(client, 'new_index', { properties: {} });
const task = Actions.updateAndPickupMappings({
client,
index: 'new_index',
mappings: { properties: {} },
});
try {
await task();
} catch (e) {
@ -276,7 +315,12 @@ describe('actions', () => {
describe('bulkOverwriteTransformedDocuments', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.bulkOverwriteTransformedDocuments(client, 'new_index', [], 'wait_for');
const task = Actions.bulkOverwriteTransformedDocuments({
client,
index: 'new_index',
transformedDocs: [],
refresh: 'wait_for',
});
try {
await task();
} catch (e) {
@ -289,7 +333,7 @@ describe('actions', () => {
describe('refreshIndex', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.refreshIndex(client, 'target_index');
const task = Actions.refreshIndex({ client, targetIndex: 'target_index' });
try {
await task();
} catch (e) {

View file

@ -68,20 +68,26 @@ export type FetchIndexResponse = Record<
{ aliases: Record<string, unknown>; mappings: IndexMapping; settings: unknown }
>;
/** @internal */
export interface FetchIndicesParams {
client: ElasticsearchClient;
indices: string[];
}
/**
* Fetches information about the given indices including aliases, mappings and
* settings.
*/
export const fetchIndices = (
client: ElasticsearchClient,
indicesToFetch: string[]
): TaskEither.TaskEither<RetryableEsClientError, FetchIndexResponse> =>
export const fetchIndices = ({
client,
indices,
}: FetchIndicesParams): TaskEither.TaskEither<RetryableEsClientError, FetchIndexResponse> =>
// @ts-expect-error @elastic/elasticsearch IndexState.alias and IndexState.mappings should be required
() => {
return client.indices
.get(
{
index: indicesToFetch,
index: indices,
ignore_unavailable: true, // Don't return an error for missing indices. Note this *will* include closed indices, the docs are misleading https://github.com/elastic/elasticsearch/issues/63607
},
{ ignore: [404], maxRetries: 0 }
@ -96,6 +102,12 @@ export interface IndexNotFound {
type: 'index_not_found_exception';
index: string;
}
/** @internal */
export interface SetWriteBlockParams {
client: ElasticsearchClient;
index: string;
}
/**
* Sets a write block in place for the given index. If the response includes
* `acknowledged: true` all in-progress writes have drained and no further
@ -105,10 +117,10 @@ export interface IndexNotFound {
* include `shards_acknowledged: true` but once the block is in place,
* subsequent calls return `shards_acknowledged: false`
*/
export const setWriteBlock = (
client: ElasticsearchClient,
index: string
): TaskEither.TaskEither<
export const setWriteBlock = ({
client,
index,
}: SetWriteBlockParams): TaskEither.TaskEither<
IndexNotFound | RetryableEsClientError,
'set_write_block_succeeded'
> => () => {
@ -145,13 +157,21 @@ export const setWriteBlock = (
);
};
/** @internal */
export interface RemoveWriteBlockParams {
client: ElasticsearchClient;
index: string;
}
/**
* Removes a write block from an index
*/
export const removeWriteBlock = (
client: ElasticsearchClient,
index: string
): TaskEither.TaskEither<RetryableEsClientError, 'remove_write_block_succeeded'> => () => {
export const removeWriteBlock = ({
client,
index,
}: RemoveWriteBlockParams): TaskEither.TaskEither<
RetryableEsClientError,
'remove_write_block_succeeded'
> => () => {
return client.indices
.putSettings<{
acknowledged: boolean;
@ -182,6 +202,12 @@ export const removeWriteBlock = (
.catch(catchRetryableEsClientErrors);
};
/** @internal */
export interface WaitForIndexStatusYellowParams {
client: ElasticsearchClient;
index: string;
timeout?: string;
}
/**
* A yellow index status means the index's primary shard is allocated and the
* index is ready for searching/indexing documents, but ES wasn't able to
@ -193,11 +219,11 @@ export const removeWriteBlock = (
* yellow at any point in the future. So ultimately data-redundancy is up to
* users to maintain.
*/
export const waitForIndexStatusYellow = (
client: ElasticsearchClient,
index: string,
timeout = DEFAULT_TIMEOUT
): TaskEither.TaskEither<RetryableEsClientError, {}> => () => {
export const waitForIndexStatusYellow = ({
client,
index,
timeout = DEFAULT_TIMEOUT,
}: WaitForIndexStatusYellowParams): TaskEither.TaskEither<RetryableEsClientError, {}> => () => {
return client.cluster
.health({ index, wait_for_status: 'yellow', timeout })
.then(() => {
@ -208,6 +234,14 @@ export const waitForIndexStatusYellow = (
export type CloneIndexResponse = AcknowledgeResponse;
/** @internal */
export interface CloneIndexParams {
client: ElasticsearchClient;
source: string;
target: string;
/** only used for testing */
timeout?: string;
}
/**
* Makes a clone of the source index into the target.
*
@ -218,13 +252,15 @@ export type CloneIndexResponse = AcknowledgeResponse;
* - the first call will wait up to 120s for the cluster state and all shards
* to be updated.
*/
export const cloneIndex = (
client: ElasticsearchClient,
source: string,
target: string,
/** only used for testing */
timeout = DEFAULT_TIMEOUT
): TaskEither.TaskEither<RetryableEsClientError | IndexNotFound, CloneIndexResponse> => {
export const cloneIndex = ({
client,
source,
target,
timeout = DEFAULT_TIMEOUT,
}: CloneIndexParams): TaskEither.TaskEither<
RetryableEsClientError | IndexNotFound,
CloneIndexResponse
> => {
const cloneTask: TaskEither.TaskEither<
RetryableEsClientError | IndexNotFound,
AcknowledgeResponse
@ -302,7 +338,7 @@ export const cloneIndex = (
} else {
// Otherwise, wait until the target index has a 'green' status.
return pipe(
waitForIndexStatusYellow(client, target, timeout),
waitForIndexStatusYellow({ client, index: target, timeout }),
TaskEither.map((value) => {
/** When the index status is 'green' we know that all shards were started */
return { acknowledged: true, shardsAcknowledged: true };
@ -352,16 +388,22 @@ const catchWaitForTaskCompletionTimeout = (
}
};
/** @internal */
export interface WaitForTaskParams {
client: ElasticsearchClient;
taskId: string;
timeout: string;
}
/**
* Blocks for up to 60s or until a task completes.
*
* TODO: delete completed tasks
*/
const waitForTask = (
client: ElasticsearchClient,
taskId: string,
timeout: string
): TaskEither.TaskEither<
const waitForTask = ({
client,
taskId,
timeout,
}: WaitForTaskParams): TaskEither.TaskEither<
RetryableEsClientError | WaitForTaskCompletionTimeout,
WaitForTaskResponse
> => () => {
@ -433,16 +475,21 @@ export interface OpenPitResponse {
pitId: string;
}
/** @internal */
export interface OpenPitParams {
client: ElasticsearchClient;
index: string;
}
// how long ES should keep PIT alive
const pitKeepAlive = '10m';
/*
* Creates a lightweight view of data when the request has been initiated.
* See https://www.elastic.co/guide/en/elasticsearch/reference/current/point-in-time-api.html
* */
export const openPit = (
client: ElasticsearchClient,
index: string
): TaskEither.TaskEither<RetryableEsClientError, OpenPitResponse> => () => {
export const openPit = ({
client,
index,
}: OpenPitParams): TaskEither.TaskEither<RetryableEsClientError, OpenPitResponse> => () => {
return client
.openPointInTime({
index,
@ -459,17 +506,28 @@ export interface ReadWithPit {
readonly totalHits: number | undefined;
}
/** @internal */
export interface ReadWithPitParams {
client: ElasticsearchClient;
pitId: string;
query: estypes.QueryContainer;
batchSize: number;
searchAfter?: number[];
seqNoPrimaryTerm?: boolean;
}
/*
* Requests documents from the index using PIT mechanism.
* */
export const readWithPit = (
client: ElasticsearchClient,
pitId: string,
query: estypes.QueryContainer,
batchSize: number,
searchAfter?: number[],
seqNoPrimaryTerm?: boolean
): TaskEither.TaskEither<RetryableEsClientError, ReadWithPit> => () => {
export const readWithPit = ({
client,
pitId,
query,
batchSize,
searchAfter,
seqNoPrimaryTerm,
}: ReadWithPitParams): TaskEither.TaskEither<RetryableEsClientError, ReadWithPit> => () => {
return client
.search<SavedObjectsRawDoc>({
seq_no_primary_term: seqNoPrimaryTerm,
@ -516,14 +574,19 @@ export const readWithPit = (
.catch(catchRetryableEsClientErrors);
};
/** @internal */
export interface ClosePitParams {
client: ElasticsearchClient;
pitId: string;
}
/*
* Closes PIT.
* See https://www.elastic.co/guide/en/elasticsearch/reference/current/point-in-time-api.html
* */
export const closePit = (
client: ElasticsearchClient,
pitId: string
): TaskEither.TaskEither<RetryableEsClientError, {}> => () => {
export const closePit = ({
client,
pitId,
}: ClosePitParams): TaskEither.TaskEither<RetryableEsClientError, {}> => () => {
return client
.closePointInTime({
body: { id: pitId },
@ -537,27 +600,42 @@ export const closePit = (
.catch(catchRetryableEsClientErrors);
};
/** @internal */
export interface TransformDocsParams {
transformRawDocs: TransformRawDocs;
outdatedDocuments: SavedObjectsRawDoc[];
}
/*
* Transform outdated docs
* */
export const transformDocs = (
transformRawDocs: TransformRawDocs,
outdatedDocuments: SavedObjectsRawDoc[]
): TaskEither.TaskEither<DocumentsTransformFailed, DocumentsTransformSuccess> =>
transformRawDocs(outdatedDocuments);
export const transformDocs = ({
transformRawDocs,
outdatedDocuments,
}: TransformDocsParams): TaskEither.TaskEither<
DocumentsTransformFailed,
DocumentsTransformSuccess
> => transformRawDocs(outdatedDocuments);
/** @internal */
export interface ReindexResponse {
taskId: string;
}
/** @internal */
export interface RefreshIndexParams {
client: ElasticsearchClient;
targetIndex: string;
}
/**
* Wait for Elasticsearch to reindex all the changes.
*/
export const refreshIndex = (
client: ElasticsearchClient,
targetIndex: string
): TaskEither.TaskEither<RetryableEsClientError, { refreshed: boolean }> => () => {
export const refreshIndex = ({
client,
targetIndex,
}: RefreshIndexParams): TaskEither.TaskEither<
RetryableEsClientError,
{ refreshed: boolean }
> => () => {
return client.indices
.refresh({
index: targetIndex,
@ -567,6 +645,19 @@ export const refreshIndex = (
})
.catch(catchRetryableEsClientErrors);
};
/** @internal */
export interface ReindexParams {
client: ElasticsearchClient;
sourceIndex: string;
targetIndex: string;
reindexScript: Option.Option<string>;
requireAlias: boolean;
/* When reindexing we use a source query to exclude saved objects types which
* are no longer used. These saved objects will still be kept in the outdated
* index for backup purposes, but won't be available in the upgraded index.
*/
unusedTypesQuery: estypes.QueryContainer;
}
/**
* Reindex documents from the `sourceIndex` into the `targetIndex`. Returns a
* task ID which can be tracked for progress.
@ -575,18 +666,14 @@ export const refreshIndex = (
* this in parallel. By using `op_type: 'create', conflicts: 'proceed'` there
* will be only one write per reindexed document.
*/
export const reindex = (
client: ElasticsearchClient,
sourceIndex: string,
targetIndex: string,
reindexScript: Option.Option<string>,
requireAlias: boolean,
/* When reindexing we use a source query to exclude saved objects types which
* are no longer used. These saved objects will still be kept in the outdated
* index for backup purposes, but won't be available in the upgraded index.
*/
unusedTypesQuery: estypes.QueryContainer
): TaskEither.TaskEither<RetryableEsClientError, ReindexResponse> => () => {
export const reindex = ({
client,
sourceIndex,
targetIndex,
reindexScript,
requireAlias,
unusedTypesQuery,
}: ReindexParams): TaskEither.TaskEither<RetryableEsClientError, ReindexResponse> => () => {
return client
.reindex({
// Require targetIndex to be an alias. Prevents a new index from being
@ -688,11 +775,18 @@ export const waitForReindexTask = flow(
)
);
export const verifyReindex = (
client: ElasticsearchClient,
sourceIndex: string,
targetIndex: string
): TaskEither.TaskEither<
/** @internal */
export interface VerifyReindexParams {
client: ElasticsearchClient;
sourceIndex: string;
targetIndex: string;
}
export const verifyReindex = ({
client,
sourceIndex,
targetIndex,
}: VerifyReindexParams): TaskEither.TaskEither<
RetryableEsClientError | { type: 'verify_reindex_failed' },
'verify_reindex_succeeded'
> => () => {
@ -762,13 +856,18 @@ export type AliasAction =
| { remove: { index: string; alias: string; must_exist: boolean } }
| { add: { index: string; alias: string } };
/** @internal */
export interface UpdateAliasesParams {
client: ElasticsearchClient;
aliasActions: AliasAction[];
}
/**
* Calls the Update index alias API `_alias` with the provided alias actions.
*/
export const updateAliases = (
client: ElasticsearchClient,
aliasActions: AliasAction[]
): TaskEither.TaskEither<
export const updateAliases = ({
client,
aliasActions,
}: UpdateAliasesParams): TaskEither.TaskEither<
IndexNotFound | AliasNotFound | RemoveIndexNotAConcreteIndex | RetryableEsClientError,
'update_aliases_succeeded'
> => () => {
@ -836,6 +935,14 @@ function aliasArrayToRecord(aliases: string[]): Record<string, estypes.Alias> {
}
return result;
}
/** @internal */
export interface CreateIndexParams {
client: ElasticsearchClient;
indexName: string;
mappings: IndexMapping;
aliases?: string[];
}
/**
* Creates an index with the given mappings
*
@ -846,12 +953,12 @@ function aliasArrayToRecord(aliases: string[]): Record<string, estypes.Alias> {
* - the first call will wait up to 120s for the cluster state and all shards
* to be updated.
*/
export const createIndex = (
client: ElasticsearchClient,
indexName: string,
mappings: IndexMapping,
aliases: string[] = []
): TaskEither.TaskEither<RetryableEsClientError, 'create_index_succeeded'> => {
export const createIndex = ({
client,
indexName,
mappings,
aliases = [],
}: CreateIndexParams): TaskEither.TaskEither<RetryableEsClientError, 'create_index_succeeded'> => {
const createIndexTask: TaskEither.TaskEither<
RetryableEsClientError,
AcknowledgeResponse
@ -930,7 +1037,7 @@ export const createIndex = (
} else {
// Otherwise, wait until the target index has a 'yellow' status.
return pipe(
waitForIndexStatusYellow(client, indexName, DEFAULT_TIMEOUT),
waitForIndexStatusYellow({ client, index: indexName, timeout: DEFAULT_TIMEOUT }),
TaskEither.map(() => {
/** When the index status is 'yellow' we know that all shards were started */
return 'create_index_succeeded';
@ -946,15 +1053,24 @@ export interface UpdateAndPickupMappingsResponse {
taskId: string;
}
/** @internal */
export interface UpdateAndPickupMappingsParams {
client: ElasticsearchClient;
index: string;
mappings: IndexMapping;
}
/**
* Updates an index's mappings and runs an pickupUpdatedMappings task so that the mapping
* changes are "picked up". Returns a taskId to track progress.
*/
export const updateAndPickupMappings = (
client: ElasticsearchClient,
index: string,
mappings: IndexMapping
): TaskEither.TaskEither<RetryableEsClientError, UpdateAndPickupMappingsResponse> => {
export const updateAndPickupMappings = ({
client,
index,
mappings,
}: UpdateAndPickupMappingsParams): TaskEither.TaskEither<
RetryableEsClientError,
UpdateAndPickupMappingsResponse
> => {
const putMappingTask: TaskEither.TaskEither<
RetryableEsClientError,
'update_mappings_succeeded'
@ -1053,16 +1169,26 @@ export const searchForOutdatedDocuments = (
.catch(catchRetryableEsClientErrors);
};
/** @internal */
export interface BulkOverwriteTransformedDocumentsParams {
client: ElasticsearchClient;
index: string;
transformedDocs: SavedObjectsRawDoc[];
refresh?: estypes.Refresh;
}
/**
* Write the up-to-date transformed documents to the index, overwriting any
* documents that are still on their outdated version.
*/
export const bulkOverwriteTransformedDocuments = (
client: ElasticsearchClient,
index: string,
transformedDocs: SavedObjectsRawDoc[],
refresh: estypes.Refresh
): TaskEither.TaskEither<RetryableEsClientError, 'bulk_index_succeeded'> => () => {
export const bulkOverwriteTransformedDocuments = ({
client,
index,
transformedDocs,
refresh = false,
}: BulkOverwriteTransformedDocumentsParams): TaskEither.TaskEither<
RetryableEsClientError,
'bulk_index_succeeded'
> => () => {
return client
.bulk({
// Because we only add aliases in the MARK_VERSION_INDEX_READY step we

View file

@ -19,7 +19,7 @@ export async function cleanup(
if (!state) return;
if ('sourceIndexPitId' in state) {
try {
await Actions.closePit(client, state.sourceIndexPitId)();
await Actions.closePit({ client, pitId: state.sourceIndexPitId })();
} catch (e) {
executionLog.push({
type: 'cleanup',

View file

@ -58,38 +58,46 @@ export type ResponseType<ControlState extends AllActionStates> = UnwrapPromise<
export const nextActionMap = (client: ElasticsearchClient, transformRawDocs: TransformRawDocs) => {
return {
INIT: (state: InitState) =>
Actions.fetchIndices(client, [state.currentAlias, state.versionAlias]),
Actions.fetchIndices({ client, indices: [state.currentAlias, state.versionAlias] }),
WAIT_FOR_YELLOW_SOURCE: (state: WaitForYellowSourceState) =>
Actions.waitForIndexStatusYellow(client, state.sourceIndex.value),
Actions.waitForIndexStatusYellow({ client, index: state.sourceIndex.value }),
SET_SOURCE_WRITE_BLOCK: (state: SetSourceWriteBlockState) =>
Actions.setWriteBlock(client, state.sourceIndex.value),
Actions.setWriteBlock({ client, index: state.sourceIndex.value }),
CREATE_NEW_TARGET: (state: CreateNewTargetState) =>
Actions.createIndex(client, state.targetIndex, state.targetIndexMappings),
CREATE_REINDEX_TEMP: (state: CreateReindexTempState) =>
Actions.createIndex(client, state.tempIndex, state.tempIndexMappings),
REINDEX_SOURCE_TO_TEMP_OPEN_PIT: (state: ReindexSourceToTempOpenPit) =>
Actions.openPit(client, state.sourceIndex.value),
REINDEX_SOURCE_TO_TEMP_READ: (state: ReindexSourceToTempRead) =>
Actions.readWithPit(
Actions.createIndex({
client,
state.sourceIndexPitId,
indexName: state.targetIndex,
mappings: state.targetIndexMappings,
}),
CREATE_REINDEX_TEMP: (state: CreateReindexTempState) =>
Actions.createIndex({
client,
indexName: state.tempIndex,
mappings: state.tempIndexMappings,
}),
REINDEX_SOURCE_TO_TEMP_OPEN_PIT: (state: ReindexSourceToTempOpenPit) =>
Actions.openPit({ client, index: state.sourceIndex.value }),
REINDEX_SOURCE_TO_TEMP_READ: (state: ReindexSourceToTempRead) =>
Actions.readWithPit({
client,
pitId: state.sourceIndexPitId,
/* When reading we use a source query to exclude saved objects types which
* are no longer used. These saved objects will still be kept in the outdated
* index for backup purposes, but won't be available in the upgraded index.
*/
state.unusedTypesQuery,
state.batchSize,
state.lastHitSortValue
),
query: state.unusedTypesQuery,
batchSize: state.batchSize,
searchAfter: state.lastHitSortValue,
}),
REINDEX_SOURCE_TO_TEMP_CLOSE_PIT: (state: ReindexSourceToTempClosePit) =>
Actions.closePit(client, state.sourceIndexPitId),
Actions.closePit({ client, pitId: state.sourceIndexPitId }),
REINDEX_SOURCE_TO_TEMP_INDEX: (state: ReindexSourceToTempIndex) =>
Actions.transformDocs(transformRawDocs, state.outdatedDocuments),
Actions.transformDocs({ transformRawDocs, outdatedDocuments: state.outdatedDocuments }),
REINDEX_SOURCE_TO_TEMP_INDEX_BULK: (state: ReindexSourceToTempIndexBulk) =>
Actions.bulkOverwriteTransformedDocuments(
Actions.bulkOverwriteTransformedDocuments({
client,
state.tempIndex,
state.transformedDocs,
index: state.tempIndex,
transformedDocs: state.transformedDocs,
/**
* Since we don't run a search against the target index, we disable "refresh" to speed up
* the migration process.
@ -97,39 +105,48 @@ export const nextActionMap = (client: ElasticsearchClient, transformRawDocs: Tra
* before we reach out to the OUTDATED_DOCUMENTS_SEARCH_OPEN_PIT step.
* Right now, it's performed during REFRESH_TARGET step.
*/
false
),
refresh: false,
}),
SET_TEMP_WRITE_BLOCK: (state: SetTempWriteBlock) =>
Actions.setWriteBlock(client, state.tempIndex),
Actions.setWriteBlock({ client, index: state.tempIndex }),
CLONE_TEMP_TO_TARGET: (state: CloneTempToSource) =>
Actions.cloneIndex(client, state.tempIndex, state.targetIndex),
REFRESH_TARGET: (state: RefreshTarget) => Actions.refreshIndex(client, state.targetIndex),
Actions.cloneIndex({ client, source: state.tempIndex, target: state.targetIndex }),
REFRESH_TARGET: (state: RefreshTarget) =>
Actions.refreshIndex({ client, targetIndex: state.targetIndex }),
UPDATE_TARGET_MAPPINGS: (state: UpdateTargetMappingsState) =>
Actions.updateAndPickupMappings(client, state.targetIndex, state.targetIndexMappings),
Actions.updateAndPickupMappings({
client,
index: state.targetIndex,
mappings: state.targetIndexMappings,
}),
UPDATE_TARGET_MAPPINGS_WAIT_FOR_TASK: (state: UpdateTargetMappingsWaitForTaskState) =>
Actions.waitForPickupUpdatedMappingsTask(client, state.updateTargetMappingsTaskId, '60s'),
Actions.waitForPickupUpdatedMappingsTask({
client,
taskId: state.updateTargetMappingsTaskId,
timeout: '60s',
}),
OUTDATED_DOCUMENTS_SEARCH_OPEN_PIT: (state: OutdatedDocumentsSearchOpenPit) =>
Actions.openPit(client, state.targetIndex),
Actions.openPit({ client, index: state.targetIndex }),
OUTDATED_DOCUMENTS_SEARCH_READ: (state: OutdatedDocumentsSearchRead) =>
Actions.readWithPit(
Actions.readWithPit({
client,
state.pitId,
pitId: state.pitId,
// search for outdated documents only
state.outdatedDocumentsQuery,
state.batchSize,
state.lastHitSortValue
),
query: state.outdatedDocumentsQuery,
batchSize: state.batchSize,
searchAfter: state.lastHitSortValue,
}),
OUTDATED_DOCUMENTS_SEARCH_CLOSE_PIT: (state: OutdatedDocumentsSearchClosePit) =>
Actions.closePit(client, state.pitId),
Actions.closePit({ client, pitId: state.pitId }),
OUTDATED_DOCUMENTS_REFRESH: (state: OutdatedDocumentsRefresh) =>
Actions.refreshIndex(client, state.targetIndex),
Actions.refreshIndex({ client, targetIndex: state.targetIndex }),
OUTDATED_DOCUMENTS_TRANSFORM: (state: OutdatedDocumentsTransform) =>
Actions.transformDocs(transformRawDocs, state.outdatedDocuments),
Actions.transformDocs({ transformRawDocs, outdatedDocuments: state.outdatedDocuments }),
TRANSFORMED_DOCUMENTS_BULK_INDEX: (state: TransformedDocumentsBulkIndex) =>
Actions.bulkOverwriteTransformedDocuments(
Actions.bulkOverwriteTransformedDocuments({
client,
state.targetIndex,
state.transformedDocs,
index: state.targetIndex,
transformedDocs: state.transformedDocs,
/**
* Since we don't run a search against the target index, we disable "refresh" to speed up
* the migration process.
@ -137,29 +154,32 @@ export const nextActionMap = (client: ElasticsearchClient, transformRawDocs: Tra
* before we reach out to the MARK_VERSION_INDEX_READY step.
* Right now, it's performed during OUTDATED_DOCUMENTS_REFRESH step.
*/
false
),
}),
MARK_VERSION_INDEX_READY: (state: MarkVersionIndexReady) =>
Actions.updateAliases(client, state.versionIndexReadyActions.value),
Actions.updateAliases({ client, aliasActions: state.versionIndexReadyActions.value }),
MARK_VERSION_INDEX_READY_CONFLICT: (state: MarkVersionIndexReadyConflict) =>
Actions.fetchIndices(client, [state.currentAlias, state.versionAlias]),
Actions.fetchIndices({ client, indices: [state.currentAlias, state.versionAlias] }),
LEGACY_SET_WRITE_BLOCK: (state: LegacySetWriteBlockState) =>
Actions.setWriteBlock(client, state.legacyIndex),
Actions.setWriteBlock({ client, index: state.legacyIndex }),
LEGACY_CREATE_REINDEX_TARGET: (state: LegacyCreateReindexTargetState) =>
Actions.createIndex(client, state.sourceIndex.value, state.legacyReindexTargetMappings),
LEGACY_REINDEX: (state: LegacyReindexState) =>
Actions.reindex(
Actions.createIndex({
client,
state.legacyIndex,
state.sourceIndex.value,
state.preMigrationScript,
false,
state.unusedTypesQuery
),
indexName: state.sourceIndex.value,
mappings: state.legacyReindexTargetMappings,
}),
LEGACY_REINDEX: (state: LegacyReindexState) =>
Actions.reindex({
client,
sourceIndex: state.legacyIndex,
targetIndex: state.sourceIndex.value,
reindexScript: state.preMigrationScript,
requireAlias: false,
unusedTypesQuery: state.unusedTypesQuery,
}),
LEGACY_REINDEX_WAIT_FOR_TASK: (state: LegacyReindexWaitForTaskState) =>
Actions.waitForReindexTask(client, state.legacyReindexTaskId, '60s'),
Actions.waitForReindexTask({ client, taskId: state.legacyReindexTaskId, timeout: '60s' }),
LEGACY_DELETE: (state: LegacyDeleteState) =>
Actions.updateAliases(client, state.legacyPreMigrationDoneActions),
Actions.updateAliases({ client, aliasActions: state.legacyPreMigrationDoneActions }),
};
};