[Response Ops][Alerting] Reusable functions for FAAD resource installation (#152849)

Resolves https://github.com/elastic/kibana/issues/152490

## Summary

This PR refactors the resource installation methods in `AlertsService`
to be reusable library functions. It exports them from the alerting
plugin and changes the rule registry resource installer to use them as
well.

## To Verify
1. Run this branch with `enableFrameworkAlerts: true`. Verify that we
can create a detection rule in the default space & a different space and
generate a rule preview. The logs should show that the rule registry
creates the resources for the preview indices and for indices in the
non-default space.
2. Verify that when running this branch on rule registry rules from
`main` or a previous version, the rules continue to run successfully.

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Ying Mao 2023-03-20 12:40:25 -04:00 committed by GitHub
parent 098456aefa
commit 49a0996a6a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 1829 additions and 866 deletions

View file

@ -58,18 +58,16 @@ const GetAliasResponse = {
};
const IlmPutBody = {
body: {
policy: {
_meta: {
managed: true,
},
phases: {
hot: {
actions: {
rollover: {
max_age: '30d',
max_primary_shard_size: '50gb',
},
policy: {
_meta: {
managed: true,
},
phases: {
hot: {
actions: {
rollover: {
max_age: '30d',
max_primary_shard_size: '50gb',
},
},
},
@ -212,7 +210,6 @@ describe('Alerts Service', () => {
);
expect(clusterClient.ilm.putLifecycle).toHaveBeenCalled();
expect(clusterClient.cluster.putComponentTemplate).not.toHaveBeenCalled();
});
test('should log error and set initialized to false if creating/updating common component template throws error', async () => {
@ -232,7 +229,6 @@ describe('Alerts Service', () => {
);
expect(clusterClient.ilm.putLifecycle).toHaveBeenCalled();
expect(clusterClient.cluster.putComponentTemplate).toHaveBeenCalledTimes(1);
});
test('should update index template field limit and retry initialization if creating/updating common component template fails with field limit error', async () => {

View file

@ -5,20 +5,10 @@
* 2.0.
*/
import {
ClusterPutComponentTemplateRequest,
IndicesSimulateIndexTemplateResponse,
MappingTypeMapping,
} from '@elastic/elasticsearch/lib/api/types';
import { get, isEmpty, isEqual } from 'lodash';
import { isEmpty, isEqual } from 'lodash';
import { Logger, ElasticsearchClient } from '@kbn/core/server';
import { firstValueFrom, Observable } from 'rxjs';
import { Observable } from 'rxjs';
import { alertFieldMap, ecsFieldMap, legacyAlertFieldMap } from '@kbn/alerts-as-data-utils';
import {
IndicesGetIndexTemplateIndexTemplateItem,
Metadata,
} from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { asyncForEach } from '@kbn/std';
import {
DEFAULT_ALERTS_ILM_POLICY_NAME,
DEFAULT_ALERTS_ILM_POLICY,
@ -27,9 +17,7 @@ import {
getComponentTemplate,
getComponentTemplateName,
getIndexTemplateAndPattern,
IIndexPatternString,
} from './resource_installer_utils';
import { retryTransientEsErrors } from './retry_transient_es_errors';
import { IRuleTypeAlerts } from '../types';
import {
createResourceInstallationHelper,
@ -38,9 +26,16 @@ import {
ResourceInstallationHelper,
successResult,
} from './create_resource_installation_helper';
import {
createOrUpdateIlmPolicy,
createOrUpdateComponentTemplate,
getIndexTemplate,
createOrUpdateIndexTemplate,
createConcreteWriteIndex,
installWithTimeout,
} from './lib';
const TOTAL_FIELDS_LIMIT = 2500;
const INSTALLATION_TIMEOUT = 20 * 60 * 1000; // 20 minutes
export const TOTAL_FIELDS_LIMIT = 2500;
const LEGACY_ALERT_CONTEXT = 'legacy-alert';
export const ECS_CONTEXT = `ecs`;
export const ECS_COMPONENT_TEMPLATE_NAME = getComponentTemplateName({ name: ECS_CONTEXT });
@ -52,12 +47,6 @@ interface AlertsServiceParams {
timeoutMs?: number;
}
interface ConcreteIndexInfo {
index: string;
alias: string;
isWriteIndex: boolean;
}
interface IAlertsService {
/**
* Register solution specific resources. If common resource initialization is
@ -150,37 +139,57 @@ export class AlertsService implements IAlertsService {
this.options.logger.debug(`Initializing resources for AlertsService`);
const esClient = await this.options.elasticsearchClientPromise;
// Common initialization installs ILM policy and shared component template
// Common initialization installs ILM policy and shared component templates
const initFns = [
() => this.createOrUpdateIlmPolicy(esClient),
() =>
this.createOrUpdateComponentTemplate(
createOrUpdateIlmPolicy({
logger: this.options.logger,
esClient,
getComponentTemplate({ fieldMap: alertFieldMap, includeSettings: true })
),
name: DEFAULT_ALERTS_ILM_POLICY_NAME,
policy: DEFAULT_ALERTS_ILM_POLICY,
}),
() =>
this.createOrUpdateComponentTemplate(
createOrUpdateComponentTemplate({
logger: this.options.logger,
esClient,
getComponentTemplate({
template: getComponentTemplate({ fieldMap: alertFieldMap, includeSettings: true }),
totalFieldsLimit: TOTAL_FIELDS_LIMIT,
}),
() =>
createOrUpdateComponentTemplate({
logger: this.options.logger,
esClient,
template: getComponentTemplate({
fieldMap: legacyAlertFieldMap,
name: LEGACY_ALERT_CONTEXT,
includeSettings: true,
})
),
}),
totalFieldsLimit: TOTAL_FIELDS_LIMIT,
}),
() =>
this.createOrUpdateComponentTemplate(
createOrUpdateComponentTemplate({
logger: this.options.logger,
esClient,
getComponentTemplate({
template: getComponentTemplate({
fieldMap: ecsFieldMap,
name: ECS_CONTEXT,
includeSettings: true,
})
),
}),
totalFieldsLimit: TOTAL_FIELDS_LIMIT,
}),
];
for (const fn of initFns) {
await this.installWithTimeout(async () => await fn(), timeoutMs);
}
// Install in parallel
await Promise.all(
initFns.map((fn) =>
installWithTimeout({
installFn: async () => await fn(),
pluginStop$: this.options.pluginStop$,
logger: this.options.logger,
timeoutMs,
})
)
);
this.initialized = true;
return successResult();
@ -224,7 +233,13 @@ export class AlertsService implements IAlertsService {
context,
});
initFns.push(
async () => await this.createOrUpdateComponentTemplate(esClient, componentTemplate)
async () =>
await createOrUpdateComponentTemplate({
logger: this.options.logger,
esClient,
template: componentTemplate,
totalFieldsLimit: TOTAL_FIELDS_LIMIT,
})
);
componentTemplateRefs.push(componentTemplate.name);
}
@ -240,433 +255,36 @@ export class AlertsService implements IAlertsService {
// Context specific initialization installs index template and write index
initFns = initFns.concat([
async () =>
await this.createOrUpdateIndexTemplate(
await createOrUpdateIndexTemplate({
logger: this.options.logger,
esClient,
indexTemplateAndPattern,
componentTemplateRefs
),
async () => await this.createConcreteWriteIndex(esClient, indexTemplateAndPattern),
template: getIndexTemplate(
this.options.kibanaVersion,
DEFAULT_ALERTS_ILM_POLICY_NAME,
indexTemplateAndPattern,
componentTemplateRefs,
TOTAL_FIELDS_LIMIT
),
}),
async () =>
await createConcreteWriteIndex({
logger: this.options.logger,
esClient,
totalFieldsLimit: TOTAL_FIELDS_LIMIT,
indexPatterns: indexTemplateAndPattern,
}),
]);
// We want to install these in sequence and not in parallel because
// the concrete index depends on the index template which depends on
// the component template.
for (const fn of initFns) {
await this.installWithTimeout(async () => await fn(), timeoutMs);
}
}
/**
* Creates ILM policy if it doesn't already exist, updates it if it does
*/
private async createOrUpdateIlmPolicy(esClient: ElasticsearchClient) {
this.options.logger.info(`Installing ILM policy ${DEFAULT_ALERTS_ILM_POLICY_NAME}`);
try {
await retryTransientEsErrors(
() =>
esClient.ilm.putLifecycle({
name: DEFAULT_ALERTS_ILM_POLICY_NAME,
body: DEFAULT_ALERTS_ILM_POLICY,
}),
{ logger: this.options.logger }
);
} catch (err) {
this.options.logger.error(
`Error installing ILM policy ${DEFAULT_ALERTS_ILM_POLICY_NAME} - ${err.message}`
);
throw err;
}
}
private async getIndexTemplatesUsingComponentTemplate(
esClient: ElasticsearchClient,
componentTemplateName: string
) {
// Get all index templates and filter down to just the ones referencing this component template
const { index_templates: indexTemplates } = await esClient.indices.getIndexTemplate();
const indexTemplatesUsingComponentTemplate = (indexTemplates ?? []).filter(
(indexTemplate: IndicesGetIndexTemplateIndexTemplateItem) =>
indexTemplate.index_template.composed_of.includes(componentTemplateName)
);
await asyncForEach(
indexTemplatesUsingComponentTemplate,
async (template: IndicesGetIndexTemplateIndexTemplateItem) => {
await esClient.indices.putIndexTemplate({
name: template.name,
body: {
...template.index_template,
template: {
...template.index_template.template,
settings: {
...template.index_template.template?.settings,
'index.mapping.total_fields.limit': TOTAL_FIELDS_LIMIT,
},
},
},
});
}
);
}
private async createOrUpdateComponentTemplateHelper(
esClient: ElasticsearchClient,
template: ClusterPutComponentTemplateRequest
) {
try {
await esClient.cluster.putComponentTemplate(template);
} catch (error) {
const reason = error?.meta?.body?.error?.caused_by?.caused_by?.caused_by?.reason;
if (reason && reason.match(/Limit of total fields \[\d+\] has been exceeded/) != null) {
// This error message occurs when there is an index template using this component template
// that contains a field limit setting that using this component template exceeds
// Specifically, this can happen for the ECS component template when we add new fields
// to adhere to the ECS spec. Individual index templates specify field limits so if the
// number of new ECS fields pushes the composed mapping above the limit, this error will
// occur. We have to update the field limit inside the index template now otherwise we
// can never update the component template
await this.getIndexTemplatesUsingComponentTemplate(esClient, template.name);
// Try to update the component template again
await esClient.cluster.putComponentTemplate(template);
} else {
throw error;
}
}
}
private async createOrUpdateComponentTemplate(
esClient: ElasticsearchClient,
template: ClusterPutComponentTemplateRequest
) {
this.options.logger.info(`Installing component template ${template.name}`);
try {
await retryTransientEsErrors(
() => this.createOrUpdateComponentTemplateHelper(esClient, template),
{
logger: this.options.logger,
}
);
} catch (err) {
this.options.logger.error(
`Error installing component template ${template.name} - ${err.message}`
);
throw err;
}
}
/**
* Installs index template that uses installed component template
* Prior to installation, simulates the installation to check for possible
* conflicts. Simulate should return an empty mapping if a template
* conflicts with an already installed template.
*/
private async createOrUpdateIndexTemplate(
esClient: ElasticsearchClient,
indexPatterns: IIndexPatternString,
componentTemplateNames: string[]
) {
this.options.logger.info(`Installing index template ${indexPatterns.template}`);
const indexMetadata: Metadata = {
kibana: {
version: this.options.kibanaVersion,
},
managed: true,
namespace: 'default', // hard-coded to default here until we start supporting space IDs
};
const indexTemplate = {
name: indexPatterns.template,
body: {
index_patterns: [indexPatterns.pattern],
composed_of: componentTemplateNames,
template: {
settings: {
auto_expand_replicas: '0-1',
hidden: true,
'index.lifecycle': {
name: DEFAULT_ALERTS_ILM_POLICY_NAME,
rollover_alias: indexPatterns.alias,
},
'index.mapping.total_fields.limit': TOTAL_FIELDS_LIMIT,
},
mappings: {
dynamic: false,
_meta: indexMetadata,
},
...(indexPatterns.secondaryAlias
? {
aliases: {
[indexPatterns.secondaryAlias]: {
is_write_index: false,
},
},
}
: {}),
},
_meta: indexMetadata,
// TODO - set priority of this template when we start supporting spaces
},
};
let mappings: MappingTypeMapping = {};
try {
// Simulate the index template to proactively identify any issues with the mapping
const simulateResponse = await esClient.indices.simulateTemplate(indexTemplate);
mappings = simulateResponse.template.mappings;
} catch (err) {
this.options.logger.error(
`Failed to simulate index template mappings for ${indexPatterns.template}; not applying mappings - ${err.message}`
);
return;
}
if (isEmpty(mappings)) {
throw new Error(
`No mappings would be generated for ${indexPatterns.template}, possibly due to failed/misconfigured bootstrapping`
);
}
try {
await retryTransientEsErrors(() => esClient.indices.putIndexTemplate(indexTemplate), {
await installWithTimeout({
installFn: async () => await fn(),
pluginStop$: this.options.pluginStop$,
logger: this.options.logger,
timeoutMs,
});
} catch (err) {
this.options.logger.error(
`Error installing index template ${indexPatterns.template} - ${err.message}`
);
throw err;
}
}
/**
* Updates the underlying mapping for any existing concrete indices
*/
private async updateIndexMappings(
esClient: ElasticsearchClient,
concreteIndices: ConcreteIndexInfo[]
) {
this.options.logger.debug(
`Updating underlying mappings for ${concreteIndices.length} indices.`
);
// Update total field limit setting of found indices
// Other index setting changes are not updated at this time
await Promise.all(
concreteIndices.map((index) => this.updateTotalFieldLimitSetting(esClient, index))
);
// Update mappings of the found indices.
await Promise.all(
concreteIndices.map((index) => this.updateUnderlyingMapping(esClient, index))
);
}
private async updateTotalFieldLimitSetting(
esClient: ElasticsearchClient,
{ index, alias }: ConcreteIndexInfo
) {
try {
await retryTransientEsErrors(
() =>
esClient.indices.putSettings({
index,
body: {
'index.mapping.total_fields.limit': TOTAL_FIELDS_LIMIT,
},
}),
{
logger: this.options.logger,
}
);
return;
} catch (err) {
this.options.logger.error(
`Failed to PUT index.mapping.total_fields.limit settings for alias ${alias}: ${err.message}`
);
throw err;
}
}
private async updateUnderlyingMapping(
esClient: ElasticsearchClient,
{ index, alias }: ConcreteIndexInfo
) {
let simulatedIndexMapping: IndicesSimulateIndexTemplateResponse;
try {
simulatedIndexMapping = await esClient.indices.simulateIndexTemplate({
name: index,
});
} catch (err) {
this.options.logger.error(
`Ignored PUT mappings for alias ${alias}; error generating simulated mappings: ${err.message}`
);
return;
}
const simulatedMapping = get(simulatedIndexMapping, ['template', 'mappings']);
if (simulatedMapping == null) {
this.options.logger.error(
`Ignored PUT mappings for alias ${alias}; simulated mappings were empty`
);
return;
}
try {
await retryTransientEsErrors(
() =>
esClient.indices.putMapping({
index,
body: simulatedMapping,
}),
{
logger: this.options.logger,
}
);
return;
} catch (err) {
this.options.logger.error(`Failed to PUT mapping for alias ${alias}: ${err.message}`);
throw err;
}
}
private async createConcreteWriteIndex(
esClient: ElasticsearchClient,
indexPatterns: IIndexPatternString
) {
this.options.logger.info(`Creating concrete write index - ${indexPatterns.name}`);
// check if a concrete write index already exists
let concreteIndices: ConcreteIndexInfo[] = [];
try {
// Specify both the index pattern for the backing indices and their aliases
// The alias prevents the request from finding other namespaces that could match the -* pattern
const response = await esClient.indices.getAlias({
index: indexPatterns.pattern,
name: indexPatterns.basePattern,
});
concreteIndices = Object.entries(response).flatMap(([index, { aliases }]) =>
Object.entries(aliases).map(([aliasName, aliasProperties]) => ({
index,
alias: aliasName,
isWriteIndex: aliasProperties.is_write_index ?? false,
}))
);
this.options.logger.debug(
`Found ${concreteIndices.length} concrete indices for ${
indexPatterns.name
} - ${JSON.stringify(concreteIndices)}`
);
} catch (error) {
// 404 is expected if no concrete write indices have been created
if (error.statusCode !== 404) {
this.options.logger.error(
`Error fetching concrete indices for ${indexPatterns.pattern} pattern - ${error.message}`
);
throw error;
}
}
let concreteWriteIndicesExist = false;
// if a concrete write index already exists, update the underlying mapping
if (concreteIndices.length > 0) {
await this.updateIndexMappings(esClient, concreteIndices);
const concreteIndicesExist = concreteIndices.some(
(index) => index.alias === indexPatterns.alias
);
concreteWriteIndicesExist = concreteIndices.some(
(index) => index.alias === indexPatterns.alias && index.isWriteIndex
);
// If there are some concrete indices but none of them are the write index, we'll throw an error
// because one of the existing indices should have been the write target.
if (concreteIndicesExist && !concreteWriteIndicesExist) {
throw new Error(
`Indices matching pattern ${indexPatterns.pattern} exist but none are set as the write index for alias ${indexPatterns.alias}`
);
}
}
// check if a concrete write index already exists
if (!concreteWriteIndicesExist) {
try {
await retryTransientEsErrors(
() =>
esClient.indices.create({
index: indexPatterns.name,
body: {
aliases: {
[indexPatterns.alias]: {
is_write_index: true,
},
},
},
}),
{
logger: this.options.logger,
}
);
} catch (error) {
this.options.logger.error(`Error creating concrete write index - ${error.message}`);
// If the index already exists and it's the write index for the alias,
// something else created it so suppress the error. If it's not the write
// index, that's bad, throw an error.
if (error?.meta?.body?.error?.type === 'resource_already_exists_exception') {
const existingIndices = await esClient.indices.get({
index: indexPatterns.name,
});
if (
!existingIndices[indexPatterns.name]?.aliases?.[indexPatterns.alias]?.is_write_index
) {
throw Error(
`Attempted to create index: ${indexPatterns.name} as the write index for alias: ${indexPatterns.alias}, but the index already exists and is not the write index for the alias`
);
}
} else {
throw error;
}
}
}
}
private async installWithTimeout(
installFn: () => Promise<void>,
timeoutMs: number = INSTALLATION_TIMEOUT
): Promise<void> {
try {
let timeoutId: NodeJS.Timeout;
const install = async (): Promise<void> => {
await installFn();
if (timeoutId) {
clearTimeout(timeoutId);
}
};
const throwTimeoutException = (): Promise<void> => {
return new Promise((_, reject) => {
timeoutId = setTimeout(() => {
const msg = `Timeout: it took more than ${timeoutMs}ms`;
reject(new Error(msg));
}, timeoutMs);
firstValueFrom(this.options.pluginStop$).then(() => {
clearTimeout(timeoutId);
reject(new Error('Server is stopping; must stop all async operations'));
});
});
};
await Promise.race([install(), throwTimeoutException()]);
} catch (e) {
this.options.logger.error(e);
const reason = e?.message || 'Unknown reason';
throw new Error(`Failure during installation. ${reason}`);
}
}
}

View file

@ -5,6 +5,8 @@
* 2.0.
*/
import { IlmPolicy } from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
/**
* Default alert index ILM policy
* - _meta.managed: notify users this is a managed policy and should be modified
@ -15,18 +17,16 @@
*/
export const DEFAULT_ALERTS_ILM_POLICY_NAME = '.alerts-ilm-policy';
export const DEFAULT_ALERTS_ILM_POLICY = {
policy: {
_meta: {
managed: true,
},
phases: {
hot: {
actions: {
rollover: {
max_age: '30d',
max_primary_shard_size: '50gb',
},
export const DEFAULT_ALERTS_ILM_POLICY: IlmPolicy = {
_meta: {
managed: true,
},
phases: {
hot: {
actions: {
rollover: {
max_age: '30d',
max_primary_shard_size: '50gb',
},
},
},

View file

@ -9,7 +9,7 @@ export {
DEFAULT_ALERTS_ILM_POLICY,
DEFAULT_ALERTS_ILM_POLICY_NAME,
} from './default_lifecycle_policy';
export { ECS_COMPONENT_TEMPLATE_NAME, ECS_CONTEXT } from './alerts_service';
export { ECS_COMPONENT_TEMPLATE_NAME, ECS_CONTEXT, TOTAL_FIELDS_LIMIT } from './alerts_service';
export { getComponentTemplate } from './resource_installer_utils';
export {
type InitializationPromise,
@ -17,3 +17,11 @@ export {
errorResult,
} from './create_resource_installation_helper';
export { AlertsService, type PublicFrameworkAlertsService } from './alerts_service';
export {
createOrUpdateIlmPolicy,
createOrUpdateComponentTemplate,
getIndexTemplate,
createOrUpdateIndexTemplate,
createConcreteWriteIndex,
installWithTimeout,
} from './lib';

View file

@ -0,0 +1,475 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { elasticsearchServiceMock, loggingSystemMock } from '@kbn/core/server/mocks';
import { errors as EsErrors } from '@elastic/elasticsearch';
import { createConcreteWriteIndex } from './create_concrete_write_index';
const randomDelayMultiplier = 0.01;
const logger = loggingSystemMock.createLogger();
const clusterClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
interface EsError extends Error {
statusCode?: number;
meta?: {
body: {
error: {
type: string;
};
};
};
}
const GetAliasResponse = {
real_index: {
aliases: {
alias_1: {
is_hidden: true,
},
alias_2: {
is_hidden: true,
},
},
},
};
const SimulateTemplateResponse = {
template: {
aliases: {
alias_name_1: {
is_hidden: true,
},
alias_name_2: {
is_hidden: true,
},
},
mappings: { enabled: false },
settings: {},
},
};
const IndexPatterns = {
template: '.alerts-test.alerts-default-index-template',
pattern: '.internal.alerts-test.alerts-default-*',
basePattern: '.alerts-test.alerts-*',
alias: '.alerts-test.alerts-default',
name: '.internal.alerts-test.alerts-default-000001',
};
describe('createConcreteWriteIndex', () => {
beforeEach(() => {
jest.resetAllMocks();
jest.spyOn(global.Math, 'random').mockReturnValue(randomDelayMultiplier);
});
it(`should call esClient to put index template`, async () => {
clusterClient.indices.getAlias.mockImplementation(async () => ({}));
await createConcreteWriteIndex({
logger,
esClient: clusterClient,
indexPatterns: IndexPatterns,
totalFieldsLimit: 2500,
});
expect(clusterClient.indices.create).toHaveBeenCalledWith({
index: '.internal.alerts-test.alerts-default-000001',
body: {
aliases: {
'.alerts-test.alerts-default': {
is_write_index: true,
},
},
},
});
});
it(`should retry on transient ES errors`, async () => {
clusterClient.indices.getAlias.mockImplementation(async () => ({}));
clusterClient.indices.create
.mockRejectedValueOnce(new EsErrors.ConnectionError('foo'))
.mockRejectedValueOnce(new EsErrors.TimeoutError('timeout'))
.mockResolvedValue({
index: '.internal.alerts-test.alerts-default-000001',
shards_acknowledged: true,
acknowledged: true,
});
await createConcreteWriteIndex({
logger,
esClient: clusterClient,
indexPatterns: IndexPatterns,
totalFieldsLimit: 2500,
});
expect(clusterClient.indices.create).toHaveBeenCalledTimes(3);
});
it(`should log and throw error if max retries exceeded`, async () => {
clusterClient.indices.getAlias.mockImplementation(async () => ({}));
clusterClient.indices.create.mockRejectedValue(new EsErrors.ConnectionError('foo'));
await expect(() =>
createConcreteWriteIndex({
logger,
esClient: clusterClient,
indexPatterns: IndexPatterns,
totalFieldsLimit: 2500,
})
).rejects.toThrowErrorMatchingInlineSnapshot(`"foo"`);
expect(logger.error).toHaveBeenCalledWith(`Error creating concrete write index - foo`);
expect(clusterClient.indices.create).toHaveBeenCalledTimes(4);
});
it(`should log and throw error if ES throws error`, async () => {
clusterClient.indices.getAlias.mockImplementation(async () => ({}));
clusterClient.indices.create.mockRejectedValueOnce(new Error('generic error'));
await expect(() =>
createConcreteWriteIndex({
logger,
esClient: clusterClient,
indexPatterns: IndexPatterns,
totalFieldsLimit: 2500,
})
).rejects.toThrowErrorMatchingInlineSnapshot(`"generic error"`);
expect(logger.error).toHaveBeenCalledWith(
`Error creating concrete write index - generic error`
);
});
it(`should log and return if ES throws resource_already_exists_exception error and existing index is already write index`, async () => {
clusterClient.indices.getAlias.mockImplementation(async () => ({}));
const error = new Error(`fail`) as EsError;
error.meta = {
body: {
error: {
type: 'resource_already_exists_exception',
},
},
};
clusterClient.indices.create.mockRejectedValueOnce(error);
clusterClient.indices.get.mockImplementationOnce(async () => ({
'.internal.alerts-test.alerts-default-000001': {
aliases: { '.alerts-test.alerts-default': { is_write_index: true } },
},
}));
await createConcreteWriteIndex({
logger,
esClient: clusterClient,
indexPatterns: IndexPatterns,
totalFieldsLimit: 2500,
});
expect(logger.error).toHaveBeenCalledWith(`Error creating concrete write index - fail`);
});
it(`should log and throw error if ES throws resource_already_exists_exception error and existing index is not the write index`, async () => {
clusterClient.indices.getAlias.mockImplementation(async () => ({}));
const error = new Error(`fail`) as EsError;
error.meta = {
body: {
error: {
type: 'resource_already_exists_exception',
},
},
};
clusterClient.indices.create.mockRejectedValueOnce(error);
clusterClient.indices.get.mockImplementationOnce(async () => ({
'.internal.alerts-test.alerts-default-000001': {
aliases: { '.alerts-test.alerts-default': { is_write_index: false } },
},
}));
await expect(() =>
createConcreteWriteIndex({
logger,
esClient: clusterClient,
indexPatterns: IndexPatterns,
totalFieldsLimit: 2500,
})
).rejects.toThrowErrorMatchingInlineSnapshot(
`"Attempted to create index: .internal.alerts-test.alerts-default-000001 as the write index for alias: .alerts-test.alerts-default, but the index already exists and is not the write index for the alias"`
);
expect(logger.error).toHaveBeenCalledWith(`Error creating concrete write index - fail`);
});
it(`should call esClient to put index template if get alias throws 404`, async () => {
const error = new Error(`not found`) as EsError;
error.statusCode = 404;
clusterClient.indices.getAlias.mockRejectedValueOnce(error);
await createConcreteWriteIndex({
logger,
esClient: clusterClient,
indexPatterns: IndexPatterns,
totalFieldsLimit: 2500,
});
expect(clusterClient.indices.create).toHaveBeenCalledWith({
index: '.internal.alerts-test.alerts-default-000001',
body: {
aliases: {
'.alerts-test.alerts-default': {
is_write_index: true,
},
},
},
});
});
it(`should log and throw error if get alias throws non-404 error`, async () => {
const error = new Error(`fatal error`) as EsError;
error.statusCode = 500;
clusterClient.indices.getAlias.mockRejectedValueOnce(error);
await expect(() =>
createConcreteWriteIndex({
logger,
esClient: clusterClient,
indexPatterns: IndexPatterns,
totalFieldsLimit: 2500,
})
).rejects.toThrowErrorMatchingInlineSnapshot(`"fatal error"`);
expect(logger.error).toHaveBeenCalledWith(
`Error fetching concrete indices for .internal.alerts-test.alerts-default-* pattern - fatal error`
);
});
it(`should update underlying settings and mappings of existing concrete indices if they exist`, async () => {
clusterClient.indices.getAlias.mockImplementation(async () => GetAliasResponse);
clusterClient.indices.simulateIndexTemplate.mockImplementation(
async () => SimulateTemplateResponse
);
await createConcreteWriteIndex({
logger,
esClient: clusterClient,
indexPatterns: IndexPatterns,
totalFieldsLimit: 2500,
});
expect(clusterClient.indices.create).toHaveBeenCalledWith({
index: '.internal.alerts-test.alerts-default-000001',
body: {
aliases: {
'.alerts-test.alerts-default': {
is_write_index: true,
},
},
},
});
expect(clusterClient.indices.putSettings).toHaveBeenCalledTimes(2);
expect(clusterClient.indices.putMapping).toHaveBeenCalledTimes(2);
});
it(`should retry settings update on transient ES errors`, async () => {
clusterClient.indices.getAlias.mockImplementation(async () => GetAliasResponse);
clusterClient.indices.simulateIndexTemplate.mockImplementation(
async () => SimulateTemplateResponse
);
clusterClient.indices.putSettings
.mockRejectedValueOnce(new EsErrors.ConnectionError('foo'))
.mockRejectedValueOnce(new EsErrors.TimeoutError('timeout'))
.mockResolvedValue({ acknowledged: true });
await createConcreteWriteIndex({
logger,
esClient: clusterClient,
indexPatterns: IndexPatterns,
totalFieldsLimit: 2500,
});
expect(clusterClient.indices.putSettings).toHaveBeenCalledTimes(4);
});
it(`should log and throw error on settings update if max retries exceeded`, async () => {
clusterClient.indices.getAlias.mockImplementation(async () => GetAliasResponse);
clusterClient.indices.simulateIndexTemplate.mockImplementation(
async () => SimulateTemplateResponse
);
clusterClient.indices.putSettings.mockRejectedValue(new EsErrors.ConnectionError('foo'));
await expect(() =>
createConcreteWriteIndex({
logger,
esClient: clusterClient,
indexPatterns: IndexPatterns,
totalFieldsLimit: 2500,
})
).rejects.toThrowErrorMatchingInlineSnapshot(`"foo"`);
expect(clusterClient.indices.putSettings).toHaveBeenCalledTimes(7);
expect(logger.error).toHaveBeenCalledWith(
`Failed to PUT index.mapping.total_fields.limit settings for alias alias_1: foo`
);
});
it(`should log and throw error on settings update if ES throws error`, async () => {
clusterClient.indices.getAlias.mockImplementation(async () => GetAliasResponse);
clusterClient.indices.simulateIndexTemplate.mockImplementation(
async () => SimulateTemplateResponse
);
clusterClient.indices.putSettings.mockRejectedValue(new Error('generic error'));
await expect(() =>
createConcreteWriteIndex({
logger,
esClient: clusterClient,
indexPatterns: IndexPatterns,
totalFieldsLimit: 2500,
})
).rejects.toThrowErrorMatchingInlineSnapshot(`"generic error"`);
expect(logger.error).toHaveBeenCalledWith(
`Failed to PUT index.mapping.total_fields.limit settings for alias alias_1: generic error`
);
});
it(`should retry mappings update on transient ES errors`, async () => {
clusterClient.indices.getAlias.mockImplementation(async () => GetAliasResponse);
clusterClient.indices.simulateIndexTemplate.mockImplementation(
async () => SimulateTemplateResponse
);
clusterClient.indices.putMapping
.mockRejectedValueOnce(new EsErrors.ConnectionError('foo'))
.mockRejectedValueOnce(new EsErrors.TimeoutError('timeout'))
.mockResolvedValue({ acknowledged: true });
await createConcreteWriteIndex({
logger,
esClient: clusterClient,
indexPatterns: IndexPatterns,
totalFieldsLimit: 2500,
});
expect(clusterClient.indices.putMapping).toHaveBeenCalledTimes(4);
});
it(`should log and throw error on mappings update if max retries exceeded`, async () => {
clusterClient.indices.getAlias.mockImplementation(async () => GetAliasResponse);
clusterClient.indices.simulateIndexTemplate.mockImplementation(
async () => SimulateTemplateResponse
);
clusterClient.indices.putMapping.mockRejectedValue(new EsErrors.ConnectionError('foo'));
await expect(() =>
createConcreteWriteIndex({
logger,
esClient: clusterClient,
indexPatterns: IndexPatterns,
totalFieldsLimit: 2500,
})
).rejects.toThrowErrorMatchingInlineSnapshot(`"foo"`);
expect(clusterClient.indices.putMapping).toHaveBeenCalledTimes(7);
expect(logger.error).toHaveBeenCalledWith(`Failed to PUT mapping for alias alias_1: foo`);
});
it(`should log and throw error on mappings update if ES throws error`, async () => {
clusterClient.indices.getAlias.mockImplementation(async () => GetAliasResponse);
clusterClient.indices.simulateIndexTemplate.mockImplementation(
async () => SimulateTemplateResponse
);
clusterClient.indices.putMapping.mockRejectedValue(new Error('generic error'));
await expect(() =>
createConcreteWriteIndex({
logger,
esClient: clusterClient,
indexPatterns: IndexPatterns,
totalFieldsLimit: 2500,
})
).rejects.toThrowErrorMatchingInlineSnapshot(`"generic error"`);
expect(logger.error).toHaveBeenCalledWith(
`Failed to PUT mapping for alias alias_1: generic error`
);
});
it(`should log and return when simulating updated mappings throws error`, async () => {
clusterClient.indices.getAlias.mockImplementation(async () => GetAliasResponse);
clusterClient.indices.simulateIndexTemplate.mockRejectedValueOnce(new Error('fail'));
await createConcreteWriteIndex({
logger,
esClient: clusterClient,
indexPatterns: IndexPatterns,
totalFieldsLimit: 2500,
});
expect(logger.error).toHaveBeenCalledWith(
`Ignored PUT mappings for alias alias_1; error generating simulated mappings: fail`
);
expect(clusterClient.indices.create).toHaveBeenCalledWith({
index: '.internal.alerts-test.alerts-default-000001',
body: {
aliases: {
'.alerts-test.alerts-default': {
is_write_index: true,
},
},
},
});
});
it(`should log and return when simulating updated mappings returns null`, async () => {
clusterClient.indices.getAlias.mockImplementation(async () => GetAliasResponse);
clusterClient.indices.simulateIndexTemplate.mockImplementationOnce(async () => ({
...SimulateTemplateResponse,
template: { ...SimulateTemplateResponse.template, mappings: null },
}));
await createConcreteWriteIndex({
logger,
esClient: clusterClient,
indexPatterns: IndexPatterns,
totalFieldsLimit: 2500,
});
expect(logger.error).toHaveBeenCalledWith(
`Ignored PUT mappings for alias alias_1; simulated mappings were empty`
);
expect(clusterClient.indices.create).toHaveBeenCalledWith({
index: '.internal.alerts-test.alerts-default-000001',
body: {
aliases: {
'.alerts-test.alerts-default': {
is_write_index: true,
},
},
},
});
});
it(`should throw error when there are concrete indices but none of them are the write index`, async () => {
clusterClient.indices.getAlias.mockImplementationOnce(async () => ({
'.internal.alerts-test.alerts-default-0001': {
aliases: {
'.alerts-test.alerts-default': {
is_write_index: false,
is_hidden: true,
},
alias_2: {
is_write_index: false,
is_hidden: true,
},
},
},
}));
clusterClient.indices.simulateIndexTemplate.mockImplementation(
async () => SimulateTemplateResponse
);
await expect(() =>
createConcreteWriteIndex({
logger,
esClient: clusterClient,
indexPatterns: IndexPatterns,
totalFieldsLimit: 2500,
})
).rejects.toThrowErrorMatchingInlineSnapshot(
`"Indices matching pattern .internal.alerts-test.alerts-default-* exist but none are set as the write index for alias .alerts-test.alerts-default"`
);
});
});

View file

@ -0,0 +1,239 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { IndicesSimulateIndexTemplateResponse } from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { Logger, ElasticsearchClient } from '@kbn/core/server';
import { get } from 'lodash';
import { IIndexPatternString } from '../resource_installer_utils';
import { retryTransientEsErrors } from './retry_transient_es_errors';
interface ConcreteIndexInfo {
index: string;
alias: string;
isWriteIndex: boolean;
}
interface UpdateIndexMappingsOpts {
logger: Logger;
esClient: ElasticsearchClient;
totalFieldsLimit: number;
concreteIndices: ConcreteIndexInfo[];
}
interface UpdateIndexOpts {
logger: Logger;
esClient: ElasticsearchClient;
totalFieldsLimit: number;
concreteIndexInfo: ConcreteIndexInfo;
}
const updateTotalFieldLimitSetting = async ({
logger,
esClient,
totalFieldsLimit,
concreteIndexInfo,
}: UpdateIndexOpts) => {
const { index, alias } = concreteIndexInfo;
try {
await retryTransientEsErrors(
() =>
esClient.indices.putSettings({
index,
body: { 'index.mapping.total_fields.limit': totalFieldsLimit },
}),
{ logger }
);
return;
} catch (err) {
logger.error(
`Failed to PUT index.mapping.total_fields.limit settings for alias ${alias}: ${err.message}`
);
throw err;
}
};
// This will update the mappings of backing indices but *not* the settings. This
// is due to the fact settings can be classed as dynamic and static, and static
// updates will fail on an index that isn't closed. New settings *will* be applied as part
// of the ILM policy rollovers. More info: https://github.com/elastic/kibana/pull/113389#issuecomment-940152654
const updateUnderlyingMapping = async ({
logger,
esClient,
concreteIndexInfo,
}: UpdateIndexOpts) => {
const { index, alias } = concreteIndexInfo;
let simulatedIndexMapping: IndicesSimulateIndexTemplateResponse;
try {
simulatedIndexMapping = await esClient.indices.simulateIndexTemplate({
name: index,
});
} catch (err) {
logger.error(
`Ignored PUT mappings for alias ${alias}; error generating simulated mappings: ${err.message}`
);
return;
}
const simulatedMapping = get(simulatedIndexMapping, ['template', 'mappings']);
if (simulatedMapping == null) {
logger.error(`Ignored PUT mappings for alias ${alias}; simulated mappings were empty`);
return;
}
try {
await retryTransientEsErrors(
() => esClient.indices.putMapping({ index, body: simulatedMapping }),
{ logger }
);
return;
} catch (err) {
logger.error(`Failed to PUT mapping for alias ${alias}: ${err.message}`);
throw err;
}
};
/**
* Updates the underlying mapping for any existing concrete indices
*/
const updateIndexMappings = async ({
logger,
esClient,
totalFieldsLimit,
concreteIndices,
}: UpdateIndexMappingsOpts) => {
logger.debug(`Updating underlying mappings for ${concreteIndices.length} indices.`);
// Update total field limit setting of found indices
// Other index setting changes are not updated at this time
await Promise.all(
concreteIndices.map((index) =>
updateTotalFieldLimitSetting({ logger, esClient, totalFieldsLimit, concreteIndexInfo: index })
)
);
// Update mappings of the found indices.
await Promise.all(
concreteIndices.map((index) =>
updateUnderlyingMapping({ logger, esClient, totalFieldsLimit, concreteIndexInfo: index })
)
);
};
interface CreateConcreteWriteIndexOpts {
logger: Logger;
esClient: ElasticsearchClient;
totalFieldsLimit: number;
indexPatterns: IIndexPatternString;
}
/**
* Installs index template that uses installed component template
* Prior to installation, simulates the installation to check for possible
* conflicts. Simulate should return an empty mapping if a template
* conflicts with an already installed template.
*/
export const createConcreteWriteIndex = async ({
logger,
esClient,
indexPatterns,
totalFieldsLimit,
}: CreateConcreteWriteIndexOpts) => {
logger.info(`Creating concrete write index - ${indexPatterns.name}`);
// check if a concrete write index already exists
let concreteIndices: ConcreteIndexInfo[] = [];
try {
// Specify both the index pattern for the backing indices and their aliases
// The alias prevents the request from finding other namespaces that could match the -* pattern
const response = await esClient.indices.getAlias({
index: indexPatterns.pattern,
name: indexPatterns.basePattern,
});
concreteIndices = Object.entries(response).flatMap(([index, { aliases }]) =>
Object.entries(aliases).map(([aliasName, aliasProperties]) => ({
index,
alias: aliasName,
isWriteIndex: aliasProperties.is_write_index ?? false,
}))
);
logger.debug(
`Found ${concreteIndices.length} concrete indices for ${
indexPatterns.name
} - ${JSON.stringify(concreteIndices)}`
);
} catch (error) {
// 404 is expected if no concrete write indices have been created
if (error.statusCode !== 404) {
logger.error(
`Error fetching concrete indices for ${indexPatterns.pattern} pattern - ${error.message}`
);
throw error;
}
}
let concreteWriteIndicesExist = false;
// if a concrete write index already exists, update the underlying mapping
if (concreteIndices.length > 0) {
await updateIndexMappings({ logger, esClient, totalFieldsLimit, concreteIndices });
const concreteIndicesExist = concreteIndices.some(
(index) => index.alias === indexPatterns.alias
);
concreteWriteIndicesExist = concreteIndices.some(
(index) => index.alias === indexPatterns.alias && index.isWriteIndex
);
// If there are some concrete indices but none of them are the write index, we'll throw an error
// because one of the existing indices should have been the write target.
if (concreteIndicesExist && !concreteWriteIndicesExist) {
throw new Error(
`Indices matching pattern ${indexPatterns.pattern} exist but none are set as the write index for alias ${indexPatterns.alias}`
);
}
}
// check if a concrete write index already exists
if (!concreteWriteIndicesExist) {
try {
await retryTransientEsErrors(
() =>
esClient.indices.create({
index: indexPatterns.name,
body: {
aliases: {
[indexPatterns.alias]: {
is_write_index: true,
},
},
},
}),
{
logger,
}
);
} catch (error) {
logger.error(`Error creating concrete write index - ${error.message}`);
// If the index already exists and it's the write index for the alias,
// something else created it so suppress the error. If it's not the write
// index, that's bad, throw an error.
if (error?.meta?.body?.error?.type === 'resource_already_exists_exception') {
const existingIndices = await esClient.indices.get({
index: indexPatterns.name,
});
if (!existingIndices[indexPatterns.name]?.aliases?.[indexPatterns.alias]?.is_write_index) {
throw Error(
`Attempted to create index: ${indexPatterns.name} as the write index for alias: ${indexPatterns.alias}, but the index already exists and is not the write index for the alias`
);
}
} else {
throw error;
}
}
}
};

View file

@ -0,0 +1,191 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { elasticsearchServiceMock, loggingSystemMock } from '@kbn/core/server/mocks';
import { errors as EsErrors } from '@elastic/elasticsearch';
import { createOrUpdateComponentTemplate } from './create_or_update_component_template';
import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
const randomDelayMultiplier = 0.01;
const logger = loggingSystemMock.createLogger();
const clusterClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
const ComponentTemplate = {
name: 'test-mappings',
_meta: {
managed: true,
},
template: {
settings: {
number_of_shards: 1,
'index.mapping.total_fields.limit': 1500,
},
mappings: {
dynamic: false,
properties: {
foo: {
ignore_above: 1024,
type: 'keyword',
},
},
},
},
};
describe('createOrUpdateComponentTemplate', () => {
beforeEach(() => {
jest.resetAllMocks();
jest.spyOn(global.Math, 'random').mockReturnValue(randomDelayMultiplier);
});
it(`should call esClient to put component template`, async () => {
await createOrUpdateComponentTemplate({
logger,
esClient: clusterClient,
template: ComponentTemplate,
totalFieldsLimit: 2500,
});
expect(clusterClient.cluster.putComponentTemplate).toHaveBeenCalledWith(ComponentTemplate);
});
it(`should retry on transient ES errors`, async () => {
clusterClient.cluster.putComponentTemplate
.mockRejectedValueOnce(new EsErrors.ConnectionError('foo'))
.mockRejectedValueOnce(new EsErrors.TimeoutError('timeout'))
.mockResolvedValue({ acknowledged: true });
await createOrUpdateComponentTemplate({
logger,
esClient: clusterClient,
template: ComponentTemplate,
totalFieldsLimit: 2500,
});
expect(clusterClient.cluster.putComponentTemplate).toHaveBeenCalledTimes(3);
});
it(`should log and throw error if max retries exceeded`, async () => {
clusterClient.cluster.putComponentTemplate.mockRejectedValue(
new EsErrors.ConnectionError('foo')
);
await expect(() =>
createOrUpdateComponentTemplate({
logger,
esClient: clusterClient,
template: ComponentTemplate,
totalFieldsLimit: 2500,
})
).rejects.toThrowErrorMatchingInlineSnapshot(`"foo"`);
expect(logger.error).toHaveBeenCalledWith(
`Error installing component template test-mappings - foo`
);
expect(clusterClient.cluster.putComponentTemplate).toHaveBeenCalledTimes(4);
});
it(`should log and throw error if ES throws error`, async () => {
clusterClient.cluster.putComponentTemplate.mockRejectedValue(new Error('generic error'));
await expect(() =>
createOrUpdateComponentTemplate({
logger,
esClient: clusterClient,
template: ComponentTemplate,
totalFieldsLimit: 2500,
})
).rejects.toThrowErrorMatchingInlineSnapshot(`"generic error"`);
expect(logger.error).toHaveBeenCalledWith(
`Error installing component template test-mappings - generic error`
);
});
it(`should update index template field limit and retry if putTemplate throws error with field limit error`, async () => {
clusterClient.cluster.putComponentTemplate.mockRejectedValueOnce(
new EsErrors.ResponseError(
elasticsearchClientMock.createApiResponse({
statusCode: 400,
body: {
error: {
root_cause: [
{
type: 'illegal_argument_exception',
reason:
'updating component template [.alerts-ecs-mappings] results in invalid composable template [.alerts-security.alerts-default-index-template] after templates are merged',
},
],
type: 'illegal_argument_exception',
reason:
'updating component template [.alerts-ecs-mappings] results in invalid composable template [.alerts-security.alerts-default-index-template] after templates are merged',
caused_by: {
type: 'illegal_argument_exception',
reason:
'composable template [.alerts-security.alerts-default-index-template] template after composition with component templates [.alerts-ecs-mappings, .alerts-security.alerts-mappings, .alerts-technical-mappings] is invalid',
caused_by: {
type: 'illegal_argument_exception',
reason:
'invalid composite mappings for [.alerts-security.alerts-default-index-template]',
caused_by: {
type: 'illegal_argument_exception',
reason: 'Limit of total fields [1900] has been exceeded',
},
},
},
},
},
})
)
);
const existingIndexTemplate = {
name: 'test-template',
index_template: {
index_patterns: ['test*'],
composed_of: ['test-mappings'],
template: {
settings: {
auto_expand_replicas: '0-1',
hidden: true,
'index.lifecycle': {
name: '.alerts-ilm-policy',
rollover_alias: `.alerts-empty-default`,
},
'index.mapping.total_fields.limit': 1800,
},
mappings: {
dynamic: false,
},
},
},
};
clusterClient.indices.getIndexTemplate.mockResolvedValueOnce({
index_templates: [existingIndexTemplate],
});
await createOrUpdateComponentTemplate({
logger,
esClient: clusterClient,
template: ComponentTemplate,
totalFieldsLimit: 2500,
});
expect(clusterClient.cluster.putComponentTemplate).toHaveBeenCalledTimes(2);
expect(clusterClient.indices.putIndexTemplate).toHaveBeenCalledTimes(1);
expect(clusterClient.indices.putIndexTemplate).toHaveBeenCalledWith({
name: existingIndexTemplate.name,
body: {
...existingIndexTemplate.index_template,
template: {
...existingIndexTemplate.index_template.template,
settings: {
...existingIndexTemplate.index_template.template?.settings,
'index.mapping.total_fields.limit': 2500,
},
},
},
});
});
});

View file

@ -0,0 +1,98 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import {
ClusterPutComponentTemplateRequest,
IndicesGetIndexTemplateIndexTemplateItem,
} from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { Logger, ElasticsearchClient } from '@kbn/core/server';
import { asyncForEach } from '@kbn/std';
import { retryTransientEsErrors } from './retry_transient_es_errors';
interface CreateOrUpdateComponentTemplateOpts {
logger: Logger;
esClient: ElasticsearchClient;
template: ClusterPutComponentTemplateRequest;
totalFieldsLimit: number;
}
const getIndexTemplatesUsingComponentTemplate = async (
esClient: ElasticsearchClient,
componentTemplateName: string,
totalFieldsLimit: number
) => {
// Get all index templates and filter down to just the ones referencing this component template
const { index_templates: indexTemplates } = await esClient.indices.getIndexTemplate();
const indexTemplatesUsingComponentTemplate = (indexTemplates ?? []).filter(
(indexTemplate: IndicesGetIndexTemplateIndexTemplateItem) =>
indexTemplate.index_template.composed_of.includes(componentTemplateName)
);
await asyncForEach(
indexTemplatesUsingComponentTemplate,
async (template: IndicesGetIndexTemplateIndexTemplateItem) => {
await esClient.indices.putIndexTemplate({
name: template.name,
body: {
...template.index_template,
template: {
...template.index_template.template,
settings: {
...template.index_template.template?.settings,
'index.mapping.total_fields.limit': totalFieldsLimit,
},
},
},
});
}
);
};
const createOrUpdateComponentTemplateHelper = async (
esClient: ElasticsearchClient,
template: ClusterPutComponentTemplateRequest,
totalFieldsLimit: number
) => {
try {
await esClient.cluster.putComponentTemplate(template);
} catch (error) {
const reason = error?.meta?.body?.error?.caused_by?.caused_by?.caused_by?.reason;
if (reason && reason.match(/Limit of total fields \[\d+\] has been exceeded/) != null) {
// This error message occurs when there is an index template using this component template
// that contains a field limit setting that using this component template exceeds
// Specifically, this can happen for the ECS component template when we add new fields
// to adhere to the ECS spec. Individual index templates specify field limits so if the
// number of new ECS fields pushes the composed mapping above the limit, this error will
// occur. We have to update the field limit inside the index template now otherwise we
// can never update the component template
await getIndexTemplatesUsingComponentTemplate(esClient, template.name, totalFieldsLimit);
// Try to update the component template again
await esClient.cluster.putComponentTemplate(template);
} else {
throw error;
}
}
};
export const createOrUpdateComponentTemplate = async ({
logger,
esClient,
template,
totalFieldsLimit,
}: CreateOrUpdateComponentTemplateOpts) => {
logger.info(`Installing component template ${template.name}`);
try {
await retryTransientEsErrors(
() => createOrUpdateComponentTemplateHelper(esClient, template, totalFieldsLimit),
{ logger }
);
} catch (err) {
logger.error(`Error installing component template ${template.name} - ${err.message}`);
throw err;
}
};

View file

@ -0,0 +1,97 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { elasticsearchServiceMock, loggingSystemMock } from '@kbn/core/server/mocks';
import { errors as EsErrors } from '@elastic/elasticsearch';
import { createOrUpdateIlmPolicy } from './create_or_update_ilm_policy';
const randomDelayMultiplier = 0.01;
const logger = loggingSystemMock.createLogger();
const clusterClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
const IlmPolicy = {
_meta: {
managed: true,
},
phases: {
hot: {
actions: {
rollover: {
max_age: '30d',
max_primary_shard_size: '50gb',
},
},
},
},
};
describe('createOrUpdateIlmPolicy', () => {
beforeEach(() => {
jest.resetAllMocks();
jest.spyOn(global.Math, 'random').mockReturnValue(randomDelayMultiplier);
});
it(`should call esClient to put ILM policy`, async () => {
await createOrUpdateIlmPolicy({
logger,
esClient: clusterClient,
name: 'test-policy',
policy: IlmPolicy,
});
expect(clusterClient.ilm.putLifecycle).toHaveBeenCalledWith({
name: 'test-policy',
policy: IlmPolicy,
});
});
it(`should retry on transient ES errors`, async () => {
clusterClient.ilm.putLifecycle
.mockRejectedValueOnce(new EsErrors.ConnectionError('foo'))
.mockRejectedValueOnce(new EsErrors.TimeoutError('timeout'))
.mockResolvedValue({ acknowledged: true });
await createOrUpdateIlmPolicy({
logger,
esClient: clusterClient,
name: 'test-policy',
policy: IlmPolicy,
});
expect(clusterClient.ilm.putLifecycle).toHaveBeenCalledTimes(3);
});
it(`should log and throw error if max retries exceeded`, async () => {
clusterClient.ilm.putLifecycle.mockRejectedValue(new EsErrors.ConnectionError('foo'));
await expect(() =>
createOrUpdateIlmPolicy({
logger,
esClient: clusterClient,
name: 'test-policy',
policy: IlmPolicy,
})
).rejects.toThrowErrorMatchingInlineSnapshot(`"foo"`);
expect(logger.error).toHaveBeenCalledWith(`Error installing ILM policy test-policy - foo`);
expect(clusterClient.ilm.putLifecycle).toHaveBeenCalledTimes(4);
});
it(`should log and throw error if ES throws error`, async () => {
clusterClient.ilm.putLifecycle.mockRejectedValue(new Error('generic error'));
await expect(() =>
createOrUpdateIlmPolicy({
logger,
esClient: clusterClient,
name: 'test-policy',
policy: IlmPolicy,
})
).rejects.toThrowErrorMatchingInlineSnapshot(`"generic error"`);
expect(logger.error).toHaveBeenCalledWith(
`Error installing ILM policy test-policy - generic error`
);
});
});

View file

@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { IlmPolicy } from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { Logger, ElasticsearchClient } from '@kbn/core/server';
import { retryTransientEsErrors } from './retry_transient_es_errors';
interface CreateOrUpdateIlmPolicyOpts {
logger: Logger;
esClient: ElasticsearchClient;
name: string;
policy: IlmPolicy;
}
/**
* Creates ILM policy if it doesn't already exist, updates it if it does
*/
export const createOrUpdateIlmPolicy = async ({
logger,
esClient,
name,
policy,
}: CreateOrUpdateIlmPolicyOpts) => {
logger.info(`Installing ILM policy ${name}`);
try {
await retryTransientEsErrors(() => esClient.ilm.putLifecycle({ name, policy }), { logger });
} catch (err) {
logger.error(`Error installing ILM policy ${name} - ${err.message}`);
throw err;
}
};

View file

@ -0,0 +1,189 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { elasticsearchServiceMock, loggingSystemMock } from '@kbn/core/server/mocks';
import { errors as EsErrors } from '@elastic/elasticsearch';
import { getIndexTemplate, createOrUpdateIndexTemplate } from './create_or_update_index_template';
const randomDelayMultiplier = 0.01;
const logger = loggingSystemMock.createLogger();
const clusterClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
const IndexTemplate = {
name: '.alerts-test.alerts-default-index-template',
body: {
_meta: {
kibana: {
version: '8.6.1',
},
managed: true,
namespace: 'default',
},
composed_of: ['mappings1', 'framework-mappings'],
index_patterns: ['.internal.alerts-test.alerts-default-*'],
template: {
mappings: {
_meta: {
kibana: {
version: '8.6.1',
},
managed: true,
namespace: 'default',
},
dynamic: false,
},
settings: {
auto_expand_replicas: '0-1',
hidden: true,
'index.lifecycle': {
name: 'test-ilm-policy',
rollover_alias: '.alerts-test.alerts-default',
},
'index.mapping.total_fields.limit': 2500,
},
},
},
};
const SimulateTemplateResponse = {
template: {
aliases: {
alias_name_1: {
is_hidden: true,
},
alias_name_2: {
is_hidden: true,
},
},
mappings: { enabled: false },
settings: {},
},
};
describe('getIndexTemplate', () => {
it(`should create index template with given parameters`, () => {
expect(
getIndexTemplate(
'8.6.1',
'test-ilm-policy',
{
template: '.alerts-test.alerts-default-index-template',
pattern: '.internal.alerts-test.alerts-default-*',
basePattern: '.alerts-test.alerts-*',
alias: '.alerts-test.alerts-default',
name: '.internal.alerts-test.alerts-default-000001',
},
['mappings1', 'framework-mappings'],
2500
)
).toEqual(IndexTemplate);
});
});
describe('createOrUpdateIndexTemplate', () => {
beforeEach(() => {
jest.resetAllMocks();
jest.spyOn(global.Math, 'random').mockReturnValue(randomDelayMultiplier);
});
it(`should call esClient to put index template`, async () => {
clusterClient.indices.simulateTemplate.mockImplementation(async () => SimulateTemplateResponse);
await createOrUpdateIndexTemplate({
logger,
esClient: clusterClient,
template: IndexTemplate,
});
expect(clusterClient.indices.simulateTemplate).toHaveBeenCalledWith(IndexTemplate);
expect(clusterClient.indices.putIndexTemplate).toHaveBeenCalledWith(IndexTemplate);
});
it(`should retry on transient ES errors`, async () => {
clusterClient.indices.simulateTemplate.mockImplementation(async () => SimulateTemplateResponse);
clusterClient.indices.putIndexTemplate
.mockRejectedValueOnce(new EsErrors.ConnectionError('foo'))
.mockRejectedValueOnce(new EsErrors.TimeoutError('timeout'))
.mockResolvedValue({ acknowledged: true });
await createOrUpdateIndexTemplate({
logger,
esClient: clusterClient,
template: IndexTemplate,
});
expect(clusterClient.indices.putIndexTemplate).toHaveBeenCalledTimes(3);
});
it(`should log and throw error if max retries exceeded`, async () => {
clusterClient.indices.simulateTemplate.mockImplementation(async () => SimulateTemplateResponse);
clusterClient.indices.putIndexTemplate.mockRejectedValue(new EsErrors.ConnectionError('foo'));
await expect(() =>
createOrUpdateIndexTemplate({
logger,
esClient: clusterClient,
template: IndexTemplate,
})
).rejects.toThrowErrorMatchingInlineSnapshot(`"foo"`);
expect(logger.error).toHaveBeenCalledWith(
`Error installing index template .alerts-test.alerts-default-index-template - foo`
);
expect(clusterClient.indices.putIndexTemplate).toHaveBeenCalledTimes(4);
});
it(`should log and throw error if ES throws error`, async () => {
clusterClient.indices.simulateTemplate.mockImplementation(async () => SimulateTemplateResponse);
clusterClient.indices.putIndexTemplate.mockRejectedValue(new Error('generic error'));
await expect(() =>
createOrUpdateIndexTemplate({
logger,
esClient: clusterClient,
template: IndexTemplate,
})
).rejects.toThrowErrorMatchingInlineSnapshot(`"generic error"`);
expect(logger.error).toHaveBeenCalledWith(
`Error installing index template .alerts-test.alerts-default-index-template - generic error`
);
});
it(`should log and return without updating template if simulate throws error`, async () => {
clusterClient.indices.simulateTemplate.mockRejectedValue(new Error('simulate error'));
clusterClient.indices.putIndexTemplate.mockRejectedValue(new Error('generic error'));
await createOrUpdateIndexTemplate({
logger,
esClient: clusterClient,
template: IndexTemplate,
});
expect(logger.error).toHaveBeenCalledWith(
`Failed to simulate index template mappings for .alerts-test.alerts-default-index-template; not applying mappings - simulate error`
);
expect(clusterClient.indices.putIndexTemplate).not.toHaveBeenCalled();
});
it(`should throw error if simulate returns empty mappings`, async () => {
clusterClient.indices.simulateTemplate.mockImplementationOnce(async () => ({
...SimulateTemplateResponse,
template: {
...SimulateTemplateResponse.template,
mappings: {},
},
}));
await expect(() =>
createOrUpdateIndexTemplate({
logger,
esClient: clusterClient,
template: IndexTemplate,
})
).rejects.toThrowErrorMatchingInlineSnapshot(
`"No mappings would be generated for .alerts-test.alerts-default-index-template, possibly due to failed/misconfigured bootstrapping"`
);
expect(clusterClient.indices.putIndexTemplate).not.toHaveBeenCalled();
});
});

View file

@ -0,0 +1,114 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import {
IndicesPutIndexTemplateRequest,
MappingTypeMapping,
Metadata,
} from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { Logger, ElasticsearchClient } from '@kbn/core/server';
import { isEmpty } from 'lodash';
import { IIndexPatternString } from '../resource_installer_utils';
import { retryTransientEsErrors } from './retry_transient_es_errors';
export const getIndexTemplate = (
kibanaVersion: string,
ilmPolicyName: string,
indexPatterns: IIndexPatternString,
componentTemplateRefs: string[],
totalFieldsLimit: number
): IndicesPutIndexTemplateRequest => {
const indexMetadata: Metadata = {
kibana: {
version: kibanaVersion,
},
managed: true,
namespace: 'default', // hard-coded to default here until we start supporting space IDs
};
return {
name: indexPatterns.template,
body: {
index_patterns: [indexPatterns.pattern],
composed_of: componentTemplateRefs,
template: {
settings: {
auto_expand_replicas: '0-1',
hidden: true,
'index.lifecycle': {
name: ilmPolicyName,
rollover_alias: indexPatterns.alias,
},
'index.mapping.total_fields.limit': totalFieldsLimit,
},
mappings: {
dynamic: false,
_meta: indexMetadata,
},
...(indexPatterns.secondaryAlias
? {
aliases: {
[indexPatterns.secondaryAlias]: {
is_write_index: false,
},
},
}
: {}),
},
_meta: indexMetadata,
// TODO - set priority of this template when we start supporting spaces
},
};
};
interface CreateOrUpdateIndexTemplateOpts {
logger: Logger;
esClient: ElasticsearchClient;
template: IndicesPutIndexTemplateRequest;
}
/**
* Installs index template that uses installed component template
* Prior to installation, simulates the installation to check for possible
* conflicts. Simulate should return an empty mapping if a template
* conflicts with an already installed template.
*/
export const createOrUpdateIndexTemplate = async ({
logger,
esClient,
template,
}: CreateOrUpdateIndexTemplateOpts) => {
logger.info(`Installing index template ${template.name}`);
let mappings: MappingTypeMapping = {};
try {
// Simulate the index template to proactively identify any issues with the mapping
const simulateResponse = await esClient.indices.simulateTemplate(template);
mappings = simulateResponse.template.mappings;
} catch (err) {
logger.error(
`Failed to simulate index template mappings for ${template.name}; not applying mappings - ${err.message}`
);
return;
}
if (isEmpty(mappings)) {
throw new Error(
`No mappings would be generated for ${template.name}, possibly due to failed/misconfigured bootstrapping`
);
}
try {
await retryTransientEsErrors(() => esClient.indices.putIndexTemplate(template), {
logger,
});
} catch (err) {
logger.error(`Error installing index template ${template.name} - ${err.message}`);
throw err;
}
};

View file

@ -0,0 +1,12 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export { createOrUpdateIlmPolicy } from './create_or_update_ilm_policy';
export { createOrUpdateComponentTemplate } from './create_or_update_component_template';
export { getIndexTemplate, createOrUpdateIndexTemplate } from './create_or_update_index_template';
export { createConcreteWriteIndex } from './create_concrete_write_index';
export { installWithTimeout } from './install_with_timeout';

View file

@ -0,0 +1,68 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { loggerMock } from '@kbn/logging-mocks';
import { installWithTimeout } from './install_with_timeout';
import { ReplaySubject, Subject } from 'rxjs';
const logger = loggerMock.create();
describe('installWithTimeout', () => {
let pluginStop$: Subject<void>;
beforeEach(() => {
jest.resetAllMocks();
pluginStop$ = new ReplaySubject(1);
});
it(`should call installFn`, async () => {
await installWithTimeout({
installFn: async () => {
logger.info(`running`);
},
pluginStop$,
logger,
timeoutMs: 10,
});
expect(logger.info).toHaveBeenCalled();
});
it(`should short-circuit installFn if it exceeds configured timeout`, async () => {
await expect(() =>
installWithTimeout({
installFn: async () => {
await new Promise((r) => setTimeout(r, 20));
logger.info(`running`);
},
pluginStop$,
logger,
timeoutMs: 10,
})
).rejects.toThrowErrorMatchingInlineSnapshot(
`"Failure during installation. Timeout: it took more than 10ms"`
);
expect(logger.info).not.toHaveBeenCalled();
});
it(`should short-circuit installFn if pluginStop$ signal is received`, async () => {
pluginStop$.next();
await expect(() =>
installWithTimeout({
installFn: async () => {
await new Promise((r) => setTimeout(r, 5));
logger.info(`running`);
},
pluginStop$,
logger,
timeoutMs: 10,
})
).rejects.toThrowErrorMatchingInlineSnapshot(
`"Failure during installation. Server is stopping; must stop all async operations"`
);
expect(logger.info).not.toHaveBeenCalled();
});
});

View file

@ -0,0 +1,60 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { firstValueFrom, Observable } from 'rxjs';
import { Logger } from '@kbn/core/server';
const INSTALLATION_TIMEOUT = 20 * 60 * 1000; // 20 minutes
interface InstallWithTimeoutOpts {
description?: string;
installFn: () => Promise<void>;
pluginStop$: Observable<void>;
logger: Logger;
timeoutMs?: number;
}
export const installWithTimeout = async ({
description,
installFn,
pluginStop$,
logger,
timeoutMs = INSTALLATION_TIMEOUT,
}: InstallWithTimeoutOpts): Promise<void> => {
try {
let timeoutId: NodeJS.Timeout;
const install = async (): Promise<void> => {
await installFn();
if (timeoutId) {
clearTimeout(timeoutId);
}
};
const throwTimeoutException = (): Promise<void> => {
return new Promise((_, reject) => {
timeoutId = setTimeout(() => {
const msg = `Timeout: it took more than ${timeoutMs}ms`;
reject(new Error(msg));
}, timeoutMs);
firstValueFrom(pluginStop$).then(() => {
clearTimeout(timeoutId);
reject(new Error('Server is stopping; must stop all async operations'));
});
});
};
await Promise.race([install(), throwTimeoutException()]);
} catch (e) {
logger.error(e);
const reason = e?.message || 'Unknown reason';
throw new Error(
`Failure during installation${description ? ` of ${description}` : ''}. ${reason}`
);
}
};

View file

@ -59,8 +59,15 @@ export {
DEFAULT_ALERTS_ILM_POLICY_NAME,
ECS_COMPONENT_TEMPLATE_NAME,
ECS_CONTEXT,
TOTAL_FIELDS_LIMIT,
getComponentTemplate,
type PublicFrameworkAlertsService,
createOrUpdateIlmPolicy,
createOrUpdateComponentTemplate,
getIndexTemplate,
createOrUpdateIndexTemplate,
createConcreteWriteIndex,
installWithTimeout,
} from './alerts_service';
export const plugin = (initContext: PluginInitializerContext) => new AlertingPlugin(initContext);

View file

@ -5,17 +5,22 @@
* 2.0.
*/
import { firstValueFrom, type Observable } from 'rxjs';
import { get, isEmpty } from 'lodash';
import { type Observable } from 'rxjs';
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type { ElasticsearchClient, Logger } from '@kbn/core/server';
import type { PublicMethodsOf } from '@kbn/utility-types';
import {
createConcreteWriteIndex,
createOrUpdateComponentTemplate,
createOrUpdateIlmPolicy,
createOrUpdateIndexTemplate,
DEFAULT_ALERTS_ILM_POLICY,
DEFAULT_ALERTS_ILM_POLICY_NAME,
ECS_COMPONENT_TEMPLATE_NAME,
installWithTimeout,
TOTAL_FIELDS_LIMIT,
type PublicFrameworkAlertsService,
} from '@kbn/alerting-plugin/server';
import { TECHNICAL_COMPONENT_TEMPLATE_NAME } from '../../common/assets';
@ -24,8 +29,6 @@ import { ecsComponentTemplate } from '../../common/assets/component_templates/ec
import type { IndexInfo } from './index_info';
const INSTALLATION_TIMEOUT = 20 * 60 * 1000; // 20 minutes
const TOTAL_FIELDS_LIMIT = 2500;
interface ConstructorOptions {
getResourceName(relativeName: string): string;
getClusterClient: () => Promise<ElasticsearchClient>;
@ -40,52 +43,6 @@ export type IResourceInstaller = PublicMethodsOf<ResourceInstaller>;
export class ResourceInstaller {
constructor(private readonly options: ConstructorOptions) {}
private async installWithTimeout(
resources: string,
installer: () => Promise<void>
): Promise<void> {
try {
let timeoutId: NodeJS.Timeout;
const installResources = async (): Promise<void> => {
const { logger, isWriteEnabled } = this.options;
if (!isWriteEnabled) {
logger.info(`Write is disabled; not installing ${resources}`);
return;
}
logger.info(`Installing ${resources}`);
await installer();
logger.info(`Installed ${resources}`);
if (timeoutId) {
clearTimeout(timeoutId);
}
};
const throwTimeoutException = (): Promise<void> => {
return new Promise((resolve, reject) => {
timeoutId = setTimeout(() => {
const msg = `Timeout: it took more than ${INSTALLATION_TIMEOUT}ms`;
reject(new Error(msg));
}, INSTALLATION_TIMEOUT);
firstValueFrom(this.options.pluginStop$).then(() => {
clearTimeout(timeoutId);
const msg = 'Server is stopping; must stop all async operations';
reject(new Error(msg));
});
});
};
await Promise.race([installResources(), throwTimeoutException()]);
} catch (e) {
this.options.logger.error(e);
const reason = e?.message || 'Unknown reason';
throw new Error(`Failure installing ${resources}. ${reason}`);
}
}
// -----------------------------------------------------------------------------------------------
// Common resources
@ -96,37 +53,62 @@ export class ResourceInstaller {
* - component template containing all standard ECS fields
*/
public async installCommonResources(): Promise<void> {
await this.installWithTimeout('common resources shared between all indices', async () => {
const { logger, frameworkAlerts } = this.options;
const resourceDescription = 'common resources shared between all indices';
const { logger, isWriteEnabled } = this.options;
if (!isWriteEnabled) {
logger.info(`Write is disabled; not installing ${resourceDescription}`);
return;
}
try {
// We can install them in parallel
await Promise.all([
// Install ILM policy and ECS component template only if framework alerts are not enabled
// If framework alerts are enabled, the alerting framework will install these
...(frameworkAlerts.enabled()
? []
: [
this.createOrUpdateLifecyclePolicy({
name: DEFAULT_ALERTS_ILM_POLICY_NAME,
body: DEFAULT_ALERTS_ILM_POLICY,
}),
this.createOrUpdateComponentTemplate({
name: ECS_COMPONENT_TEMPLATE_NAME,
body: ecsComponentTemplate,
}),
]),
this.createOrUpdateComponentTemplate({
name: TECHNICAL_COMPONENT_TEMPLATE_NAME,
body: technicalComponentTemplate,
}),
]);
} catch (err) {
logger.error(
`Error installing common resources in RuleRegistry ResourceInstaller - ${err.message}`
);
throw err;
}
await installWithTimeout({
description: resourceDescription,
logger,
pluginStop$: this.options.pluginStop$,
installFn: async () => {
const { getClusterClient, frameworkAlerts } = this.options;
const clusterClient = await getClusterClient();
try {
// We can install them in parallel
await Promise.all([
// Install ILM policy and ECS component template only if framework alerts are not enabled
// If framework alerts are enabled, the alerting framework will install these
...(frameworkAlerts.enabled()
? []
: [
createOrUpdateIlmPolicy({
logger,
esClient: clusterClient,
name: DEFAULT_ALERTS_ILM_POLICY_NAME,
policy: DEFAULT_ALERTS_ILM_POLICY,
}),
createOrUpdateComponentTemplate({
logger,
esClient: clusterClient,
template: {
name: ECS_COMPONENT_TEMPLATE_NAME,
body: ecsComponentTemplate,
},
totalFieldsLimit: TOTAL_FIELDS_LIMIT,
}),
]),
createOrUpdateComponentTemplate({
logger,
esClient: clusterClient,
template: {
name: TECHNICAL_COMPONENT_TEMPLATE_NAME,
body: technicalComponentTemplate,
},
totalFieldsLimit: TOTAL_FIELDS_LIMIT,
}),
]);
} catch (err) {
logger.error(
`Error installing common resources in RuleRegistry ResourceInstaller - ${err.message}`
);
throw err;
}
},
});
}
@ -139,110 +121,62 @@ export class ResourceInstaller {
* - component templates
*/
public async installIndexLevelResources(indexInfo: IndexInfo): Promise<void> {
await this.installWithTimeout(`resources for index ${indexInfo.baseName}`, async () => {
const { frameworkAlerts } = this.options;
const { componentTemplates, ilmPolicy, additionalPrefix } = indexInfo.indexOptions;
if (ilmPolicy != null) {
await this.createOrUpdateLifecyclePolicy({
name: indexInfo.getIlmPolicyName(),
body: { policy: ilmPolicy },
});
}
const resourceDescription = `resources for index ${indexInfo.baseName}`;
const { logger, isWriteEnabled } = this.options;
if (!isWriteEnabled) {
logger.info(`Write is disabled; not installing ${resourceDescription}`);
return;
}
if (!frameworkAlerts.enabled() || additionalPrefix) {
await Promise.all(
componentTemplates.map(async (ct) => {
await this.createOrUpdateComponentTemplate({
name: indexInfo.getComponentTemplateName(ct.name),
body: {
await installWithTimeout({
description: resourceDescription,
logger,
pluginStop$: this.options.pluginStop$,
installFn: async () => {
const { frameworkAlerts, getClusterClient } = this.options;
const { componentTemplates, ilmPolicy, additionalPrefix } = indexInfo.indexOptions;
const clusterClient = await getClusterClient();
// Rule registry allows for installation of custom ILM policy, which is only
// used by security preview indices. We will continue to let rule registry
// handle this installation.
if (ilmPolicy != null) {
await createOrUpdateIlmPolicy({
logger,
esClient: clusterClient,
name: indexInfo.getIlmPolicyName(),
policy: ilmPolicy,
});
}
// Rule registry allows for installation of resources with an additional prefix,
// which is only used by security preview indices. We will continue to let rule registry
// handle installation of these resources.
if (!frameworkAlerts.enabled() || additionalPrefix) {
await Promise.all(
componentTemplates.map(async (ct) => {
await createOrUpdateComponentTemplate({
logger,
esClient: clusterClient,
template: {
settings: ct.settings ?? {},
mappings: ct.mappings,
name: indexInfo.getComponentTemplateName(ct.name),
body: {
template: {
settings: ct.settings ?? {},
mappings: ct.mappings,
},
_meta: ct._meta,
},
},
_meta: ct._meta,
},
});
})
);
}
totalFieldsLimit: TOTAL_FIELDS_LIMIT,
});
})
);
}
},
});
}
private async updateIndexMappings(indexInfo: IndexInfo, namespace: string) {
const { logger } = this.options;
const aliases = indexInfo.basePattern;
const backingIndices = indexInfo.getPatternForBackingIndices(namespace);
logger.debug(`Updating mappings of existing concrete indices for ${indexInfo.baseName}`);
// Find all concrete indices for all namespaces of the index.
const concreteIndices = await this.fetchConcreteIndices(aliases, backingIndices);
// Update total field limit setting of found indices
await Promise.all(concreteIndices.map((item) => this.updateTotalFieldLimitSetting(item)));
// Update mappings of the found indices.
await Promise.all(concreteIndices.map((item) => this.updateAliasWriteIndexMapping(item)));
}
private async updateTotalFieldLimitSetting({ index, alias }: ConcreteIndexInfo) {
const { logger, getClusterClient } = this.options;
const clusterClient = await getClusterClient();
try {
await clusterClient.indices.putSettings({
index,
body: {
'index.mapping.total_fields.limit': TOTAL_FIELDS_LIMIT,
},
});
return;
} catch (err) {
logger.error(
`Failed to PUT index.mapping.total_fields.limit settings for alias ${alias}: ${err.message}`
);
throw err;
}
}
// NOTE / IMPORTANT: Please note this will update the mappings of backing indices but
// *not* the settings. This is due to the fact settings can be classed as dynamic and static,
// and static updates will fail on an index that isn't closed. New settings *will* be applied as part
// of the ILM policy rollovers. More info: https://github.com/elastic/kibana/pull/113389#issuecomment-940152654
private async updateAliasWriteIndexMapping({ index, alias }: ConcreteIndexInfo) {
const { logger, getClusterClient } = this.options;
const clusterClient = await getClusterClient();
let simulatedIndexMapping: estypes.IndicesSimulateIndexTemplateResponse;
try {
simulatedIndexMapping = await clusterClient.indices.simulateIndexTemplate({
name: index,
});
} catch (err) {
logger.error(
`Ignored PUT mappings for alias ${alias}; error generating simulated mappings: ${err.message}`
);
return;
}
const simulatedMapping = get(simulatedIndexMapping, ['template', 'mappings']);
if (simulatedMapping == null) {
logger.error(`Ignored PUT mappings for alias ${alias}; simulated mappings were empty`);
return;
}
try {
await clusterClient.indices.putMapping({
index,
body: simulatedMapping,
});
return;
} catch (err) {
logger.error(`Failed to PUT mapping for alias ${alias}: ${err.message}`);
throw err;
}
}
// -----------------------------------------------------------------------------------------------
// Namespace-level resources
@ -256,7 +190,8 @@ export class ResourceInstaller {
indexInfo: IndexInfo,
namespace: string
): Promise<void> {
const { logger, frameworkAlerts } = this.options;
const { logger, frameworkAlerts, getClusterClient } = this.options;
const clusterClient = await getClusterClient();
const alias = indexInfo.getPrimaryAlias(namespace);
@ -281,60 +216,28 @@ export class ResourceInstaller {
logger.info(`Installing namespace-level resources and creating concrete index for ${alias}`);
// Install / update the index template
await this.installNamespacedIndexTemplate(indexInfo, namespace);
// Update index mappings for indices matching this namespace.
await this.updateIndexMappings(indexInfo, namespace);
await createOrUpdateIndexTemplate({
logger: this.options.logger,
esClient: clusterClient,
// TODO - switch to using getIndexTemplate from alerting once that supports spaces
template: this.getIndexTemplate(indexInfo, namespace),
});
// If we find a concrete backing index which is the write index for the alias here, we shouldn't
// be making a new concrete index. We return early because we don't need a new write target.
const indexExists = await this.checkIfConcreteWriteIndexExists(indexInfo, namespace);
if (indexExists) {
return;
} else {
await this.createConcreteWriteIndex(indexInfo, namespace);
}
await createConcreteWriteIndex({
logger: this.options.logger,
esClient: clusterClient,
totalFieldsLimit: TOTAL_FIELDS_LIMIT,
indexPatterns: {
basePattern: indexInfo.basePattern,
pattern: indexInfo.getPatternForBackingIndices(namespace),
alias: indexInfo.getPrimaryAlias(namespace),
name: indexInfo.getConcreteIndexInitialName(namespace),
template: indexInfo.getIndexTemplateName(namespace),
},
});
}
private async checkIfConcreteWriteIndexExists(
indexInfo: IndexInfo,
namespace: string
): Promise<boolean> {
const { logger } = this.options;
const primaryNamespacedAlias = indexInfo.getPrimaryAlias(namespace);
const indexPatternForBackingIndices = indexInfo.getPatternForBackingIndices(namespace);
logger.debug(`Checking if concrete write index exists for ${primaryNamespacedAlias}`);
const concreteIndices = await this.fetchConcreteIndices(
primaryNamespacedAlias,
indexPatternForBackingIndices
);
const concreteIndicesExist = concreteIndices.some(
(item) => item.alias === primaryNamespacedAlias
);
const concreteWriteIndicesExist = concreteIndices.some(
(item) => item.alias === primaryNamespacedAlias && item.isWriteIndex
);
// If we find backing indices for the alias here, we shouldn't be making a new concrete index -
// either one of the indices is the write index so we return early because we don't need a new write target,
// or none of them are the write index so we'll throw an error because one of the existing indices should have
// been the write target
// If there are some concrete indices but none of them are the write index, we'll throw an error
// because one of the existing indices should have been the write target.
if (concreteIndicesExist && !concreteWriteIndicesExist) {
throw new Error(
`Indices matching pattern ${indexPatternForBackingIndices} exist but none are set as the write index for alias ${primaryNamespacedAlias}`
);
}
return concreteWriteIndicesExist;
}
private async installNamespacedIndexTemplate(indexInfo: IndexInfo, namespace: string) {
const { logger } = this.options;
private getIndexTemplate(indexInfo: IndexInfo, namespace: string) {
const {
componentTemplateRefs,
componentTemplates,
@ -346,8 +249,6 @@ export class ResourceInstaller {
const secondaryNamespacedAlias = indexInfo.getSecondaryAlias(namespace);
const indexPatternForBackingIndices = indexInfo.getPatternForBackingIndices(namespace);
logger.debug(`Installing index template for ${primaryNamespacedAlias}`);
const technicalComponentNames = [TECHNICAL_COMPONENT_TEMPLATE_NAME];
const ownComponentNames = componentTemplates.map((template) =>
indexInfo.getComponentTemplateName(template.name)
@ -363,17 +264,7 @@ export class ResourceInstaller {
namespace,
};
// TODO: need a way to update this template if/when we decide to make changes to the
// built in index template. Probably do it as part of updateIndexMappingsForAsset?
// (Before upgrading any indices, find and upgrade all namespaced index templates - component templates
// will already have been upgraded by solutions or rule registry, in the case of technical/ECS templates)
// With the current structure, it's tricky because the index template creation
// depends on both the namespace and secondary alias, both of which are not currently available
// to updateIndexMappingsForAsset. We can make the secondary alias available since
// it's known at plugin startup time, but
// the namespace values can really only come from the existing templates that we're trying to update
// - maybe we want to store the namespace as a _meta field on the index template for easy retrieval
await this.createOrUpdateIndexTemplate({
return {
name: indexInfo.getIndexTemplateName(namespace),
body: {
index_patterns: [indexPatternForBackingIndices],
@ -415,141 +306,6 @@ export class ResourceInstaller {
// then newly created indices will use the matching template with the *longest* namespace
priority: namespace.length,
},
});
}
private async createConcreteWriteIndex(indexInfo: IndexInfo, namespace: string) {
const { logger, getClusterClient } = this.options;
const clusterClient = await getClusterClient();
const primaryNamespacedAlias = indexInfo.getPrimaryAlias(namespace);
const initialIndexName = indexInfo.getConcreteIndexInitialName(namespace);
logger.debug(`Creating concrete write index for ${primaryNamespacedAlias}`);
try {
await clusterClient.indices.create({
index: initialIndexName,
body: {
aliases: {
[primaryNamespacedAlias]: {
is_write_index: true,
},
},
},
});
} catch (err) {
logger.error(
`Error creating index ${initialIndexName} as the write index for alias ${primaryNamespacedAlias} in RuleRegistry ResourceInstaller: ${err.message}`
);
// If the index already exists and it's the write index for the alias,
// something else created it so suppress the error. If it's not the write
// index, that's bad, throw an error.
if (err?.meta?.body?.error?.type === 'resource_already_exists_exception') {
const existingIndices = await clusterClient.indices.get({
index: initialIndexName,
});
if (!existingIndices[initialIndexName]?.aliases?.[primaryNamespacedAlias]?.is_write_index) {
throw Error(
`Attempted to create index: ${initialIndexName} as the write index for alias: ${primaryNamespacedAlias}, but the index already exists and is not the write index for the alias`
);
}
} else {
throw err;
}
}
}
// -----------------------------------------------------------------------------------------------
// Helpers
private async createOrUpdateLifecyclePolicy(policy: estypes.IlmPutLifecycleRequest) {
const { logger, getClusterClient } = this.options;
const clusterClient = await getClusterClient();
logger.debug(`Installing lifecycle policy ${policy.name}`);
return clusterClient.ilm.putLifecycle(policy);
}
private async createOrUpdateComponentTemplate(
template: estypes.ClusterPutComponentTemplateRequest
) {
const { logger, getClusterClient } = this.options;
const clusterClient = await getClusterClient();
logger.debug(`Installing component template ${template.name}`);
return clusterClient.cluster.putComponentTemplate(template);
}
private async createOrUpdateIndexTemplate(template: estypes.IndicesPutIndexTemplateRequest) {
const { logger, getClusterClient } = this.options;
const clusterClient = await getClusterClient();
logger.debug(`Installing index template ${template.name}`);
const simulateResponse = await clusterClient.indices.simulateTemplate(template);
const mappings: estypes.MappingTypeMapping = simulateResponse.template.mappings;
if (isEmpty(mappings)) {
throw new Error(
'No mappings would be generated for this index, possibly due to failed/misconfigured bootstrapping'
);
}
return clusterClient.indices.putIndexTemplate(template);
}
private async fetchConcreteIndices(
aliasOrPatternForAliases: string,
indexPatternForBackingIndices: string
): Promise<ConcreteIndexInfo[]> {
const { logger, getClusterClient } = this.options;
const clusterClient = await getClusterClient();
logger.debug(`Fetching concrete indices for ${indexPatternForBackingIndices}`);
try {
// It's critical that we specify *both* the index pattern for backing indices and their alias(es) in this request.
// The alias prevents the request from finding other namespaces that could match the -* part of the index pattern
// (see https://github.com/elastic/kibana/issues/107704). The backing index pattern prevents the request from
// finding legacy .siem-signals indices that we add the alias to for backwards compatibility reasons. Together,
// the index pattern and alias should ensure that we retrieve only the "new" backing indices for this
// particular alias.
const response = await clusterClient.indices.getAlias({
index: indexPatternForBackingIndices,
name: aliasOrPatternForAliases,
});
return createConcreteIndexInfo(response);
} catch (err) {
// 404 is expected if the alerts-as-data indices haven't been created yet
if (err.statusCode === 404) {
return createConcreteIndexInfo({});
}
logger.error(
`Error fetching concrete indices for ${indexPatternForBackingIndices} in RuleRegistry ResourceInstaller - ${err.message}`
);
// A non-404 error is a real problem so we re-throw.
throw err;
}
};
}
}
interface ConcreteIndexInfo {
index: string;
alias: string;
isWriteIndex: boolean;
}
const createConcreteIndexInfo = (
response: estypes.IndicesGetAliasResponse
): ConcreteIndexInfo[] => {
return Object.entries(response).flatMap(([index, { aliases }]) =>
Object.entries(aliases).map(([aliasName, aliasProperties]) => ({
index,
alias: aliasName,
isWriteIndex: aliasProperties.is_write_index ?? false,
}))
);
};