mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 09:48:58 -04:00
* Refactor: Move checking of closed index to single point We should rather only check if an index is currently closed the moment before starting to reindex. We still store a flag to indicate that we opened an index that was closed, but this should not be set from the reindex handlers because the reindex task may only start some time later in which case the closed index could have been opened and our reindex job will open it and close it again. * Added debug log * Added comment Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com> Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
This commit is contained in:
parent
51760f3b40
commit
378e394d53
7 changed files with 41 additions and 33 deletions
|
@ -4,27 +4,24 @@
|
|||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { IScopedClusterClient } from 'kibana/server';
|
||||
import { APICaller } from 'kibana/server';
|
||||
import { getIndexStateFromClusterState } from '../../common/get_index_state_from_cluster_state';
|
||||
import { ClusterStateAPIResponse } from '../../common/types';
|
||||
|
||||
type StatusCheckResult = Record<string, 'open' | 'close'>;
|
||||
|
||||
export const esIndicesStateCheck = async (
|
||||
dataClient: IScopedClusterClient,
|
||||
callAsUser: APICaller,
|
||||
indices: string[]
|
||||
): Promise<StatusCheckResult> => {
|
||||
// According to https://www.elastic.co/guide/en/elasticsearch/reference/7.6/cluster-state.html
|
||||
// The response from this call is considered internal and subject to change. We have an API
|
||||
// integration test for asserting that the current ES version still returns what we expect.
|
||||
// This lives in x-pack/test/upgrade_assistant_integration
|
||||
const clusterState: ClusterStateAPIResponse = await dataClient.callAsCurrentUser(
|
||||
'cluster.state',
|
||||
{
|
||||
index: indices,
|
||||
metric: 'metadata',
|
||||
}
|
||||
);
|
||||
const clusterState: ClusterStateAPIResponse = await callAsUser('cluster.state', {
|
||||
index: indices,
|
||||
metric: 'metadata',
|
||||
});
|
||||
|
||||
const result: StatusCheckResult = {};
|
||||
|
||||
|
|
|
@ -32,7 +32,10 @@ export async function getUpgradeAssistantStatus(
|
|||
// If we have found deprecation information for index/indices check whether the index is
|
||||
// open or closed.
|
||||
if (indexNames.length) {
|
||||
const indexStates = await esIndicesStateCheck(dataClient, indexNames);
|
||||
const indexStates = await esIndicesStateCheck(
|
||||
dataClient.callAsCurrentUser.bind(dataClient),
|
||||
indexNames
|
||||
);
|
||||
|
||||
indices.forEach(indexData => {
|
||||
indexData.blockerForReindexing =
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
jest.mock('../es_indices_state_check', () => ({ esIndicesStateCheck: jest.fn() }));
|
||||
import { BehaviorSubject } from 'rxjs';
|
||||
import { Logger } from 'src/core/server';
|
||||
import { loggingServiceMock } from 'src/core/server/mocks';
|
||||
|
@ -22,6 +22,8 @@ import { LicensingPluginSetup } from '../../../../licensing/server';
|
|||
import { apmReindexScript } from '../apm';
|
||||
import apmMappings from '../apm/mapping.json';
|
||||
|
||||
import { esIndicesStateCheck } from '../es_indices_state_check';
|
||||
|
||||
import {
|
||||
isMlIndex,
|
||||
isWatcherIndex,
|
||||
|
@ -46,6 +48,7 @@ describe('reindexService', () => {
|
|||
Promise.reject(`Mock function ${name} was not implemented!`);
|
||||
|
||||
beforeEach(() => {
|
||||
(esIndicesStateCheck as jest.Mock).mockResolvedValue({});
|
||||
actions = {
|
||||
createReindexOp: jest.fn(unimplemented('createReindexOp')),
|
||||
deleteReindexOp: jest.fn(unimplemented('deleteReindexOp')),
|
||||
|
@ -896,7 +899,6 @@ describe('reindexService', () => {
|
|||
attributes: {
|
||||
...defaultAttributes,
|
||||
lastCompletedStep: ReindexStep.newIndexCreated,
|
||||
reindexOptions: { openAndClose: false },
|
||||
},
|
||||
} as ReindexSavedObject;
|
||||
|
||||
|
|
|
@ -6,6 +6,8 @@
|
|||
import { APICaller, Logger } from 'src/core/server';
|
||||
import { first } from 'rxjs/operators';
|
||||
|
||||
import { LicensingPluginSetup } from '../../../../licensing/server';
|
||||
|
||||
import {
|
||||
IndexGroup,
|
||||
ReindexOptions,
|
||||
|
@ -18,14 +20,16 @@ import { CURRENT_MAJOR_VERSION } from '../../../common/version';
|
|||
import { apmReindexScript, isLegacyApmIndex } from '../apm';
|
||||
import apmMappings from '../apm/mapping.json';
|
||||
|
||||
import { esIndicesStateCheck } from '../es_indices_state_check';
|
||||
|
||||
import {
|
||||
generateNewIndexName,
|
||||
getReindexWarnings,
|
||||
sourceNameForIndex,
|
||||
transformFlatSettings,
|
||||
} from './index_settings';
|
||||
|
||||
import { ReindexActions } from './reindex_actions';
|
||||
import { LicensingPluginSetup } from '../../../../licensing/server';
|
||||
|
||||
import { error } from './error';
|
||||
|
||||
|
@ -322,7 +326,12 @@ export const reindexServiceFactory = (
|
|||
const startReindexing = async (reindexOp: ReindexSavedObject) => {
|
||||
const { indexName, reindexOptions } = reindexOp.attributes;
|
||||
|
||||
if (reindexOptions?.openAndClose === true) {
|
||||
// Where possible, derive reindex options at the last moment before reindexing
|
||||
// to prevent them from becoming stale as they wait in the queue.
|
||||
const indicesState = await esIndicesStateCheck(callAsUser, [indexName]);
|
||||
const openAndClose = indicesState[indexName] === 'close';
|
||||
if (indicesState[indexName] === 'close') {
|
||||
log.debug(`Detected closed index ${indexName}, opening...`);
|
||||
await callAsUser('indices.open', { index: indexName });
|
||||
}
|
||||
|
||||
|
@ -353,6 +362,12 @@ export const reindexServiceFactory = (
|
|||
lastCompletedStep: ReindexStep.reindexStarted,
|
||||
reindexTaskId: startReindex.task,
|
||||
reindexTaskPercComplete: 0,
|
||||
reindexOptions: {
|
||||
...(reindexOptions ?? {}),
|
||||
// Indicate to downstream states whether we opened a closed index that should be
|
||||
// closed again.
|
||||
openAndClose,
|
||||
},
|
||||
});
|
||||
};
|
||||
|
||||
|
@ -680,9 +695,16 @@ export const reindexServiceFactory = (
|
|||
throw new Error(`Reindex operation must be paused in order to be resumed.`);
|
||||
}
|
||||
|
||||
const reindexOptions: ReindexOptions | undefined = opts
|
||||
? {
|
||||
...(op.attributes.reindexOptions ?? {}),
|
||||
...opts,
|
||||
}
|
||||
: undefined;
|
||||
|
||||
return actions.updateReindexOp(op, {
|
||||
status: ReindexStatus.inProgress,
|
||||
reindexOptions: opts ?? op.attributes.reindexOptions,
|
||||
reindexOptions,
|
||||
});
|
||||
});
|
||||
},
|
||||
|
|
|
@ -24,7 +24,6 @@ interface ReindexHandlerArgs {
|
|||
headers: Record<string, any>;
|
||||
credentialStore: CredentialStore;
|
||||
reindexOptions?: {
|
||||
openAndClose?: boolean;
|
||||
enqueue?: boolean;
|
||||
};
|
||||
}
|
||||
|
@ -56,7 +55,6 @@ export const reindexHandler = async ({
|
|||
|
||||
const opts: ReindexOptions | undefined = reindexOptions
|
||||
? {
|
||||
openAndClose: reindexOptions.openAndClose,
|
||||
queueSettings: reindexOptions.enqueue ? { queuedAt: Date.now() } : undefined,
|
||||
}
|
||||
: undefined;
|
||||
|
|
|
@ -21,7 +21,6 @@ const mockReindexService = {
|
|||
resumeReindexOperation: jest.fn(),
|
||||
cancelReindexing: jest.fn(),
|
||||
};
|
||||
jest.mock('../../lib/es_indices_state_check', () => ({ esIndicesStateCheck: jest.fn() }));
|
||||
jest.mock('../../lib/es_version_precheck', () => ({
|
||||
versionCheckHandlerWrapper: (a: any) => a,
|
||||
}));
|
||||
|
@ -40,7 +39,6 @@ import {
|
|||
} from '../../../common/types';
|
||||
import { credentialStoreFactory } from '../../lib/reindexing/credential_store';
|
||||
import { registerReindexIndicesRoutes } from './reindex_indices';
|
||||
import { esIndicesStateCheck } from '../../lib/es_indices_state_check';
|
||||
|
||||
/**
|
||||
* Since these route callbacks are so thin, these serve simply as integration tests
|
||||
|
@ -58,7 +56,6 @@ describe('reindex API', () => {
|
|||
} as any;
|
||||
|
||||
beforeEach(() => {
|
||||
(esIndicesStateCheck as jest.Mock).mockResolvedValue({});
|
||||
mockRouter = createMockRouter();
|
||||
routeDependencies = {
|
||||
credentialStore,
|
||||
|
@ -170,9 +167,7 @@ describe('reindex API', () => {
|
|||
);
|
||||
|
||||
// It called create correctly
|
||||
expect(mockReindexService.createReindexOperation).toHaveBeenCalledWith('theIndex', {
|
||||
openAndClose: false,
|
||||
});
|
||||
expect(mockReindexService.createReindexOperation).toHaveBeenCalledWith('theIndex', undefined);
|
||||
|
||||
// It returned the right results
|
||||
expect(resp.status).toEqual(200);
|
||||
|
@ -239,10 +234,7 @@ describe('reindex API', () => {
|
|||
kibanaResponseFactory
|
||||
);
|
||||
// It called resume correctly
|
||||
expect(mockReindexService.resumeReindexOperation).toHaveBeenCalledWith('theIndex', {
|
||||
openAndClose: false,
|
||||
queueSettings: undefined,
|
||||
});
|
||||
expect(mockReindexService.resumeReindexOperation).toHaveBeenCalledWith('theIndex', undefined);
|
||||
expect(mockReindexService.createReindexOperation).not.toHaveBeenCalled();
|
||||
|
||||
// It returned the right results
|
||||
|
@ -271,7 +263,6 @@ describe('reindex API', () => {
|
|||
|
||||
describe('POST /api/upgrade_assistant/reindex/batch', () => {
|
||||
const queueSettingsArg = {
|
||||
openAndClose: false,
|
||||
queueSettings: { queuedAt: expect.any(Number) },
|
||||
};
|
||||
it('creates a collection of index operations', async () => {
|
||||
|
|
|
@ -16,7 +16,6 @@ import { LicensingPluginSetup } from '../../../../licensing/server';
|
|||
import { ReindexStatus } from '../../../common/types';
|
||||
|
||||
import { versionCheckHandlerWrapper } from '../../lib/es_version_precheck';
|
||||
import { esIndicesStateCheck } from '../../lib/es_indices_state_check';
|
||||
import { reindexServiceFactory, ReindexWorker } from '../../lib/reindexing';
|
||||
import { CredentialStore } from '../../lib/reindexing/credential_store';
|
||||
import { reindexActionsFactory } from '../../lib/reindexing/reindex_actions';
|
||||
|
@ -121,7 +120,6 @@ export function registerReindexIndicesRoutes(
|
|||
response
|
||||
) => {
|
||||
const { indexName } = request.params;
|
||||
const indexStates = await esIndicesStateCheck(dataClient, [indexName]);
|
||||
try {
|
||||
const result = await reindexHandler({
|
||||
savedObjects: savedObjectsClient,
|
||||
|
@ -131,7 +129,6 @@ export function registerReindexIndicesRoutes(
|
|||
licensing,
|
||||
headers: request.headers,
|
||||
credentialStore,
|
||||
reindexOptions: { openAndClose: indexStates[indexName] === 'close' },
|
||||
});
|
||||
|
||||
// Kick the worker on this node to immediately pickup the new reindex operation.
|
||||
|
@ -203,7 +200,6 @@ export function registerReindexIndicesRoutes(
|
|||
response
|
||||
) => {
|
||||
const { indexNames } = request.body;
|
||||
const indexStates = await esIndicesStateCheck(dataClient, indexNames);
|
||||
const results: PostBatchResponse = {
|
||||
enqueued: [],
|
||||
errors: [],
|
||||
|
@ -219,7 +215,6 @@ export function registerReindexIndicesRoutes(
|
|||
headers: request.headers,
|
||||
credentialStore,
|
||||
reindexOptions: {
|
||||
openAndClose: indexStates[indexName] === 'close',
|
||||
enqueue: true,
|
||||
},
|
||||
});
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue