Upgrade Assistant Follow-ups (#29663)

* Check security privileges before allowing reindexing

* Add global readyForUpgrade flag for Cloud

* Add ml_settings to cluster_settings

* Generalize locking mechanism for stop/starting watcher

* Display ML/Watcher stopping/resuming steps in UI

* Fix type issues

* Handle security being disabled for privilege check

* Use xpack_main.info + add types

* Fix x-pack builds
This commit is contained in:
Josh Dover 2019-02-04 13:36:42 -06:00 committed by GitHub
parent 564cb528c2
commit 888217dc0f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
33 changed files with 1075 additions and 437 deletions

View file

@ -204,6 +204,7 @@ export interface DeprecationInfo {
export interface DeprecationAPIResponse {
cluster_settings: DeprecationInfo[];
ml_settings: DeprecationInfo[];
node_settings: DeprecationInfo[];
index_settings: {
[indexName: string]: DeprecationInfo[];

View file

@ -16,6 +16,7 @@
"common/**/*",
"plugins/**/*",
"server/**/*",
"typings/**/*",
"webpackShims/*",
"!**/README.md",
"!__tests__",

View file

@ -12,13 +12,13 @@ import {
export enum ReindexStep {
// Enum values are spaced out by 10 to give us room to insert steps in between.
created = 0,
indexConsumersStopped = 10,
indexGroupServicesStopped = 10,
readonly = 20,
newIndexCreated = 30,
reindexStarted = 40,
reindexCompleted = 50,
aliasCreated = 60,
indexConsumersStarted = 70,
indexGroupServicesStarted = 70,
}
export enum ReindexStatus {
@ -38,7 +38,9 @@ export interface ReindexOperation extends SavedObjectAttributes {
reindexTaskId: string | null;
reindexTaskPercComplete: number | null;
errorMessage: string | null;
mlReindexCount: number | null;
// This field is only used for the singleton IndexConsumerType documents.
runningReindexCount: number | null;
}
export type ReindexSavedObject = SavedObject<ReindexOperation>;
@ -50,3 +52,8 @@ export enum ReindexWarning {
// 7.0 -> 8.0 warnings
}
export enum IndexGroup {
ml = '___ML_REINDEX_LOCK___',
watcher = '___WATCHER_REINDEX_LOCK___',
}

View file

@ -0,0 +1,47 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { get } from 'lodash';
import React from 'react';
import { EuiCallOut } from '@elastic/eui';
import { FormattedMessage } from '@kbn/i18n/react';
import { UpgradeAssistantTabProps } from './types';
export const LoadingErrorBanner: React.StatelessComponent<
Pick<UpgradeAssistantTabProps, 'loadingError'>
> = ({ loadingError }) => {
if (get(loadingError, 'response.status') === 403) {
return (
<EuiCallOut
title={
<FormattedMessage
id="xpack.upgradeAssistant.forbiddenErrorCallout.calloutTitle"
defaultMessage="You do not have sufficient privileges to view this page."
/>
}
color="danger"
iconType="cross"
/>
);
}
return (
<EuiCallOut
title={
<FormattedMessage
id="xpack.upgradeAssistant.genericErrorCallout.calloutTitle"
defaultMessage="An error occurred while retrieving the checkup results."
/>
}
color="danger"
iconType="cross"
>
{loadingError ? loadingError.message : null}
</EuiCallOut>
);
};

View file

@ -21,6 +21,7 @@ import { LoadingState, UpgradeAssistantTabProps } from './types';
interface TabsState {
loadingState: LoadingState;
loadingError?: Error;
checkupData?: UpgradeAssistantStatus;
selectedTabIndex: number;
}
@ -77,14 +78,15 @@ export class UpgradeAssistantTabsUI extends React.Component<
checkupData: resp.data,
});
} catch (e) {
this.setState({ loadingState: LoadingState.Error });
this.setState({ loadingState: LoadingState.Error, loadingError: e });
}
};
private get tabs() {
const { intl } = this.props;
const { loadingState, checkupData } = this.state;
const { loadingError, loadingState, checkupData } = this.state;
const commonProps: UpgradeAssistantTabProps = {
loadingError,
loadingState,
refreshCheckupData: this.loadData,
setSelectedTabIndex: this.setSelectedTabIndex,

View file

@ -365,17 +365,8 @@ exports[`CheckupTab render with error 1`] = `
panelPaddingSize="l"
>
<EuiPageContentBody>
<EuiCallOut
color="danger"
iconType="cross"
size="m"
title={
<FormattedMessage
defaultMessage="A network error occurred while retrieving the checkup results."
id="xpack.upgradeAssistant.checkupTab.errorCallout.calloutTitle"
values={Object {}}
/>
}
<Component
loadingError={[Error: something bad!]}
/>
</EuiPageContentBody>
</EuiPageContent>

View file

@ -42,7 +42,12 @@ describe('CheckupTab', () => {
expect(
shallow(
<CheckupTab
{...{ ...defaultProps, deprecations: undefined, loadingState: LoadingState.Error }}
{...{
...defaultProps,
deprecations: undefined,
loadingState: LoadingState.Error,
loadingError: new Error('something bad!'),
}}
/>
)
).toMatchSnapshot();

View file

@ -21,6 +21,7 @@ import {
import { FormattedMessage } from '@kbn/i18n/react';
import { NEXT_MAJOR_VERSION } from '../../../../common/version';
import { LoadingErrorBanner } from '../../error_banner';
import {
GroupByOption,
LevelFilterOption,
@ -63,6 +64,7 @@ export class CheckupTab extends UpgradeAssistantTabComponent<CheckupTabProps, Ch
alertBanner,
checkupLabel,
deprecations,
loadingError,
loadingState,
refreshCheckupData,
setSelectedTabIndex,
@ -134,16 +136,7 @@ export class CheckupTab extends UpgradeAssistantTabComponent<CheckupTabProps, Ch
<EuiPageContent>
<EuiPageContentBody>
{loadingState === LoadingState.Error ? (
<EuiCallOut
title={
<FormattedMessage
id="xpack.upgradeAssistant.checkupTab.errorCallout.calloutTitle"
defaultMessage="A network error occurred while retrieving the checkup results."
/>
}
color="danger"
iconType="cross"
/>
<LoadingErrorBanner loadingError={loadingError} />
) : deprecations && deprecations.length > 0 ? (
<Fragment>
<CheckupControls

View file

@ -29,8 +29,6 @@ exports[`ChecklistFlyout renders 1`] = `
</EuiTitle>
<Component
errorMessage={null}
lastCompletedStep={20}
reindexStatus={0}
reindexTaskPercComplete={null}
/>
</EuiFlyoutBody>
@ -65,14 +63,14 @@ exports[`ChecklistFlyout renders 1`] = `
>
<EuiButton
color="primary"
disabled={true}
disabled={false}
fill={true}
iconSide="left"
isLoading={true}
isLoading={false}
onClick={[MockFunction]}
type="button"
>
Reindexing…
Run reindex
</EuiButton>
</EuiFlexItem>
</EuiFlexGroup>

View file

@ -5,10 +5,12 @@
*/
import { shallow } from 'enzyme';
import { cloneDeep } from 'lodash';
import React from 'react';
import { ReindexStatus, ReindexStep, ReindexWarning } from '../../../../../../../common/types';
import { ReindexStatus, ReindexWarning } from '../../../../../../../common/types';
import { LoadingState } from '../../../../../types';
import { ReindexState } from '../polling_service';
import { ChecklistFlyoutStep } from './checklist_step';
describe('ChecklistFlyout', () => {
@ -20,12 +22,13 @@ describe('ChecklistFlyout', () => {
startReindex: jest.fn(),
reindexState: {
loadingState: LoadingState.Success,
lastCompletedStep: ReindexStep.readonly,
status: ReindexStatus.inProgress,
lastCompletedStep: undefined,
status: undefined,
reindexTaskPercComplete: null,
errorMessage: null,
reindexWarnings: [ReindexWarning.allField],
},
hasRequiredPrivileges: true,
} as ReindexState,
};
it('renders', () => {
@ -33,7 +36,16 @@ describe('ChecklistFlyout', () => {
});
it('disables button while reindexing', () => {
const wrapper = shallow(<ChecklistFlyoutStep {...defaultProps} />);
const props = cloneDeep(defaultProps);
props.reindexState.status = ReindexStatus.inProgress;
const wrapper = shallow(<ChecklistFlyoutStep {...props} />);
expect((wrapper.find('EuiButton').props() as any).isLoading).toBe(true);
});
it('disables button if hasRequiredPrivileges is false', () => {
const props = cloneDeep(defaultProps);
props.reindexState.hasRequiredPrivileges = false;
const wrapper = shallow(<ChecklistFlyoutStep {...props} />);
expect(wrapper.find('EuiButton').props().disabled).toBe(true);
});

View file

@ -51,6 +51,8 @@ export const ChecklistFlyoutStep: React.StatelessComponent<{
reindexTaskPercComplete,
lastCompletedStep,
errorMessage,
hasRequiredPrivileges,
indexGroup,
} = reindexState;
const loading = loadingState === LoadingState.Loading || status === ReindexStatus.inProgress;
@ -71,11 +73,22 @@ export const ChecklistFlyoutStep: React.StatelessComponent<{
will need to return to this page to resume reindexing.
</p>
</EuiCallOut>
{!hasRequiredPrivileges && (
<Fragment>
<EuiSpacer />
<EuiCallOut
title="You do not have sufficient privileges to reindex this index"
color="danger"
iconType="alert"
/>
</Fragment>
)}
<EuiSpacer />
<EuiTitle size="xs">
<h3>Reindexing process</h3>
</EuiTitle>
<ReindexProgress
indexGroup={indexGroup}
lastCompletedStep={lastCompletedStep}
reindexStatus={status}
reindexTaskPercComplete={reindexTaskPercComplete}
@ -96,7 +109,7 @@ export const ChecklistFlyoutStep: React.StatelessComponent<{
iconType={status === ReindexStatus.paused ? 'play' : undefined}
onClick={startReindex}
isLoading={loading}
disabled={loading || status === ReindexStatus.completed}
disabled={loading || status === ReindexStatus.completed || !hasRequiredPrivileges}
>
{buttonLabel(status)}
</EuiButton>

View file

@ -7,7 +7,7 @@
import { shallow } from 'enzyme';
import React from 'react';
import { ReindexStatus, ReindexStep } from '../../../../../../../common/types';
import { IndexGroup, ReindexStatus, ReindexStep } from '../../../../../../../common/types';
import { ReindexProgress } from './progress';
describe('ReindexProgress', () => {
@ -77,4 +77,49 @@ describe('ReindexProgress', () => {
expect(reindexStep.children.type.name).toEqual('EuiProgress');
expect(reindexStep.children.props.value).toEqual(0.25);
});
it('adds steps for index groups', () => {
const wrapper = shallow(
<ReindexProgress
lastCompletedStep={ReindexStep.created}
reindexStatus={ReindexStatus.inProgress}
indexGroup={IndexGroup.ml}
reindexTaskPercComplete={null}
errorMessage={null}
/>
);
expect(wrapper).toMatchInlineSnapshot(`
<Component
steps={
Array [
Object {
"status": "inProgress",
"title": "Pausing Machine Learning jobs",
},
Object {
"status": "incomplete",
"title": "Setting old index to read-only",
},
Object {
"status": "incomplete",
"title": "Creating new index",
},
Object {
"status": "incomplete",
"title": "Reindexing documents",
},
Object {
"status": "incomplete",
"title": "Swapping original index with alias",
},
Object {
"status": "incomplete",
"title": "Resuming Machine Learning jobs",
},
]
}
/>
`);
});
});

View file

@ -8,7 +8,7 @@ import React from 'react';
import { EuiCallOut, EuiProgress, EuiText } from '@elastic/eui';
import { ReindexStatus, ReindexStep } from '../../../../../../../common/types';
import { IndexGroup, ReindexStatus, ReindexStep } from '../../../../../../../common/types';
import { StepProgress, StepProgressStep } from './step_progress';
const ErrorCallout: React.StatelessComponent<{ errorMessage: string | null }> = ({
@ -37,9 +37,16 @@ const orderedSteps = Object.values(ReindexStep).sort() as number[];
export const ReindexProgress: React.StatelessComponent<{
lastCompletedStep?: ReindexStep;
reindexStatus?: ReindexStatus;
indexGroup?: IndexGroup;
reindexTaskPercComplete: number | null;
errorMessage: string | null;
}> = ({ lastCompletedStep = -1, reindexStatus, reindexTaskPercComplete, errorMessage }) => {
}> = ({
lastCompletedStep = -1,
reindexStatus,
indexGroup,
reindexTaskPercComplete,
errorMessage,
}) => {
const stepDetails = (thisStep: ReindexStep): Pick<StepProgressStep, 'status' | 'children'> => {
const previousStep = orderedSteps[orderedSteps.indexOf(thisStep) - 1];
@ -101,23 +108,42 @@ export const ReindexProgress: React.StatelessComponent<{
);
}
return (
<StepProgress
steps={[
{
title: 'Setting old index to read-only',
...stepDetails(ReindexStep.readonly),
},
{
title: 'Creating new index',
...stepDetails(ReindexStep.newIndexCreated),
},
reindexingDocsStep,
{
title: 'Swapping original index with alias',
...stepDetails(ReindexStep.aliasCreated),
},
]}
/>
);
const steps = [
{
title: 'Setting old index to read-only',
...stepDetails(ReindexStep.readonly),
},
{
title: 'Creating new index',
...stepDetails(ReindexStep.newIndexCreated),
},
reindexingDocsStep,
{
title: 'Swapping original index with alias',
...stepDetails(ReindexStep.aliasCreated),
},
];
// If this index is part of an index group, add the approriate group services steps.
if (indexGroup === IndexGroup.ml) {
steps.unshift({
title: 'Pausing Machine Learning jobs',
...stepDetails(ReindexStep.indexGroupServicesStopped),
});
steps.push({
title: 'Resuming Machine Learning jobs',
...stepDetails(ReindexStep.indexGroupServicesStarted),
});
} else if (indexGroup === IndexGroup.watcher) {
steps.unshift({
title: 'Stopping Watcher',
...stepDetails(ReindexStep.indexGroupServicesStopped),
});
steps.push({
title: 'Resuming Watcher',
...stepDetails(ReindexStep.indexGroupServicesStarted),
});
}
return <StepProgress steps={steps} />;
};

View file

@ -8,6 +8,7 @@ import chrome from 'ui/chrome';
import { BehaviorSubject } from 'rxjs';
import {
IndexGroup,
ReindexOperation,
ReindexStatus,
ReindexStep,
@ -35,11 +36,15 @@ export interface ReindexState {
reindexTaskPercComplete: number | null;
errorMessage: string | null;
reindexWarnings?: ReindexWarning[];
hasRequiredPrivileges?: boolean;
indexGroup?: IndexGroup;
}
interface StatusResponse {
warnings?: ReindexWarning[];
reindexOp?: ReindexOperation;
hasRequiredPrivileges?: boolean;
indexGroup?: IndexGroup;
}
/**
@ -110,7 +115,12 @@ export class ReindexPollingService {
}
};
private updateWithResponse = ({ reindexOp, warnings }: StatusResponse) => {
private updateWithResponse = ({
reindexOp,
warnings,
hasRequiredPrivileges,
indexGroup,
}: StatusResponse) => {
// Next value should always include the entire state, not just what changes.
// We make a shallow copy as a starting new state.
const nextValue = {
@ -123,6 +133,14 @@ export class ReindexPollingService {
nextValue.reindexWarnings = warnings;
}
if (hasRequiredPrivileges !== undefined) {
nextValue.hasRequiredPrivileges = hasRequiredPrivileges;
}
if (indexGroup) {
nextValue.indexGroup = indexGroup;
}
if (reindexOp) {
nextValue.lastCompletedStep = reindexOp.lastCompletedStep;
nextValue.status = reindexOp.status;

View file

@ -20,6 +20,7 @@ import {
import { FormattedMessage } from '@kbn/i18n/react';
import { NEXT_MAJOR_VERSION } from '../../../../common/version';
import { LoadingErrorBanner } from '../../error_banner';
import { LoadingState, UpgradeAssistantTabProps } from '../../types';
import { Steps } from './steps';
@ -52,15 +53,19 @@ export const OverviewTab: StatelessComponent<UpgradeAssistantTabProps> = props =
<EuiPageContent>
<EuiPageContentBody>
{props.loadingState === LoadingState.Success ? (
<Steps {...props} />
) : (
{props.loadingState === LoadingState.Success && <Steps {...props} />}
{props.loadingState === LoadingState.Loading && (
<EuiFlexGroup justifyContent="center">
<EuiFlexItem grow={false}>
<EuiLoadingSpinner />
</EuiFlexItem>
</EuiFlexGroup>
)}
{props.loadingState === LoadingState.Error && (
<LoadingErrorBanner loadingError={props.loadingError} />
)}
</EuiPageContentBody>
</EuiPageContent>
</Fragment>

View file

@ -95,9 +95,10 @@ const START_UPGRADE_STEP = {
export const StepsUI: StatelessComponent<
UpgradeAssistantTabProps & ReactIntl.InjectedIntlProps
> = ({ checkupData, setSelectedTabIndex, intl }) => {
const countByType = Object.keys(checkupData!).reduce(
const checkupDataTyped = (checkupData! as unknown) as { [checkupType: string]: any[] };
const countByType = Object.keys(checkupDataTyped).reduce(
(counts, checkupType) => {
counts[checkupType] = checkupData![checkupType].length;
counts[checkupType] = checkupDataTyped[checkupType].length;
return counts;
},
{} as { [checkupType: string]: number }

View file

@ -5,6 +5,7 @@
*/
import React from 'react';
import {
EnrichedDeprecationInfo,
UpgradeAssistantStatus,
@ -15,6 +16,7 @@ export interface UpgradeAssistantTabProps {
checkupData?: UpgradeAssistantStatus;
deprecations?: EnrichedDeprecationInfo[];
refreshCheckupData: () => Promise<void>;
loadingError?: Error;
loadingState: LoadingState;
setSelectedTabIndex: (tabIndex: number) => void;
}

View file

@ -13,6 +13,14 @@
"details": "{.monitoring-logstash=[Coercion of boolean fields], .monitoring-es=[Coercion of boolean fields], .ml-anomalies-=[Coercion of boolean fields], .watch-history-6=[Coercion of boolean fields], .monitoring-kibana=[Coercion of boolean fields], security-index-template=[Coercion of boolean fields]}"
}
],
"ml_settings": [
{
"level": "warning",
"message": "Datafeed [deprecation-datafeed] uses deprecated query options",
"url": "https://www.elastic.co/guide/en/elasticsearch/reference/master/breaking-changes-7.0.html#breaking_70_search_changes",
"details": "[Deprecated field [use_dis_max] used, replaced by [Set [tie_breaker] to 1 instead]]"
}
],
"node_settings": [
{
"level": "critical",
@ -70,6 +78,5 @@
"details": "[[type: tweet, field: liked]]"
}
]
},
"ml_settings": []
}
}

View file

@ -15,6 +15,12 @@ Object {
"message": "one or more templates use deprecated mapping settings",
"url": "https://www.elastic.co/guide/en/elasticsearch/reference/6.0/breaking_60_indices_changes.html",
},
Object {
"details": "[Deprecated field [use_dis_max] used, replaced by [Set [tie_breaker] to 1 instead]]",
"level": "warning",
"message": "Datafeed [deprecation-datafeed] uses deprecated query options",
"url": "https://www.elastic.co/guide/en/elasticsearch/reference/master/breaking-changes-7.0.html#breaking_70_search_changes",
},
Object {
"details": "This node thing is wrong",
"level": "critical",
@ -66,5 +72,6 @@ Object {
"url": "https://www.elastic.co/guide/en/elasticsearch/reference/6.0/breaking_60_mappings_changes.html#_coercion_of_boolean_fields",
},
],
"readyForUpgrade": false,
}
`;

View file

@ -37,4 +37,17 @@ describe('getUpgradeAssistantStatus', () => {
const resp = await getUpgradeAssistantStatus(callWithRequest, {} as any, '/');
expect(resp).toMatchSnapshot();
});
it('returns readyForUpgrade === true when no critical issues found', async () => {
deprecationsResponse = {
cluster_settings: [{ level: 'warning', message: 'Do not count me', url: 'https://...' }],
node_settings: [],
ml_settings: [],
index_settings: {},
};
await expect(
getUpgradeAssistantStatus(callWithRequest, {} as any, '/')
).resolves.toHaveProperty('readyForUpgrade', true);
});
});

View file

@ -7,7 +7,11 @@
import _ from 'lodash';
import { Request } from 'hapi';
import { DeprecationAPIResponse, DeprecationInfo } from 'src/legacy/core_plugins/elasticsearch';
import {
CallClusterWithRequest,
DeprecationAPIResponse,
DeprecationInfo,
} from 'src/legacy/core_plugins/elasticsearch';
export interface EnrichedDeprecationInfo extends DeprecationInfo {
index?: string;
@ -15,31 +19,36 @@ export interface EnrichedDeprecationInfo extends DeprecationInfo {
}
export interface UpgradeAssistantStatus {
readyForUpgrade: boolean;
cluster: EnrichedDeprecationInfo[];
indices: EnrichedDeprecationInfo[];
[checkupType: string]: EnrichedDeprecationInfo[];
}
export async function getUpgradeAssistantStatus(
callWithRequest: any,
callWithRequest: CallClusterWithRequest,
req: Request,
basePath: string
): Promise<UpgradeAssistantStatus> {
const deprecations = (await callWithRequest(req, 'transport.request', {
const deprecations = await callWithRequest(req, 'transport.request', {
path: '/_migration/deprecations',
method: 'GET',
})) as DeprecationAPIResponse;
});
const cluster = deprecations.cluster_settings
.concat(deprecations.ml_settings)
.concat(deprecations.node_settings);
const indices = getCombinedIndexInfos(deprecations, basePath);
const criticalWarnings = cluster.concat(indices).filter(d => d.level === 'critical');
return {
cluster: deprecations.cluster_settings.concat(deprecations.node_settings),
indices: getCombinedIndexInfos(deprecations, basePath),
readyForUpgrade: criticalWarnings.length === 0,
cluster,
indices,
};
}
// Combines the information from the migration assistance api and the deprecation api into a single array.
// Enhances with information about which index the deprecation applies to and adds buttons for accessing the
// reindex UI.
// Reformats the index deprecations to an array of deprecation warnings extended with an index field.
const getCombinedIndexInfos = (deprecations: DeprecationAPIResponse, basePath: string) =>
Object.keys(deprecations.index_settings).reduce(
(indexDeprecations, indexName) => {

View file

@ -8,6 +8,7 @@ import Boom from 'boom';
import moment from 'moment';
import { CallCluster } from 'src/legacy/core_plugins/elasticsearch';
import {
IndexGroup,
REINDEX_OP_TYPE,
ReindexSavedObject,
ReindexStatus,
@ -17,12 +18,7 @@ import {
CURRENT_MAJOR_VERSION,
PREV_MAJOR_VERSION,
} from 'x-pack/plugins/upgrade_assistant/common/version';
import {
LOCK_WINDOW,
ML_LOCK_DOC_ID,
ReindexActions,
reindexActionsFactory,
} from './reindex_actions';
import { LOCK_WINDOW, ReindexActions, reindexActionsFactory } from './reindex_actions';
describe('ReindexActions', () => {
let client: jest.Mocked<any>;
@ -64,7 +60,7 @@ describe('ReindexActions', () => {
reindexTaskId: null,
reindexTaskPercComplete: null,
errorMessage: null,
mlReindexCount: null,
runningReindexCount: null,
});
});
@ -79,7 +75,7 @@ describe('ReindexActions', () => {
reindexTaskId: null,
reindexTaskPercComplete: null,
errorMessage: null,
mlReindexCount: null,
runningReindexCount: null,
});
});
});
@ -273,45 +269,51 @@ describe('ReindexActions', () => {
});
});
describe('runWhileMlLocked', () => {
it('creates the ML doc if it does not exist and executes callback', async () => {
expect.assertions(3);
client.get.mockRejectedValueOnce(Boom.notFound()); // mock no ML doc exists yet
client.create.mockImplementationOnce((type: any, attributes: any, { id }: any) =>
Promise.resolve({
type,
id,
attributes,
})
);
describe('runWhileConsumerLocked', () => {
Object.keys(IndexGroup).forEach(typeKey => {
const consumerType = IndexGroup[typeKey as any] as IndexGroup;
let flip = false;
await actions.runWhileMlLocked(async mlDoc => {
expect(mlDoc.id).toEqual(ML_LOCK_DOC_ID);
expect(mlDoc.attributes.mlReindexCount).toEqual(0);
flip = true;
return mlDoc;
describe(`IndexConsumerType.${typeKey}`, () => {
it('creates the lock doc if it does not exist and executes callback', async () => {
expect.assertions(3);
client.get.mockRejectedValueOnce(Boom.notFound()); // mock no ML doc exists yet
client.create.mockImplementationOnce((type: any, attributes: any, { id }: any) =>
Promise.resolve({
type,
id,
attributes,
})
);
let flip = false;
await actions.runWhileIndexGroupLocked(consumerType, async mlDoc => {
expect(mlDoc.id).toEqual(consumerType);
expect(mlDoc.attributes.runningReindexCount).toEqual(0);
flip = true;
return mlDoc;
});
expect(flip).toEqual(true);
});
it('fails after 10 attempts to lock', async () => {
jest.setTimeout(20000); // increase the timeout
client.get.mockResolvedValue({
type: REINDEX_OP_TYPE,
id: consumerType,
attributes: { mlReindexCount: 0 },
});
client.update.mockRejectedValue(new Error('NO LOCKING!'));
await expect(
actions.runWhileIndexGroupLocked(consumerType, async m => m)
).rejects.toThrow('Could not acquire lock for ML jobs');
expect(client.update).toHaveBeenCalledTimes(10);
// Restore default timeout.
jest.setTimeout(5000);
});
});
expect(flip).toEqual(true);
});
it('fails after 10 attempts to lock', async () => {
jest.setTimeout(20000); // increase the timeout
client.get.mockResolvedValue({
type: REINDEX_OP_TYPE,
id: ML_LOCK_DOC_ID,
attributes: { mlReindexCount: 0 },
});
client.update.mockRejectedValue(new Error('NO LOCKING!'));
await expect(actions.runWhileMlLocked(async m => m)).rejects.toThrow(
'Could not acquire lock for ML jobs'
);
expect(client.update).toHaveBeenCalledTimes(10);
// Restore default timeout.
jest.setTimeout(5000);
});
});
});

View file

@ -16,6 +16,7 @@ import {
PREV_MAJOR_VERSION,
} from 'x-pack/plugins/upgrade_assistant/common/version';
import {
IndexGroup,
REINDEX_OP_TYPE,
ReindexOperation,
ReindexSavedObject,
@ -27,8 +28,6 @@ import { FlatSettings } from './types';
// TODO: base on elasticsearch.requestTimeout?
export const LOCK_WINDOW = moment.duration(90, 'seconds');
export const ML_LOCK_DOC_ID = '___ML_REINDEX_LOCK___';
/**
* A collection of utility functions pulled out out of the ReindexService to make testing simpler.
* This is NOT intended to be used by any other code.
@ -88,31 +87,32 @@ export interface ReindexActions {
*/
getFlatSettings(indexName: string): Promise<FlatSettings | null>;
// ----- Below are only for ML indices
// ----- Functions below are for enforcing locks around groups of indices like ML or Watcher
/**
* Atomically increments the number of reindex operations running for ML jobs.
* Atomically increments the number of reindex operations running for an index group.
*/
incrementMlReindexes(): Promise<void>;
incrementIndexGroupReindexes(group: IndexGroup): Promise<void>;
/**
* Atomically decrements the number of reindex operations running for ML jobs.
* Atomically decrements the number of reindex operations running for an index group.
*/
decrementMlReindexes(): Promise<void>;
decrementIndexGroupReindexes(group: IndexGroup): Promise<void>;
/**
* Runs a callback function while locking the ML count.
* @param func A function to run with the locked ML lock document. Must return a promise that resolves
* Runs a callback function while locking an index group.
* @param func A function to run with the locked index group lock document. Must return a promise that resolves
* to the updated ReindexSavedObject.
*/
runWhileMlLocked(
func: (mlLockDoc: ReindexSavedObject) => Promise<ReindexSavedObject>
runWhileIndexGroupLocked(
group: IndexGroup,
func: (lockDoc: ReindexSavedObject) => Promise<ReindexSavedObject>
): Promise<void>;
/**
* Exposed only for testing, DO NOT USE.
*/
_fetchAndLockMlDoc(): Promise<ReindexSavedObject>;
_fetchAndLockIndexGroupDoc(group: IndexGroup): Promise<ReindexSavedObject>;
}
export const reindexActionsFactory = (
@ -183,7 +183,7 @@ export const reindexActionsFactory = (
reindexTaskId: null,
reindexTaskPercComplete: null,
errorMessage: null,
mlReindexCount: null,
runningReindexCount: null,
});
},
@ -263,10 +263,11 @@ export const reindexActionsFactory = (
return flatSettings[indexName];
},
async _fetchAndLockMlDoc() {
async _fetchAndLockIndexGroupDoc(indexGroup) {
const fetchDoc = async () => {
try {
return await client.get<ReindexOperation>(REINDEX_OP_TYPE, ML_LOCK_DOC_ID);
// The IndexGroup enum value (a string) serves as the ID of the lock doc
return await client.get<ReindexOperation>(REINDEX_OP_TYPE, indexGroup);
} catch (e) {
if (e.isBoom && e.output.statusCode === 404) {
return await client.create<ReindexOperation>(
@ -280,9 +281,9 @@ export const reindexActionsFactory = (
reindexTaskId: null,
reindexTaskPercComplete: null,
errorMessage: null,
mlReindexCount: 0,
runningReindexCount: 0,
} as any,
{ id: ML_LOCK_DOC_ID }
{ id: indexGroup }
);
} else {
throw e;
@ -307,29 +308,29 @@ export const reindexActionsFactory = (
return lockDoc();
},
async incrementMlReindexes() {
this.runWhileMlLocked(mlDoc =>
this.updateReindexOp(mlDoc, {
mlReindexCount: mlDoc.attributes.mlReindexCount! + 1,
async incrementIndexGroupReindexes(indexGroup) {
this.runWhileIndexGroupLocked(indexGroup, lockDoc =>
this.updateReindexOp(lockDoc, {
runningReindexCount: lockDoc.attributes.runningReindexCount! + 1,
})
);
},
async decrementMlReindexes() {
this.runWhileMlLocked(mlDoc =>
this.updateReindexOp(mlDoc, {
mlReindexCount: mlDoc.attributes.mlReindexCount! - 1,
async decrementIndexGroupReindexes(indexGroup) {
this.runWhileIndexGroupLocked(indexGroup, lockDoc =>
this.updateReindexOp(lockDoc, {
runningReindexCount: lockDoc.attributes.runningReindexCount! - 1,
})
);
},
async runWhileMlLocked(func) {
let mlDoc = await this._fetchAndLockMlDoc();
async runWhileIndexGroupLocked(indexGroup, func) {
let lockDoc = await this._fetchAndLockIndexGroupDoc(indexGroup);
try {
mlDoc = await func(mlDoc);
lockDoc = await func(lockDoc);
} finally {
await releaseLock(mlDoc);
await releaseLock(lockDoc);
}
},
};

View file

@ -6,6 +6,7 @@
import { CallCluster } from 'src/legacy/core_plugins/elasticsearch';
import {
IndexGroup,
ReindexOperation,
ReindexSavedObject,
ReindexStatus,
@ -16,6 +17,7 @@ import { ReindexService, reindexServiceFactory } from './reindex_service';
describe('reindexService', () => {
let actions: jest.Mocked<any>;
let callCluster: jest.Mock<CallCluster>;
let xpackInfo: { feature: jest.Mocked<any> };
let service: ReindexService;
const updateMockImpl = (reindexOp: ReindexSavedObject, attrs: Partial<ReindexOperation> = {}) =>
@ -37,12 +39,108 @@ describe('reindexService', () => {
findAllByStatus: jest.fn(unimplemented('findAllInProgressOperations')),
getFlatSettings: jest.fn(unimplemented('getFlatSettings')),
cleanupChanges: jest.fn(),
incrementMlReindexes: jest.fn(unimplemented('incrementMlReindexes')),
decrementMlReindexes: jest.fn(unimplemented('decrementMlReindexes')),
runWhileMlLocked: jest.fn(async (f: any) => f({ attributes: {} })),
incrementIndexGroupReindexes: jest.fn(unimplemented('incrementIndexGroupReindexes')),
decrementIndexGroupReindexes: jest.fn(unimplemented('decrementIndexGroupReindexes')),
runWhileIndexGroupLocked: jest.fn(async (group: string, f: any) => f({ attributes: {} })),
};
callCluster = jest.fn();
service = reindexServiceFactory(callCluster, actions);
xpackInfo = {
feature: jest.fn(() => ({
isAvailable() {
return true;
},
isEnabled() {
return true;
},
})),
};
service = reindexServiceFactory(callCluster, xpackInfo as any, actions);
});
describe('hasRequiredPrivileges', () => {
it('returns true if security is disabled', async () => {
xpackInfo.feature.mockReturnValueOnce({
isAvailable() {
return true;
},
isEnabled() {
return false;
},
});
const hasRequired = await service.hasRequiredPrivileges('anIndex');
expect(hasRequired).toBe(true);
});
it('calls security API with basic requirements', async () => {
callCluster.mockResolvedValueOnce({ has_all_requested: true });
const hasRequired = await service.hasRequiredPrivileges('anIndex');
expect(hasRequired).toBe(true);
expect(callCluster).toHaveBeenCalledWith('transport.request', {
path: '/_security/user/_has_privileges',
method: 'POST',
body: {
cluster: ['manage'],
index: [
{
names: [`anIndex*`],
privileges: ['all'],
},
{
names: ['.tasks'],
privileges: ['read', 'delete'],
},
],
},
});
});
it('includes manage_ml for ML indices', async () => {
callCluster.mockResolvedValueOnce({ has_all_requested: true });
await service.hasRequiredPrivileges('.ml-anomalies');
expect(callCluster).toHaveBeenCalledWith('transport.request', {
path: '/_security/user/_has_privileges',
method: 'POST',
body: {
cluster: ['manage', 'manage_ml'],
index: [
{
names: [`.ml-anomalies*`],
privileges: ['all'],
},
{
names: ['.tasks'],
privileges: ['read', 'delete'],
},
],
},
});
});
it('includes manage_watcher for watcher indices', async () => {
callCluster.mockResolvedValueOnce({ has_all_requested: true });
await service.hasRequiredPrivileges('.watches');
expect(callCluster).toHaveBeenCalledWith('transport.request', {
path: '/_security/user/_has_privileges',
method: 'POST',
body: {
cluster: ['manage', 'manage_watcher'],
index: [
{
names: [`.watches*`],
privileges: ['all'],
},
{
names: ['.tasks'],
privileges: ['read', 'delete'],
},
],
},
});
});
});
describe('detectReindexWarnings', () => {
@ -263,144 +361,201 @@ describe('reindexService', () => {
attributes: { ...defaultAttributes, lastCompletedStep: ReindexStep.created },
} as ReindexSavedObject;
// ML
const mlReindexOp = {
id: '2',
attributes: { ...reindexOp.attributes, indexName: '.ml-anomalies' },
} as ReindexSavedObject;
describe('ml behavior', () => {
const mlReindexOp = {
id: '2',
attributes: { ...reindexOp.attributes, indexName: '.ml-anomalies' },
} as ReindexSavedObject;
it('does nothing if index is not an ML index', async () => {
const updatedOp = await service.processNextStep(reindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(ReindexStep.indexConsumersStopped);
expect(actions.incrementMlReindexes).not.toHaveBeenCalled();
expect(actions.runWhileMlLocked).not.toHaveBeenCalled();
expect(callCluster).not.toHaveBeenCalled();
});
it('does nothing if index is not an ML index', async () => {
const updatedOp = await service.processNextStep(reindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(
ReindexStep.indexGroupServicesStopped
);
expect(actions.incrementIndexGroupReindexes).not.toHaveBeenCalled();
expect(actions.runWhileIndexGroupLocked).not.toHaveBeenCalled();
expect(callCluster).not.toHaveBeenCalled();
});
it('increments ML reindexes and calls ML stop endpoint', async () => {
actions.incrementMlReindexes.mockResolvedValueOnce();
actions.runWhileMlLocked.mockImplementationOnce(async (f: any) => f());
callCluster
// Mock call to /_nodes for version check
.mockResolvedValueOnce({ nodes: { nodeX: { version: '6.7.0-alpha' } } })
// Mock call to /_ml/set_upgrade_mode?enabled=true
.mockResolvedValueOnce({ acknowledged: true });
it('increments ML reindexes and calls ML stop endpoint', async () => {
actions.incrementIndexGroupReindexes.mockResolvedValueOnce();
actions.runWhileIndexGroupLocked.mockImplementationOnce(async (group: string, f: any) =>
f()
);
callCluster
// Mock call to /_nodes for version check
.mockResolvedValueOnce({ nodes: { nodeX: { version: '6.7.0-alpha' } } })
// Mock call to /_ml/set_upgrade_mode?enabled=true
.mockResolvedValueOnce({ acknowledged: true });
const updatedOp = await service.processNextStep(mlReindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(ReindexStep.indexConsumersStopped);
expect(actions.incrementMlReindexes).toHaveBeenCalled();
expect(actions.runWhileMlLocked).toHaveBeenCalled();
expect(callCluster).toHaveBeenCalledWith('transport.request', {
path: '/_ml/set_upgrade_mode?enabled=true',
method: 'POST',
const updatedOp = await service.processNextStep(mlReindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(
ReindexStep.indexGroupServicesStopped
);
expect(actions.incrementIndexGroupReindexes).toHaveBeenCalled();
expect(actions.runWhileIndexGroupLocked).toHaveBeenCalled();
expect(callCluster).toHaveBeenCalledWith('transport.request', {
path: '/_ml/set_upgrade_mode?enabled=true',
method: 'POST',
});
});
it('fails if ML reindexes cannot be incremented', async () => {
actions.incrementIndexGroupReindexes.mockRejectedValueOnce(new Error(`Can't lock!`));
const updatedOp = await service.processNextStep(mlReindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(ReindexStep.created);
expect(updatedOp.attributes.status).toEqual(ReindexStatus.failed);
expect(updatedOp.attributes.errorMessage!.includes(`Can't lock!`)).toBeTruthy();
expect(callCluster).not.toHaveBeenCalledWith('transport.request', {
path: '/_ml/set_upgrade_mode?enabled=true',
method: 'POST',
});
});
it('fails if ML doc cannot be locked', async () => {
actions.incrementIndexGroupReindexes.mockResolvedValueOnce();
actions.runWhileIndexGroupLocked.mockRejectedValueOnce(new Error(`Can't lock!`));
const updatedOp = await service.processNextStep(mlReindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(ReindexStep.created);
expect(updatedOp.attributes.status).toEqual(ReindexStatus.failed);
expect(updatedOp.attributes.errorMessage!.includes(`Can't lock!`)).toBeTruthy();
expect(callCluster).not.toHaveBeenCalledWith('transport.request', {
path: '/_ml/set_upgrade_mode?enabled=true',
method: 'POST',
});
});
it('fails if ML endpoint fails', async () => {
actions.incrementIndexGroupReindexes.mockResolvedValueOnce();
actions.runWhileIndexGroupLocked.mockImplementationOnce(async (group: string, f: any) =>
f()
);
callCluster
// Mock call to /_nodes for version check
.mockResolvedValueOnce({ nodes: { nodeX: { version: '6.7.0' } } })
// Mock call to /_ml/set_upgrade_mode?enabled=true
.mockResolvedValueOnce({ acknowledged: false });
const updatedOp = await service.processNextStep(mlReindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(ReindexStep.created);
expect(updatedOp.attributes.status).toEqual(ReindexStatus.failed);
expect(
updatedOp.attributes.errorMessage!.includes('Could not stop ML jobs')
).toBeTruthy();
expect(callCluster).toHaveBeenCalledWith('transport.request', {
path: '/_ml/set_upgrade_mode?enabled=true',
method: 'POST',
});
});
it('fails if not all nodes have been upgraded to 6.7.0', async () => {
actions.incrementIndexGroupReindexes.mockResolvedValueOnce();
actions.runWhileIndexGroupLocked.mockImplementationOnce(async (group: string, f: any) =>
f()
);
callCluster
// Mock call to /_nodes for version check
.mockResolvedValueOnce({ nodes: { nodeX: { version: '6.6.0' } } });
const updatedOp = await service.processNextStep(mlReindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(ReindexStep.created);
expect(updatedOp.attributes.status).toEqual(ReindexStatus.failed);
expect(
updatedOp.attributes.errorMessage!.includes('Some nodes are not on minimum version')
).toBeTruthy();
// Should not have called ML endpoint at all
expect(callCluster).not.toHaveBeenCalledWith('transport.request', {
path: '/_ml/set_upgrade_mode?enabled=true',
method: 'POST',
});
});
});
it('fails if ML reindexes cannot be incremented', async () => {
actions.incrementMlReindexes.mockRejectedValueOnce(new Error(`Can't lock!`));
describe('watcher behavior', () => {
const watcherReindexOp = {
id: '2',
attributes: { ...reindexOp.attributes, indexName: '.watches' },
} as ReindexSavedObject;
const updatedOp = await service.processNextStep(mlReindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(ReindexStep.created);
expect(updatedOp.attributes.status).toEqual(ReindexStatus.failed);
expect(updatedOp.attributes.errorMessage!.includes(`Can't lock!`)).toBeTruthy();
expect(callCluster).not.toHaveBeenCalledWith('transport.request', {
path: '/_ml/set_upgrade_mode?enabled=true',
method: 'POST',
it('does nothing if index is not a watcher index', async () => {
const updatedOp = await service.processNextStep(reindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(
ReindexStep.indexGroupServicesStopped
);
expect(actions.incrementIndexGroupReindexes).not.toHaveBeenCalled();
expect(actions.runWhileIndexGroupLocked).not.toHaveBeenCalled();
expect(callCluster).not.toHaveBeenCalled();
});
});
it('fails if ML doc cannot be locked', async () => {
actions.incrementMlReindexes.mockResolvedValueOnce();
actions.runWhileMlLocked.mockRejectedValueOnce(new Error(`Can't lock!`));
it('increments ML reindexes and calls watcher stop endpoint', async () => {
actions.incrementIndexGroupReindexes.mockResolvedValueOnce();
actions.runWhileIndexGroupLocked.mockImplementationOnce(async (type: string, f: any) =>
f()
);
callCluster
// Mock call to /_watcher/_stop
.mockResolvedValueOnce({ acknowledged: true });
const updatedOp = await service.processNextStep(mlReindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(ReindexStep.created);
expect(updatedOp.attributes.status).toEqual(ReindexStatus.failed);
expect(updatedOp.attributes.errorMessage!.includes(`Can't lock!`)).toBeTruthy();
expect(callCluster).not.toHaveBeenCalledWith('transport.request', {
path: '/_ml/set_upgrade_mode?enabled=true',
method: 'POST',
const updatedOp = await service.processNextStep(watcherReindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(
ReindexStep.indexGroupServicesStopped
);
expect(actions.incrementIndexGroupReindexes).toHaveBeenCalledWith(IndexGroup.watcher);
expect(actions.runWhileIndexGroupLocked).toHaveBeenCalled();
expect(callCluster).toHaveBeenCalledWith('transport.request', {
path: '/_watcher/_stop',
method: 'POST',
});
});
});
it('fails if ML endpoint fails', async () => {
actions.incrementMlReindexes.mockResolvedValueOnce();
callCluster
// Mock call to /_nodes for version check
.mockResolvedValueOnce({ nodes: { nodeX: { version: '6.7.0' } } })
// Mock call to /_ml/set_upgrade_mode?enabled=true
.mockResolvedValueOnce({ acknowledged: false });
it('fails if watcher reindexes cannot be incremented', async () => {
actions.incrementIndexGroupReindexes.mockRejectedValueOnce(new Error(`Can't lock!`));
const updatedOp = await service.processNextStep(mlReindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(ReindexStep.created);
expect(updatedOp.attributes.status).toEqual(ReindexStatus.failed);
expect(updatedOp.attributes.errorMessage!.includes('Could not stop ML jobs')).toBeTruthy();
expect(callCluster).toHaveBeenCalledWith('transport.request', {
path: '/_ml/set_upgrade_mode?enabled=true',
method: 'POST',
const updatedOp = await service.processNextStep(watcherReindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(ReindexStep.created);
expect(updatedOp.attributes.status).toEqual(ReindexStatus.failed);
expect(updatedOp.attributes.errorMessage!.includes(`Can't lock!`)).toBeTruthy();
expect(callCluster).not.toHaveBeenCalledWith('transport.request', {
path: '/_watcher/_stop',
method: 'POST',
});
});
});
it('fails if not all nodes have been upgraded to 6.7.0', async () => {
actions.incrementMlReindexes.mockResolvedValueOnce();
callCluster
// Mock call to /_nodes for version check
.mockResolvedValueOnce({ nodes: { nodeX: { version: '6.6.0' } } });
it('fails if watcher doc cannot be locked', async () => {
actions.incrementIndexGroupReindexes.mockResolvedValueOnce();
actions.runWhileIndexGroupLocked.mockRejectedValueOnce(new Error(`Can't lock!`));
const updatedOp = await service.processNextStep(mlReindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(ReindexStep.created);
expect(updatedOp.attributes.status).toEqual(ReindexStatus.failed);
expect(
updatedOp.attributes.errorMessage!.includes('Some nodes are not on minimum version')
).toBeTruthy();
// Should not have called ML endpoint at all
expect(callCluster).not.toHaveBeenCalledWith('transport.request', {
path: '/_ml/set_upgrade_mode?enabled=true',
method: 'POST',
const updatedOp = await service.processNextStep(watcherReindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(ReindexStep.created);
expect(updatedOp.attributes.status).toEqual(ReindexStatus.failed);
expect(updatedOp.attributes.errorMessage!.includes(`Can't lock!`)).toBeTruthy();
expect(callCluster).not.toHaveBeenCalledWith('transport.request', {
path: '/_watcher/_stop',
method: 'POST',
});
});
});
// Watcher
const watcherReindexOp = {
id: '2',
attributes: { ...reindexOp.attributes, indexName: '.watches' },
} as ReindexSavedObject;
it('fails if watcher endpoint fails', async () => {
actions.incrementIndexGroupReindexes.mockResolvedValueOnce();
actions.runWhileIndexGroupLocked.mockImplementationOnce(async (type: string, f: any) =>
f()
);
callCluster
// Mock call to /_watcher/_stop
.mockResolvedValueOnce({ acknowledged: false });
it('does nothing if index is not a watcher index', async () => {
const updatedOp = await service.processNextStep(reindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(ReindexStep.indexConsumersStopped);
expect(callCluster).not.toHaveBeenCalled();
});
it('calls watcher start endpoint', async () => {
callCluster.mockResolvedValueOnce({ acknowledged: true });
const updatedOp = await service.processNextStep(watcherReindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(ReindexStep.indexConsumersStopped);
expect(callCluster).toHaveBeenCalledWith('transport.request', {
path: '/_watcher/_stop',
method: 'POST',
});
});
it('fails if watcher start endpoint fails', async () => {
callCluster.mockResolvedValueOnce({ acknowledged: false });
const updatedOp = await service.processNextStep(watcherReindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(ReindexStep.created);
expect(updatedOp.attributes.status).toEqual(ReindexStatus.failed);
expect(callCluster).toHaveBeenCalledWith('transport.request', {
path: '/_watcher/_stop',
method: 'POST',
});
});
it('fails if watcher start endpoint throws', async () => {
callCluster.mockRejectedValueOnce(new Error('Whoops!'));
const updatedOp = await service.processNextStep(watcherReindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(ReindexStep.created);
expect(updatedOp.attributes.status).toEqual(ReindexStatus.failed);
expect(callCluster).toHaveBeenCalledWith('transport.request', {
path: '/_watcher/_stop',
method: 'POST',
const updatedOp = await service.processNextStep(watcherReindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(ReindexStep.created);
expect(updatedOp.attributes.status).toEqual(ReindexStatus.failed);
expect(
updatedOp.attributes.errorMessage!.includes('Could not stop Watcher')
).toBeTruthy();
expect(callCluster).toHaveBeenCalledWith('transport.request', {
path: '/_watcher/_stop',
method: 'POST',
});
});
});
});
@ -408,7 +563,10 @@ describe('reindexService', () => {
describe('indexConsumersStopped', () => {
const reindexOp = {
id: '1',
attributes: { ...defaultAttributes, lastCompletedStep: ReindexStep.indexConsumersStopped },
attributes: {
...defaultAttributes,
lastCompletedStep: ReindexStep.indexGroupServicesStopped,
},
} as ReindexSavedObject;
it('blocks writes and updates lastCompletedStep', async () => {
@ -424,7 +582,9 @@ describe('reindexService', () => {
it('fails if setting updates are not acknowledged', async () => {
callCluster.mockResolvedValueOnce({ acknowledged: false });
const updatedOp = await service.processNextStep(reindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(ReindexStep.indexConsumersStopped);
expect(updatedOp.attributes.lastCompletedStep).toEqual(
ReindexStep.indexGroupServicesStopped
);
expect(updatedOp.attributes.status).toEqual(ReindexStatus.failed);
expect(updatedOp.attributes.errorMessage).not.toBeNull();
});
@ -432,7 +592,9 @@ describe('reindexService', () => {
it('fails if setting updates fail', async () => {
callCluster.mockRejectedValueOnce(new Error('blah!'));
const updatedOp = await service.processNextStep(reindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(ReindexStep.indexConsumersStopped);
expect(updatedOp.attributes.lastCompletedStep).toEqual(
ReindexStep.indexGroupServicesStopped
);
expect(updatedOp.attributes.status).toEqual(ReindexStatus.failed);
expect(updatedOp.attributes.errorMessage).not.toBeNull();
});
@ -654,143 +816,207 @@ describe('reindexService', () => {
attributes: { ...defaultAttributes, lastCompletedStep: ReindexStep.aliasCreated },
} as ReindexSavedObject;
// ML
const mlReindexOp = {
id: '2',
attributes: { ...reindexOp.attributes, indexName: '.ml-anomalies' },
} as ReindexSavedObject;
describe('ml behavior', () => {
const mlReindexOp = {
id: '2',
attributes: { ...reindexOp.attributes, indexName: '.ml-anomalies' },
} as ReindexSavedObject;
it('does nothing if index is not an ML index', async () => {
const updatedOp = await service.processNextStep(reindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(ReindexStep.indexConsumersStarted);
expect(updatedOp.attributes.status).toEqual(ReindexStatus.completed);
expect(callCluster).not.toHaveBeenCalled();
});
it('does nothing if index is not an ML index', async () => {
const updatedOp = await service.processNextStep(reindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(
ReindexStep.indexGroupServicesStarted
);
expect(updatedOp.attributes.status).toEqual(ReindexStatus.completed);
expect(callCluster).not.toHaveBeenCalled();
});
it('decrements ML reindexes and calls ML start endpoint if no remaining ML jobs', async () => {
actions.decrementMlReindexes.mockResolvedValue();
actions.runWhileMlLocked.mockImplementationOnce(async (f: any) =>
f({ attributes: { mlReindexCount: 0 } })
);
// Mock call to /_ml/set_upgrade_mode?enabled=false
callCluster.mockResolvedValueOnce({ acknowledged: true });
it('decrements ML reindexes and calls ML start endpoint if no remaining ML jobs', async () => {
actions.decrementIndexGroupReindexes.mockResolvedValue();
actions.runWhileIndexGroupLocked.mockImplementationOnce(async (group: string, f: any) =>
f({ attributes: { runningReindexCount: 0 } })
);
// Mock call to /_ml/set_upgrade_mode?enabled=false
callCluster.mockResolvedValueOnce({ acknowledged: true });
const updatedOp = await service.processNextStep(mlReindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(ReindexStep.indexConsumersStarted);
expect(callCluster).toHaveBeenCalledWith('transport.request', {
path: '/_ml/set_upgrade_mode?enabled=false',
method: 'POST',
const updatedOp = await service.processNextStep(mlReindexOp);
expect(actions.decrementIndexGroupReindexes).toHaveBeenCalledWith(IndexGroup.ml);
expect(updatedOp.attributes.lastCompletedStep).toEqual(
ReindexStep.indexGroupServicesStarted
);
expect(callCluster).toHaveBeenCalledWith('transport.request', {
path: '/_ml/set_upgrade_mode?enabled=false',
method: 'POST',
});
});
it('does not call ML start endpoint if there are remaining ML jobs', async () => {
actions.decrementIndexGroupReindexes.mockResolvedValue();
actions.runWhileIndexGroupLocked.mockImplementationOnce(async (group: string, f: any) =>
f({ attributes: { runningReindexCount: 2 } })
);
// Mock call to /_ml/set_upgrade_mode?enabled=false
callCluster.mockResolvedValueOnce({ acknowledged: true });
const updatedOp = await service.processNextStep(mlReindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(
ReindexStep.indexGroupServicesStarted
);
expect(callCluster).not.toHaveBeenCalledWith('transport.request', {
path: '/_ml/set_upgrade_mode?enabled=false',
method: 'POST',
});
});
it('fails if ML reindexes cannot be decremented', async () => {
// Mock unable to lock ml doc
actions.decrementIndexGroupReindexes.mockRejectedValue(new Error(`Can't lock!`));
const updatedOp = await service.processNextStep(mlReindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(ReindexStep.aliasCreated);
expect(updatedOp.attributes.status).toEqual(ReindexStatus.failed);
expect(updatedOp.attributes.errorMessage!.includes(`Can't lock!`)).toBeTruthy();
expect(callCluster).not.toHaveBeenCalledWith('transport.request', {
path: '/_ml/set_upgrade_mode?enabled=false',
method: 'POST',
});
});
it('fails if ML doc cannot be locked', async () => {
actions.decrementIndexGroupReindexes.mockResolvedValue();
// Mock unable to lock ml doc
actions.runWhileIndexGroupLocked.mockRejectedValueOnce(new Error(`Can't lock!`));
const updatedOp = await service.processNextStep(mlReindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(ReindexStep.aliasCreated);
expect(updatedOp.attributes.status).toEqual(ReindexStatus.failed);
expect(updatedOp.attributes.errorMessage!.includes(`Can't lock!`)).toBeTruthy();
expect(callCluster).not.toHaveBeenCalledWith('transport.request', {
path: '/_ml/set_upgrade_mode?enabled=false',
method: 'POST',
});
});
it('fails if ML endpoint fails', async () => {
actions.decrementIndexGroupReindexes.mockResolvedValue();
actions.runWhileIndexGroupLocked.mockImplementationOnce(async (group: string, f: any) =>
f({ attributes: { runningReindexCount: 0 } })
);
// Mock call to /_ml/set_upgrade_mode?enabled=true
callCluster.mockResolvedValueOnce({ acknowledged: false });
const updatedOp = await service.processNextStep(mlReindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(ReindexStep.aliasCreated);
expect(updatedOp.attributes.status).toEqual(ReindexStatus.failed);
expect(
updatedOp.attributes.errorMessage!.includes('Could not resume ML jobs')
).toBeTruthy();
expect(callCluster).toHaveBeenCalledWith('transport.request', {
path: '/_ml/set_upgrade_mode?enabled=false',
method: 'POST',
});
});
});
it('does not call ML start endpoint if there are remaining ML jobs', async () => {
actions.decrementMlReindexes.mockResolvedValue();
actions.runWhileMlLocked.mockImplementationOnce(async (f: any) =>
f({ attributes: { mlReindexCount: 2 } })
);
// Mock call to /_ml/set_upgrade_mode?enabled=false
callCluster.mockResolvedValueOnce({ acknowledged: true });
describe('watcher behavior', () => {
const watcherReindexOp = {
id: '2',
attributes: { ...reindexOp.attributes, indexName: '.watches' },
} as ReindexSavedObject;
const updatedOp = await service.processNextStep(mlReindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(ReindexStep.indexConsumersStarted);
expect(callCluster).not.toHaveBeenCalledWith('transport.request', {
path: '/_ml/set_upgrade_mode?enabled=false',
method: 'POST',
it('does nothing if index is not a watcher index', async () => {
const updatedOp = await service.processNextStep(reindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(
ReindexStep.indexGroupServicesStarted
);
expect(updatedOp.attributes.status).toEqual(ReindexStatus.completed);
expect(callCluster).not.toHaveBeenCalled();
});
});
it('fails if ML reindexes cannot be decremented', async () => {
// Mock unable to lock ml doc
actions.decrementMlReindexes.mockRejectedValue(new Error(`Can't lock!`));
it('decrements watcher reindexes and calls wathcer start endpoint if no remaining watcher reindexes', async () => {
actions.decrementIndexGroupReindexes.mockResolvedValue();
actions.runWhileIndexGroupLocked.mockImplementationOnce(async (group: string, f: any) =>
f({ attributes: { runningReindexCount: 0 } })
);
// Mock call to /_watcher/_start
callCluster.mockResolvedValueOnce({ acknowledged: true });
const updatedOp = await service.processNextStep(mlReindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(ReindexStep.aliasCreated);
expect(updatedOp.attributes.status).toEqual(ReindexStatus.failed);
expect(updatedOp.attributes.errorMessage!.includes(`Can't lock!`)).toBeTruthy();
expect(callCluster).not.toHaveBeenCalledWith('transport.request', {
path: '/_ml/set_upgrade_mode?enabled=false',
method: 'POST',
const updatedOp = await service.processNextStep(watcherReindexOp);
expect(actions.decrementIndexGroupReindexes).toHaveBeenCalledWith(IndexGroup.watcher);
expect(updatedOp.attributes.lastCompletedStep).toEqual(
ReindexStep.indexGroupServicesStarted
);
expect(callCluster).toHaveBeenCalledWith('transport.request', {
path: '/_watcher/_start',
method: 'POST',
});
});
});
it('fails if ML doc cannot be locked', async () => {
actions.decrementMlReindexes.mockResolvedValue();
// Mock unable to lock ml doc
actions.runWhileMlLocked.mockRejectedValueOnce(new Error(`Can't lock!`));
it('does not call wathcer start endpoint if there are remaining wathcer reindexes', async () => {
actions.decrementIndexGroupReindexes.mockResolvedValue();
actions.runWhileIndexGroupLocked.mockImplementationOnce(async (group: string, f: any) =>
f({ attributes: { runningReindexCount: 2 } })
);
// Mock call to /_watcher/_start
callCluster.mockResolvedValueOnce({ acknowledged: true });
const updatedOp = await service.processNextStep(mlReindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(ReindexStep.aliasCreated);
expect(updatedOp.attributes.status).toEqual(ReindexStatus.failed);
expect(updatedOp.attributes.errorMessage!.includes(`Can't lock!`)).toBeTruthy();
expect(callCluster).not.toHaveBeenCalledWith('transport.request', {
path: '/_ml/set_upgrade_mode?enabled=false',
method: 'POST',
const updatedOp = await service.processNextStep(watcherReindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(
ReindexStep.indexGroupServicesStarted
);
expect(callCluster).not.toHaveBeenCalledWith('transport.request', {
path: '/_watcher/_start',
method: 'POST',
});
});
});
it('fails if ML endpoint fails', async () => {
actions.decrementMlReindexes.mockResolvedValue();
actions.runWhileMlLocked.mockImplementationOnce(async (f: any) =>
f({ attributes: { mlReindexCount: 0 } })
);
// Mock call to /_ml/set_upgrade_mode?enabled=true
callCluster.mockResolvedValueOnce({ acknowledged: false });
it('fails if watcher reindexes cannot be decremented', async () => {
// Mock unable to lock watcher doc
actions.decrementIndexGroupReindexes.mockRejectedValue(new Error(`Can't lock!`));
const updatedOp = await service.processNextStep(mlReindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(ReindexStep.aliasCreated);
expect(updatedOp.attributes.status).toEqual(ReindexStatus.failed);
expect(
updatedOp.attributes.errorMessage!.includes('Could not resume ML jobs')
).toBeTruthy();
expect(callCluster).toHaveBeenCalledWith('transport.request', {
path: '/_ml/set_upgrade_mode?enabled=false',
method: 'POST',
const updatedOp = await service.processNextStep(watcherReindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(ReindexStep.aliasCreated);
expect(updatedOp.attributes.status).toEqual(ReindexStatus.failed);
expect(updatedOp.attributes.errorMessage!.includes(`Can't lock!`)).toBeTruthy();
expect(callCluster).not.toHaveBeenCalledWith('transport.request', {
path: '/_watcher/_start',
method: 'POST',
});
});
});
// Watcher
const watcherReindexOp = {
id: '2',
attributes: { ...reindexOp.attributes, indexName: '.watches' },
} as ReindexSavedObject;
it('fails if watcher doc cannot be locked', async () => {
actions.decrementIndexGroupReindexes.mockResolvedValue();
// Mock unable to lock watcher doc
actions.runWhileIndexGroupLocked.mockRejectedValueOnce(new Error(`Can't lock!`));
it('does nothing if index is not a watcher index', async () => {
const updatedOp = await service.processNextStep(reindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(ReindexStep.indexConsumersStarted);
expect(updatedOp.attributes.status).toEqual(ReindexStatus.completed);
expect(callCluster).not.toHaveBeenCalled();
});
it('calls watcher start endpoint', async () => {
callCluster.mockResolvedValueOnce({ acknowledged: true });
const updatedOp = await service.processNextStep(watcherReindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(ReindexStep.indexConsumersStarted);
expect(updatedOp.attributes.status).toEqual(ReindexStatus.completed);
expect(callCluster).toHaveBeenCalledWith('transport.request', {
path: '/_watcher/_start',
method: 'POST',
const updatedOp = await service.processNextStep(watcherReindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(ReindexStep.aliasCreated);
expect(updatedOp.attributes.status).toEqual(ReindexStatus.failed);
expect(updatedOp.attributes.errorMessage!.includes(`Can't lock!`)).toBeTruthy();
expect(callCluster).not.toHaveBeenCalledWith('transport.request', {
path: '/_watcher/_start',
method: 'POST',
});
});
});
it('fails if watcher start endpoint fails', async () => {
callCluster.mockResolvedValueOnce({ acknowledged: false });
const updatedOp = await service.processNextStep(watcherReindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(ReindexStep.aliasCreated);
expect(updatedOp.attributes.status).toEqual(ReindexStatus.failed);
expect(callCluster).toHaveBeenCalledWith('transport.request', {
path: '/_watcher/_start',
method: 'POST',
});
});
it('fails if watcher endpoint fails', async () => {
actions.decrementIndexGroupReindexes.mockResolvedValue();
actions.runWhileIndexGroupLocked.mockImplementationOnce(async (group: string, f: any) =>
f({ attributes: { runningReindexCount: 0 } })
);
// Mock call to /_watcher/_start
callCluster.mockResolvedValueOnce({ acknowledged: false });
it('fails if watcher start endpoint throws', async () => {
callCluster.mockRejectedValueOnce(new Error('Whoops!'));
const updatedOp = await service.processNextStep(watcherReindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(ReindexStep.aliasCreated);
expect(updatedOp.attributes.status).toEqual(ReindexStatus.failed);
expect(callCluster).toHaveBeenCalledWith('transport.request', {
path: '/_watcher/_start',
method: 'POST',
const updatedOp = await service.processNextStep(watcherReindexOp);
expect(updatedOp.attributes.lastCompletedStep).toEqual(ReindexStep.aliasCreated);
expect(updatedOp.attributes.status).toEqual(ReindexStatus.failed);
expect(
updatedOp.attributes.errorMessage!.includes('Could not start Watcher')
).toBeTruthy();
expect(callCluster).toHaveBeenCalledWith('transport.request', {
path: '/_watcher/_start',
method: 'POST',
});
});
});
});

View file

@ -7,7 +7,9 @@
import Boom from 'boom';
import { CallCluster } from 'src/legacy/core_plugins/elasticsearch';
import { XPackInfo } from 'x-pack/plugins/xpack_main/server/lib/xpack_info';
import {
IndexGroup,
ReindexSavedObject,
ReindexStatus,
ReindexStep,
@ -19,6 +21,12 @@ import { ReindexActions } from './reindex_actions';
const VERSION_REGEX = new RegExp(/^([1-9]+)\.([0-9]+)\.([0-9]+)/);
export interface ReindexService {
/**
* Checks whether or not the user has proper privileges required to reindex this index.
* @param indexName
*/
hasRequiredPrivileges(indexName: string): Promise<boolean>;
/**
* Checks an index's settings and mappings to flag potential issues during reindex.
* Resolves to null if index does not exist.
@ -26,6 +34,12 @@ export interface ReindexService {
*/
detectReindexWarnings(indexName: string): Promise<ReindexWarning[] | null>;
/**
* Returns an IndexGroup if the index belongs to one, otherwise undefined.
* @param indexName
*/
getIndexGroup(indexName: string): IndexGroup | undefined;
/**
* Creates a new reindex operation for a given index.
* @param indexName
@ -67,6 +81,7 @@ export interface ReindexService {
export const reindexServiceFactory = (
callCluster: CallCluster,
xpackInfo: XPackInfo,
actions: ReindexActions
): ReindexService => {
// ------ Utility functions
@ -77,8 +92,8 @@ export const reindexServiceFactory = (
* @param reindexOp
*/
const stopMlJobs = async () => {
await actions.incrementMlReindexes();
await actions.runWhileMlLocked(async mlDoc => {
await actions.incrementIndexGroupReindexes(IndexGroup.ml);
await actions.runWhileIndexGroupLocked(IndexGroup.ml, async mlDoc => {
await validateNodesMinimumVersion(6, 7);
const res = await callCluster('transport.request', {
@ -98,9 +113,9 @@ export const reindexServiceFactory = (
* Resumes ML jobs if there are no more remaining reindex operations.
*/
const resumeMlJobs = async () => {
await actions.decrementMlReindexes();
await actions.runWhileMlLocked(async mlDoc => {
if (mlDoc.attributes.mlReindexCount === 0) {
await actions.decrementIndexGroupReindexes(IndexGroup.ml);
await actions.runWhileIndexGroupLocked(IndexGroup.ml, async mlDoc => {
if (mlDoc.attributes.runningReindexCount === 0) {
const res = await callCluster('transport.request', {
path: '/_ml/set_upgrade_mode?enabled=false',
method: 'POST',
@ -119,28 +134,40 @@ export const reindexServiceFactory = (
* Stops Watcher in Elasticsearch.
*/
const stopWatcher = async () => {
const { acknowledged } = await callCluster('transport.request', {
path: '/_watcher/_stop',
method: 'POST',
});
await actions.incrementIndexGroupReindexes(IndexGroup.watcher);
await actions.runWhileIndexGroupLocked(IndexGroup.watcher, async watcherDoc => {
const { acknowledged } = await callCluster('transport.request', {
path: '/_watcher/_stop',
method: 'POST',
});
if (!acknowledged) {
throw new Error('Could not stop Watcher');
}
if (!acknowledged) {
throw new Error('Could not stop Watcher');
}
return watcherDoc;
});
};
/**
* Starts Watcher in Elasticsearch.
*/
const startWatcher = async () => {
const { acknowledged } = await callCluster('transport.request', {
path: '/_watcher/_start',
method: 'POST',
});
await actions.decrementIndexGroupReindexes(IndexGroup.watcher);
await actions.runWhileIndexGroupLocked(IndexGroup.watcher, async watcherDoc => {
if (watcherDoc.attributes.runningReindexCount === 0) {
const { acknowledged } = await callCluster('transport.request', {
path: '/_watcher/_start',
method: 'POST',
});
if (!acknowledged) {
throw new Error('Could not start Watcher');
}
if (!acknowledged) {
throw new Error('Could not start Watcher');
}
}
return watcherDoc;
});
};
const cleanupChanges = async (reindexOp: ReindexSavedObject) => {
@ -153,8 +180,8 @@ export const reindexServiceFactory = (
}
// Stop consumers if we ever got past this point.
if (reindexOp.attributes.lastCompletedStep >= ReindexStep.indexConsumersStopped) {
await resumeConsumers(reindexOp);
if (reindexOp.attributes.lastCompletedStep >= ReindexStep.indexGroupServicesStopped) {
await resumeIndexGroupServices(reindexOp);
}
};
@ -183,7 +210,7 @@ export const reindexServiceFactory = (
}
};
const stopConsumers = async (reindexOp: ReindexSavedObject) => {
const stopIndexGroupServices = async (reindexOp: ReindexSavedObject) => {
if (isMlIndex(reindexOp.attributes.indexName)) {
await stopMlJobs();
} else if (isWatcherIndex(reindexOp.attributes.indexName)) {
@ -191,7 +218,7 @@ export const reindexServiceFactory = (
}
return actions.updateReindexOp(reindexOp, {
lastCompletedStep: ReindexStep.indexConsumersStopped,
lastCompletedStep: ReindexStep.indexGroupServicesStopped,
});
};
@ -348,7 +375,7 @@ export const reindexServiceFactory = (
});
};
const resumeConsumers = async (reindexOp: ReindexSavedObject) => {
const resumeIndexGroupServices = async (reindexOp: ReindexSavedObject) => {
if (isMlIndex(reindexOp.attributes.indexName)) {
await resumeMlJobs();
} else if (isWatcherIndex(reindexOp.attributes.indexName)) {
@ -358,7 +385,7 @@ export const reindexServiceFactory = (
// Only change the status if we're still in-progress (this function is also called when the reindex fails)
if (reindexOp.attributes.status === ReindexStatus.inProgress) {
return actions.updateReindexOp(reindexOp, {
lastCompletedStep: ReindexStep.indexConsumersStarted,
lastCompletedStep: ReindexStep.indexGroupServicesStarted,
status: ReindexStatus.completed,
});
} else {
@ -369,6 +396,45 @@ export const reindexServiceFactory = (
// ------ The service itself
return {
async hasRequiredPrivileges(indexName: string) {
// If security is disabled or unavailable, return true.
const security = xpackInfo.feature('security');
if (!security.isAvailable() || !security.isEnabled()) {
return true;
}
// Otherwise, query for required privileges for this index.
const body = {
cluster: ['manage'],
index: [
{
names: [`${indexName}*`],
privileges: ['all'],
},
{
names: ['.tasks'],
privileges: ['read', 'delete'],
},
],
} as any;
if (isMlIndex(indexName)) {
body.cluster = [...body.cluster, 'manage_ml'];
}
if (isWatcherIndex(indexName)) {
body.cluster = [...body.cluster, 'manage_watcher'];
}
const resp = await callCluster('transport.request', {
path: '/_security/user/_has_privileges',
method: 'POST',
body,
});
return resp.has_all_requested;
},
async detectReindexWarnings(indexName: string) {
const flatSettings = await actions.getFlatSettings(indexName);
if (!flatSettings) {
@ -378,6 +444,14 @@ export const reindexServiceFactory = (
}
},
getIndexGroup(indexName: string) {
if (isMlIndex(indexName)) {
return IndexGroup.ml;
} else if (isWatcherIndex(indexName)) {
return IndexGroup.watcher;
}
},
async createReindexOperation(indexName: string) {
const indexExists = await callCluster('indices.exists', { index: indexName });
if (!indexExists) {
@ -418,9 +492,9 @@ export const reindexServiceFactory = (
try {
switch (lockedReindexOp.attributes.lastCompletedStep) {
case ReindexStep.created:
lockedReindexOp = await stopConsumers(lockedReindexOp);
lockedReindexOp = await stopIndexGroupServices(lockedReindexOp);
break;
case ReindexStep.indexConsumersStopped:
case ReindexStep.indexGroupServicesStopped:
lockedReindexOp = await setReadonly(lockedReindexOp);
break;
case ReindexStep.readonly:
@ -436,7 +510,7 @@ export const reindexServiceFactory = (
lockedReindexOp = await switchAlias(lockedReindexOp);
break;
case ReindexStep.aliasCreated:
lockedReindexOp = await resumeConsumers(lockedReindexOp);
lockedReindexOp = await resumeIndexGroupServices(lockedReindexOp);
break;
default:
break;
@ -499,4 +573,5 @@ export const reindexServiceFactory = (
const isMlIndex = (indexName: string) =>
indexName.startsWith('.ml-state') || indexName.startsWith('.ml-anomalies');
const isWatcherIndex = (indexName: string) => indexName.startsWith('.watches');
const isWatcherIndex = (indexName: string) =>
indexName.startsWith('.watches') || indexName.startsWith('.triggered-watches');

View file

@ -8,6 +8,7 @@ import { Request, Server } from 'src/server/kbn_server';
import { SavedObjectsClient } from 'src/server/saved_objects';
import moment = require('moment');
import { XPackInfo } from 'x-pack/plugins/xpack_main/server/lib/xpack_info';
import { ReindexSavedObject, ReindexStatus } from '../../../common/types';
import { CredentialStore } from './credential_store';
import { reindexActionsFactory } from './reindex_actions';
@ -46,6 +47,7 @@ export class ReindexWorker {
private credentialStore: CredentialStore,
private callWithRequest: CallClusterWithRequest,
private callWithInternalUser: CallCluster,
private xpackInfo: XPackInfo,
private readonly log: Server['log']
) {
if (ReindexWorker.workerSingleton) {
@ -54,6 +56,7 @@ export class ReindexWorker {
this.reindexService = reindexServiceFactory(
this.callWithInternalUser,
this.xpackInfo,
reindexActionsFactory(this.client, this.callWithInternalUser)
);
@ -158,7 +161,7 @@ export class ReindexWorker {
const fakeRequest = { headers: credential } as Request;
const callCluster = this.callWithRequest.bind(null, fakeRequest) as CallCluster;
const actions = reindexActionsFactory(this.client, callCluster);
const service = reindexServiceFactory(callCluster, actions);
const service = reindexServiceFactory(callCluster, this.xpackInfo, actions);
reindexOp = await swallowExceptions(service.processNextStep, this.log)(reindexOp);
// Update credential store with most recent state.

View file

@ -7,7 +7,9 @@
import { Server } from 'hapi';
const mockReindexService = {
hasRequiredPrivileges: jest.fn(),
detectReindexWarnings: jest.fn(),
getIndexGroup: jest.fn(),
createReindexOperation: jest.fn(),
findAllInProgressOperations: jest.fn(),
findReindexOperation: jest.fn(),
@ -21,26 +23,24 @@ jest.mock('../lib/reindexing', () => {
};
});
import { ReindexSavedObject, ReindexStatus, ReindexWarning } from '../../common/types';
import { IndexGroup, ReindexSavedObject, ReindexStatus, ReindexWarning } from '../../common/types';
import { credentialStoreFactory } from '../lib/reindexing/credential_store';
import { registerReindexIndicesRoutes } from './reindex_indices';
// Need to require to get mock on named export to work.
// tslint:disable:no-var-requires
// const MigrationApis = require('../lib/es_migration_apis');
// MigrationApis.getUpgradeAssistantStatus = jest.fn();
/**
* Since these route callbacks are so thin, these serve simply as integration tests
* to ensure they're wired up to the lib functions correctly. Business logic is tested
* more thoroughly in the es_migration_apis test.
*/
describe('reindex template API', () => {
describe('reindex API', () => {
const server = new Server();
server.plugins = {
elasticsearch: {
getCluster: () => ({ callWithRequest: jest.fn() } as any),
} as any,
xpack_main: {
info: {},
},
} as any;
server.config = () => ({ get: () => '' } as any);
server.decorate('request', 'getSavedObjectsClient', () => jest.fn());
@ -55,7 +55,9 @@ describe('reindex template API', () => {
registerReindexIndicesRoutes(server, worker, credentialStore);
beforeEach(() => {
mockReindexService.hasRequiredPrivileges.mockResolvedValue(true);
mockReindexService.detectReindexWarnings.mockReset();
mockReindexService.getIndexGroup.mockReset();
mockReindexService.createReindexOperation.mockReset();
mockReindexService.findAllInProgressOperations.mockReset();
mockReindexService.findReindexOperation.mockReset();
@ -102,7 +104,23 @@ describe('reindex template API', () => {
expect(resp.statusCode).toEqual(200);
const data = JSON.parse(resp.payload);
expect(data).toEqual({ warnings: null, reindexOp: null });
expect(data.reindexOp).toBeNull();
expect(data.warnings).toBeNull();
});
it('returns the indexGroup for ML indices', async () => {
mockReindexService.findReindexOperation.mockResolvedValueOnce(null);
mockReindexService.detectReindexWarnings.mockResolvedValueOnce([]);
mockReindexService.getIndexGroup.mockReturnValue(IndexGroup.ml);
const resp = await server.inject({
method: 'GET',
url: `/api/upgrade_assistant/reindex/.ml-state`,
});
expect(resp.statusCode).toEqual(200);
const data = JSON.parse(resp.payload);
expect(data.indexGroup).toEqual(IndexGroup.ml);
});
});
@ -178,6 +196,17 @@ describe('reindex template API', () => {
const data = JSON.parse(resp.payload);
expect(data).toEqual({ indexName: 'theIndex', status: ReindexStatus.inProgress });
});
it('returns a 403 if required privileges fails', async () => {
mockReindexService.hasRequiredPrivileges.mockResolvedValueOnce(false);
const resp = await server.inject({
method: 'POST',
url: '/api/upgrade_assistant/reindex/theIndex',
});
expect(resp.statusCode).toEqual(403);
});
});
describe('DELETE /api/upgrade_assistant/reindex/{indexName}', () => {

View file

@ -18,6 +18,7 @@ export function registerReindexWorker(server: Server, credentialStore: Credentia
const { callWithRequest, callWithInternalUser } = server.plugins.elasticsearch.getCluster(
'admin'
);
const xpackInfo = server.plugins.xpack_main.info;
const savedObjectsRepository = server.savedObjects.getSavedObjectsRepository(
callWithInternalUser
);
@ -38,6 +39,7 @@ export function registerReindexWorker(server: Server, credentialStore: Credentia
credentialStore,
callWithRequest,
callWithInternalUser,
xpackInfo,
log
);
@ -56,6 +58,7 @@ export function registerReindexIndicesRoutes(
credentialStore: CredentialStore
) {
const { callWithRequest } = server.plugins.elasticsearch.getCluster('admin');
const xpackInfo = server.plugins.xpack_main.info;
const BASE_PATH = '/api/upgrade_assistant/reindex';
// Start reindex for an index
@ -68,9 +71,13 @@ export function registerReindexIndicesRoutes(
const callCluster = callWithRequest.bind(null, request) as CallCluster;
const reindexActions = reindexActionsFactory(client, callCluster);
const reindexService = reindexServiceFactory(callCluster, reindexActions);
const reindexService = reindexServiceFactory(callCluster, xpackInfo, reindexActions);
try {
if (!(await reindexService.hasRequiredPrivileges(indexName))) {
throw Boom.forbidden(`You do not have adequate privileges to reindex this index.`);
}
const existingOp = await reindexService.findReindexOperation(indexName);
// If the reindexOp already exists and it's paused, resume it. Otherwise create a new one.
@ -105,15 +112,22 @@ export function registerReindexIndicesRoutes(
const { indexName } = request.params;
const callCluster = callWithRequest.bind(null, request) as CallCluster;
const reindexActions = reindexActionsFactory(client, callCluster);
const reindexService = reindexServiceFactory(callCluster, reindexActions);
const reindexService = reindexServiceFactory(callCluster, xpackInfo, reindexActions);
try {
const hasRequiredPrivileges = await reindexService.hasRequiredPrivileges(indexName);
const reindexOp = await reindexService.findReindexOperation(indexName);
const reindexWarnings = await reindexService.detectReindexWarnings(indexName);
// If the user doesn't have privileges than querying for warnings is going to fail.
const warnings = hasRequiredPrivileges
? await reindexService.detectReindexWarnings(indexName)
: [];
const indexGroup = reindexService.getIndexGroup(indexName);
return {
warnings: reindexWarnings,
reindexOp: reindexOp ? reindexOp.attributes : null,
warnings,
indexGroup,
hasRequiredPrivileges,
};
} catch (e) {
if (!e.isBoom) {

View file

@ -0,0 +1,37 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { Server } from 'hapi';
import { XPackInfoLicense } from './xpack_info_license';
interface XPackFeature {
isAvailable(): boolean;
isEnabled(): boolean;
registerLicenseCheckResultsGenerator(generator: (xpackInfo: XPackInfo) => void): void;
getLicenseCheckResults(): any;
}
export interface XPackInfoOptions {
clusterSource?: string;
pollFrequencyInMillis: number;
}
export declare class XPackInfo {
public license: XPackInfoLicense;
constructor(server: Server, options: XPackInfoOptions);
public isAvailable(): boolean;
public isXpackUnavailable(): boolean;
public unavailableReason(): string | Error;
public onLicenseInfoChange(handler: () => void): void;
public refreshNow(): Promise<this>;
public feature(name: string): XPackFeature;
public getSignature(): string;
public toJSON(): any;
}

View file

@ -0,0 +1,21 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
type LicenseType = 'oss' | 'basic' | 'trial' | 'standard' | 'basic' | 'gold' | 'platinum';
export declare class XPackInfoLicense {
constructor(getRawLicense: () => any);
public getUid(): string | undefined;
public isActive(): boolean;
public getExpiryDateInMillis(): number | undefined;
public isOneOf(candidateLicenses: string): boolean;
public getType(): LicenseType | undefined;
public getMode(): string | undefined;
public isActiveLicense(typeChecker: (mode: string) => boolean): boolean;
public isBasic(): boolean;
public isNotBasic(): boolean;
}

View file

@ -0,0 +1,12 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { XPackInfo, XPackInfoOptions } from './server/lib/xpack_info';
export interface XPackMainPlugin {
info: XPackInfo;
createXPackInfo(options: XPackInfoOptions): XPackInfo;
}

View file

@ -4,7 +4,8 @@
"common/**/*",
"server/**/*",
"plugins/**/*",
"test_utils/**/*"
"test_utils/**/*",
"typings/**/*"
],
"exclude": [
"test/**/*"

14
x-pack/typings/hapi.d.ts vendored Normal file
View file

@ -0,0 +1,14 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import 'hapi';
import { XPackMainPlugin } from 'x-pack/plugins/xpack_main/xpack_main';
declare module 'hapi' {
interface PluginProperties {
xpack_main: XPackMainPlugin;
}
}