[upgrade assistant] Stop rollup jobs before reindexing - forwardport to 9.1 (#218049)

## Summary

forward port of https://github.com/elastic/kibana/pull/212815

---

This PR improve support for rollup indices. Rollup indices can be
handled like normal indices but jobs should be stopped before reindexing
begins or index is marked read only. Also handles case where the rollup
job is already stopped.

To review: Mark the following read only and make sure rollup jobs are
handled as appropriate: Rollup index with and without job running,
normal index.

Follow up to https://github.com/elastic/kibana/pull/212592 and
https://github.com/elastic/kibana/pull/214656

Closes: https://github.com/elastic/kibana/issues/211850
This commit is contained in:
Matthew Kime 2025-04-21 09:48:08 -05:00 committed by GitHub
parent a6ae117dd3
commit fcee0a8c73
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 118 additions and 15 deletions

View file

@ -115,6 +115,7 @@ export interface ReindexOperation {
errorMessage: string | null;
// This field is only used for the singleton IndexConsumerType documents.
runningReindexCount: number | null;
rollupJob?: string;
/**
* The original index settings to set after reindex is completed.

View file

@ -6,7 +6,7 @@
*/
import { SavedObjectsErrorHelpers } from '@kbn/core/server';
import { elasticsearchServiceMock } from '@kbn/core/server/mocks';
import { elasticsearchServiceMock, loggingSystemMock } from '@kbn/core/server/mocks';
import type { ScopedClusterClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
import moment from 'moment';
@ -22,10 +22,15 @@ import { getMockVersionInfo } from '../__fixtures__/version';
const { currentMajor, prevMajor } = getMockVersionInfo();
jest.mock('../rollup_job', () => ({
getRollupJobByIndexName: jest.fn(),
}));
describe('ReindexActions', () => {
let client: jest.Mocked<any>;
let clusterClient: ScopedClusterClientMock;
let actions: ReindexActions;
const log = loggingSystemMock.createLogger();
const unimplemented = (name: string) => () =>
Promise.reject(`Mock function ${name} was not implemented!`);
@ -45,7 +50,7 @@ describe('ReindexActions', () => {
) as any,
};
clusterClient = elasticsearchServiceMock.createScopedClusterClient();
actions = reindexActionsFactory(client, clusterClient.asCurrentUser);
actions = reindexActionsFactory(client, clusterClient.asCurrentUser, log);
});
describe('createReindexOp', () => {

View file

@ -11,6 +11,7 @@ import {
SavedObjectsFindResponse,
SavedObjectsClientContract,
ElasticsearchClient,
Logger,
} from '@kbn/core/server';
import {
REINDEX_OP_TYPE,
@ -22,6 +23,7 @@ import {
} from '../../../common/types';
import { generateNewIndexName } from './index_settings';
import { FlatSettings } from './types';
import { getRollupJobByIndexName } from '../rollup_job';
// TODO: base on elasticsearch.requestTimeout?
export const LOCK_WINDOW = moment.duration(90, 'seconds');
@ -84,7 +86,8 @@ export interface ReindexActions {
export const reindexActionsFactory = (
client: SavedObjectsClientContract,
esClient: ElasticsearchClient
esClient: ElasticsearchClient,
log: Logger
): ReindexActions => {
// ----- Internal functions
const isLocked = (reindexOp: ReindexSavedObject) => {
@ -125,6 +128,9 @@ export const reindexActionsFactory = (
// ----- Public interface
return {
async createReindexOp(indexName: string, opts?: ReindexOptions) {
// gets rollup job if it exists and needs stopping, otherwise returns undefined
const rollupJob = await getRollupJobByIndexName(esClient, log, indexName);
return client.create<ReindexOperation>(REINDEX_OP_TYPE, {
indexName,
newIndexName: generateNewIndexName(indexName),
@ -136,6 +142,7 @@ export const reindexActionsFactory = (
errorMessage: null,
runningReindexCount: null,
reindexOptions: opts,
rollupJob,
});
},

View file

@ -169,7 +169,12 @@ export const reindexServiceFactory = (
* @param reindexOp
*/
const setReadonly = async (reindexOp: ReindexSavedObject) => {
const { indexName } = reindexOp.attributes;
const { indexName, rollupJob } = reindexOp.attributes;
if (rollupJob) {
await esClient.rollup.stopJob({ id: rollupJob, wait_for_completion: true });
}
const putReadonly = await esClient.indices.putSettings({
index: indexName,
body: { blocks: { write: true } },
@ -428,6 +433,11 @@ export const reindexServiceFactory = (
await esClient.indices.close({ index: indexName });
}
if (reindexOp.attributes.rollupJob) {
// start the rollup job. rollupJob is undefined if the rollup job is stopped
await esClient.rollup.startJob({ id: reindexOp.attributes.rollupJob });
}
return actions.updateReindexOp(reindexOp, {
lastCompletedStep: ReindexStep.aliasCreated,
});

View file

@ -88,7 +88,7 @@ export class ReindexWorker {
this.reindexService = reindexServiceFactory(
callAsInternalUser,
reindexActionsFactory(this.client, callAsInternalUser),
reindexActionsFactory(this.client, callAsInternalUser, this.log),
log,
this.licensing
);
@ -173,7 +173,7 @@ export class ReindexWorker {
const fakeRequest: FakeRequest = { headers: credential };
const scopedClusterClient = this.clusterClient.asScoped(fakeRequest);
const callAsCurrentUser = scopedClusterClient.asCurrentUser;
const actions = reindexActionsFactory(this.client, callAsCurrentUser);
const actions = reindexActionsFactory(this.client, callAsCurrentUser, this.log);
return reindexServiceFactory(callAsCurrentUser, actions, this.log, this.licensing);
};

View file

@ -0,0 +1,65 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { ElasticsearchClient, Logger } from '@kbn/core/server';
import {
RollupGetRollupIndexCapsResponse,
RollupGetJobsResponse,
} from '@elastic/elasticsearch/lib/api/types';
export async function getRollupJobByIndexName(
esClient: ElasticsearchClient,
log: Logger,
index: string
) {
let rollupCaps: RollupGetRollupIndexCapsResponse;
try {
rollupCaps = await esClient.rollup.getRollupIndexCaps({ index }, { ignore: [404] });
// may catch if not found in some circumstances, such as a closed index, etc
// would be nice to handle the error better but little info is provided
} catch (e) {
log.warn(`Get rollup index capabilities failed: ${e}`);
return;
}
const rollupIndices = Object.keys(rollupCaps);
let rollupJob: string | undefined;
// there should only be one job
if (rollupIndices.length === 1) {
rollupJob = rollupCaps[rollupIndices[0]].rollup_jobs[0].job_id;
let jobs: RollupGetJobsResponse;
try {
jobs = await esClient.rollup.getJobs({ id: rollupJob }, { ignore: [404] });
// may catch if not found in some circumstances, such as a closed index, etc
// would be nice to handle the error better but little info is provided
} catch (e) {
log.warn(`Get rollup job failed: ${e}`);
return;
}
// there can only be one job. If its stopped then we don't need rollup handling
if (
// zero jobs shouldn't happen but we can handle it gracefully
jobs.jobs.length === 0 ||
// rollup job is stopped so we can treat it like a regular index
(jobs.jobs.length === 1 && jobs.jobs[0].status.job_state === 'stopped')
) {
rollupJob = undefined;
// this shouldn't be possible but just in case
} else if (jobs.jobs.length > 1) {
throw new Error(`Multiple jobs returned for a single rollup job id: + ${rollupJob}`);
}
// this shouldn't be possible but just in case
} else if (rollupIndices.length > 1) {
throw new Error(`Multiple indices returned for a single index name: + ${index}`);
}
return rollupJob;
}

View file

@ -5,13 +5,15 @@
* 2.0.
*/
import type { ElasticsearchClient } from '@kbn/core/server';
import type { ElasticsearchClient, Logger } from '@kbn/core/server';
import type { UpdateIndexOperation } from '../../../common/update_index';
import { getRollupJobByIndexName } from '../rollup_job';
export interface UpdateIndexParams {
esClient: ElasticsearchClient;
index: string;
operations: UpdateIndexOperation[];
log: Logger;
}
/**
@ -20,12 +22,18 @@ export interface UpdateIndexParams {
* @param index The index to update
* @param operations The operations to perform on the specified index
*/
export async function updateIndex({ esClient, index, operations }: UpdateIndexParams) {
export async function updateIndex({ esClient, index, operations, log }: UpdateIndexParams) {
for (const operation of operations) {
let res;
switch (operation) {
case 'blockWrite': {
// stop related rollup job if it exists
const rollupJob = await getRollupJobByIndexName(esClient, log, index);
if (rollupJob) {
await esClient.rollup.stopJob({ id: rollupJob, wait_for_completion: true });
}
res = await esClient.indices.addBlock({ index, block: 'write' });
break;
}

View file

@ -41,7 +41,7 @@ export function registerESDeprecationRoutes({
dataSourceExclusions,
});
const asCurrentUser = client.asCurrentUser;
const reindexActions = reindexActionsFactory(savedObjectsClient, asCurrentUser);
const reindexActions = reindexActionsFactory(savedObjectsClient, asCurrentUser, log);
const reindexService = reindexServiceFactory(asCurrentUser, reindexActions, log, licensing);
const indexNames = [...status.migrationsDeprecations, ...status.enrichedHealthIndicators]
.filter(({ index }) => typeof index !== 'undefined')

View file

@ -57,7 +57,8 @@ export function registerBatchReindexIndicesRoutes(
const callAsCurrentUser = esClient.asCurrentUser;
const reindexActions = reindexActionsFactory(
getClient({ includedHiddenTypes: [REINDEX_OP_TYPE] }),
callAsCurrentUser
callAsCurrentUser,
log
);
try {
const inProgressOps = await reindexActions.findAllByStatus(ReindexStatus.inProgress);

View file

@ -49,7 +49,7 @@ export const reindexHandler = async ({
security,
}: ReindexHandlerArgs): Promise<ReindexOperation> => {
const callAsCurrentUser = dataClient.asCurrentUser;
const reindexActions = reindexActionsFactory(savedObjects, callAsCurrentUser);
const reindexActions = reindexActionsFactory(savedObjects, callAsCurrentUser, log);
const reindexService = reindexServiceFactory(callAsCurrentUser, reindexActions, log, licensing);
if (!(await reindexService.hasRequiredPrivileges(indexName))) {

View file

@ -113,7 +113,8 @@ export function registerReindexIndicesRoutes(
const asCurrentUser = esClient.asCurrentUser;
const reindexActions = reindexActionsFactory(
getClient({ includedHiddenTypes: [REINDEX_OP_TYPE] }),
asCurrentUser
asCurrentUser,
log
);
const reindexService = reindexServiceFactory(asCurrentUser, reindexActions, log, licensing);
@ -184,7 +185,8 @@ export function registerReindexIndicesRoutes(
const callAsCurrentUser = esClient.asCurrentUser;
const reindexActions = reindexActionsFactory(
getClient({ includedHiddenTypes: [REINDEX_OP_TYPE] }),
callAsCurrentUser
callAsCurrentUser,
log
);
const reindexService = reindexServiceFactory(
callAsCurrentUser,

View file

@ -13,7 +13,11 @@ import { versionCheckHandlerWrapper } from '../lib/es_version_precheck';
import type { RouteDependencies } from '../types';
import { updateIndex } from '../lib/update_index';
export function registerUpdateIndexRoute({ router, lib: { handleEsError } }: RouteDependencies) {
export function registerUpdateIndexRoute({
router,
log,
lib: { handleEsError },
}: RouteDependencies) {
const BASE_PATH = `${API_BASE_PATH}/update_index`;
router.post(
{
@ -46,7 +50,7 @@ export function registerUpdateIndexRoute({ router, lib: { handleEsError } }: Rou
const { index } = request.params;
const { operations } = request.body;
try {
await updateIndex({ esClient: client.asCurrentUser, index, operations });
await updateIndex({ esClient: client.asCurrentUser, index, operations, log });
return response.ok();
} catch (err) {
if (err instanceof errors.ResponseError) {