Handle transient Elasticsearch errors during package installation (#118587)

This commit is contained in:
Josh Dover 2021-11-30 13:14:10 +01:00 committed by GitHub
parent b8a7370156
commit 2ffe8d1c03
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 383 additions and 100 deletions

View file

@ -5,7 +5,7 @@
* 2.0.
*/
import type { ElasticsearchClient, SavedObjectsClientContract } from 'kibana/server';
import type { ElasticsearchClient, Logger, SavedObjectsClientContract } from 'kibana/server';
import { ElasticsearchAssetType } from '../../../../../common/types/models';
import type {
@ -18,6 +18,7 @@ import { saveInstalledEsRefs } from '../../packages/install';
import { getAsset } from '../transform/common';
import { getESAssetMetadata } from '../meta';
import { retryTransientEsErrors } from '../retry';
import { deleteIlmRefs, deleteIlms } from './remove';
@ -35,7 +36,8 @@ export const installIlmForDataStream = async (
registryPackage: InstallablePackage,
paths: string[],
esClient: ElasticsearchClient,
savedObjectsClient: SavedObjectsClientContract
savedObjectsClient: SavedObjectsClientContract,
logger: Logger
) => {
const installation = await getInstallation({ savedObjectsClient, pkgName: registryPackage.name });
let previousInstalledIlmEsAssets: EsAssetReference[] = [];
@ -90,7 +92,7 @@ export const installIlmForDataStream = async (
);
const installationPromises = ilmInstallations.map(async (ilmInstallation) => {
return handleIlmInstall({ esClient, ilmInstallation });
return handleIlmInstall({ esClient, ilmInstallation, logger });
});
installedIlms = await Promise.all(installationPromises).then((results) => results.flat());
@ -117,15 +119,21 @@ export const installIlmForDataStream = async (
async function handleIlmInstall({
esClient,
ilmInstallation,
logger,
}: {
esClient: ElasticsearchClient;
ilmInstallation: IlmInstallation;
logger: Logger;
}): Promise<EsAssetReference> {
await esClient.transport.request({
method: 'PUT',
path: `/_ilm/policy/${ilmInstallation.installationName}`,
body: ilmInstallation.content,
});
await retryTransientEsErrors(
() =>
esClient.transport.request({
method: 'PUT',
path: `/_ilm/policy/${ilmInstallation.installationName}`,
body: ilmInstallation.content,
}),
{ logger }
);
return { id: ilmInstallation.installationName, type: ElasticsearchAssetType.dataStreamIlmPolicy };
}

View file

@ -5,18 +5,20 @@
* 2.0.
*/
import type { ElasticsearchClient } from 'kibana/server';
import type { ElasticsearchClient, Logger } from 'kibana/server';
import type { InstallablePackage } from '../../../../types';
import { ElasticsearchAssetType } from '../../../../types';
import { getAsset, getPathParts } from '../../archive';
import { getESAssetMetadata } from '../meta';
import { retryTransientEsErrors } from '../retry';
export async function installILMPolicy(
packageInfo: InstallablePackage,
paths: string[],
esClient: ElasticsearchClient
esClient: ElasticsearchClient,
logger: Logger
) {
const ilmPaths = paths.filter((path) => isILMPolicy(path));
if (!ilmPaths.length) return;
@ -29,11 +31,15 @@ export async function installILMPolicy(
const { file } = getPathParts(path);
const name = file.substr(0, file.lastIndexOf('.'));
try {
await esClient.transport.request({
method: 'PUT',
path: '/_ilm/policy/' + name,
body,
});
await retryTransientEsErrors(
() =>
esClient.transport.request({
method: 'PUT',
path: '/_ilm/policy/' + name,
body,
}),
{ logger }
);
} catch (err) {
throw new Error(err.message);
}

View file

@ -6,7 +6,7 @@
*/
import type { TransportRequestOptions } from '@elastic/elasticsearch';
import type { ElasticsearchClient, SavedObjectsClientContract } from 'src/core/server';
import type { ElasticsearchClient, Logger, SavedObjectsClientContract } from 'src/core/server';
import { ElasticsearchAssetType } from '../../../../types';
import type { EsAssetReference, RegistryDataStream, InstallablePackage } from '../../../../types';
@ -22,6 +22,8 @@ import {
import { appendMetadataToIngestPipeline } from '../meta';
import { retryTransientEsErrors } from '../retry';
import { deletePipelineRefs } from './remove';
interface RewriteSubstitution {
@ -41,7 +43,8 @@ export const installPipelines = async (
installablePackage: InstallablePackage,
paths: string[],
esClient: ElasticsearchClient,
savedObjectsClient: SavedObjectsClientContract
savedObjectsClient: SavedObjectsClientContract,
logger: Logger
) => {
// unlike other ES assets, pipeline names are versioned so after a template is updated
// it can be created pointing to the new template, without removing the old one and effecting data
@ -105,6 +108,7 @@ export const installPipelines = async (
installAllPipelines({
dataStream,
esClient,
logger,
paths: pipelinePaths,
installablePackage,
})
@ -119,6 +123,7 @@ export const installPipelines = async (
installAllPipelines({
dataStream: undefined,
esClient,
logger,
paths: topLevelPipelinePaths,
installablePackage,
})
@ -151,11 +156,13 @@ export function rewriteIngestPipeline(
export async function installAllPipelines({
esClient,
logger,
paths,
dataStream,
installablePackage,
}: {
esClient: ElasticsearchClient;
logger: Logger;
paths: string[];
dataStream?: RegistryDataStream;
installablePackage: InstallablePackage;
@ -195,7 +202,7 @@ export async function installAllPipelines({
});
const installationPromises = pipelines.map(async (pipeline) => {
return installPipeline({ esClient, pipeline, installablePackage });
return installPipeline({ esClient, pipeline, installablePackage, logger });
});
return Promise.all(installationPromises);
@ -203,10 +210,12 @@ export async function installAllPipelines({
async function installPipeline({
esClient,
logger,
pipeline,
installablePackage,
}: {
esClient: ElasticsearchClient;
logger: Logger;
pipeline: any;
installablePackage?: InstallablePackage;
}): Promise<EsAssetReference> {
@ -233,7 +242,10 @@ async function installPipeline({
};
}
await esClient.ingest.putPipeline(esClientParams, esClientRequestOptions);
await retryTransientEsErrors(
() => esClient.ingest.putPipeline(esClientParams, esClientRequestOptions),
{ logger }
);
return {
id: pipelineWithMetadata.nameForInstallation,
@ -241,7 +253,10 @@ async function installPipeline({
};
}
export async function ensureFleetFinalPipelineIsInstalled(esClient: ElasticsearchClient) {
export async function ensureFleetFinalPipelineIsInstalled(
esClient: ElasticsearchClient,
logger: Logger
) {
const esClientRequestOptions: TransportRequestOptions = {
ignore: [404],
};
@ -258,6 +273,7 @@ export async function ensureFleetFinalPipelineIsInstalled(esClient: Elasticsearc
) {
await installPipeline({
esClient,
logger,
pipeline: {
nameForInstallation: FLEET_FINAL_PIPELINE_ID,
contentForInstallation: FLEET_FINAL_PIPELINE_CONTENT,

View file

@ -5,7 +5,7 @@
* 2.0.
*/
import type { ElasticsearchClient, SavedObjectsClientContract } from 'kibana/server';
import type { ElasticsearchClient, Logger, SavedObjectsClientContract } from 'kibana/server';
import { errors } from '@elastic/elasticsearch';
import { saveInstalledEsRefs } from '../../packages/install';
@ -13,6 +13,8 @@ import { getPathParts } from '../../archive';
import { ElasticsearchAssetType } from '../../../../../common/types/models';
import type { EsAssetReference, InstallablePackage } from '../../../../../common/types/models';
import { retryTransientEsErrors } from '../retry';
import { getAsset } from './common';
interface MlModelInstallation {
@ -24,7 +26,8 @@ export const installMlModel = async (
installablePackage: InstallablePackage,
paths: string[],
esClient: ElasticsearchClient,
savedObjectsClient: SavedObjectsClientContract
savedObjectsClient: SavedObjectsClientContract,
logger: Logger
) => {
const mlModelPath = paths.find((path) => isMlModel(path));
@ -47,7 +50,7 @@ export const installMlModel = async (
content,
};
const result = await handleMlModelInstall({ esClient, mlModel });
const result = await handleMlModelInstall({ esClient, logger, mlModel });
installedMlModels.push(result);
}
return installedMlModels;
@ -61,19 +64,25 @@ const isMlModel = (path: string) => {
async function handleMlModelInstall({
esClient,
logger,
mlModel,
}: {
esClient: ElasticsearchClient;
logger: Logger;
mlModel: MlModelInstallation;
}): Promise<EsAssetReference> {
try {
await esClient.ml.putTrainedModel({
model_id: mlModel.installationName,
defer_definition_decompression: true,
timeout: '45s',
// @ts-expect-error expects an object not a string
body: mlModel.content,
});
await retryTransientEsErrors(
() =>
esClient.ml.putTrainedModel({
model_id: mlModel.installationName,
defer_definition_decompression: true,
timeout: '45s',
// @ts-expect-error expects an object not a string
body: mlModel.content,
}),
{ logger }
);
} catch (err) {
// swallow the error if the ml model already exists.
const isAlreadyExistError =

View file

@ -0,0 +1,88 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
jest.mock('timers/promises');
import { setTimeout } from 'timers/promises';
import { loggerMock } from '@kbn/logging/mocks';
import { errors as EsErrors } from '@elastic/elasticsearch';
import { retryTransientEsErrors } from './retry';
const setTimeoutMock = setTimeout as jest.Mock<
ReturnType<typeof setTimeout>,
Parameters<typeof setTimeout>
>;
describe('retryTransientErrors', () => {
beforeEach(() => {
setTimeoutMock.mockClear();
});
it("doesn't retry if operation is successful", async () => {
const esCallMock = jest.fn().mockResolvedValue('success');
expect(await retryTransientEsErrors(esCallMock)).toEqual('success');
expect(esCallMock).toHaveBeenCalledTimes(1);
});
it('logs an warning message on retry', async () => {
const logger = loggerMock.create();
const esCallMock = jest
.fn()
.mockRejectedValueOnce(new EsErrors.ConnectionError('foo'))
.mockResolvedValue('success');
await retryTransientEsErrors(esCallMock, { logger });
expect(logger.warn).toHaveBeenCalledTimes(1);
expect(logger.warn.mock.calls[0][0]).toMatch(
`Retrying Elasticsearch operation after [2s] due to error: ConnectionError: foo ConnectionError: foo`
);
});
it('retries with an exponential backoff', async () => {
let attempt = 0;
const esCallMock = jest.fn(async () => {
attempt++;
if (attempt < 5) {
throw new EsErrors.ConnectionError('foo');
} else {
return 'success';
}
});
expect(await retryTransientEsErrors(esCallMock)).toEqual('success');
expect(setTimeoutMock.mock.calls).toEqual([[2000], [4000], [8000], [16000]]);
expect(esCallMock).toHaveBeenCalledTimes(5);
});
it('retries each supported error type', async () => {
const errors = [
new EsErrors.NoLivingConnectionsError('no living connection', {
warnings: [],
meta: {} as any,
}),
new EsErrors.ConnectionError('no connection'),
new EsErrors.TimeoutError('timeout'),
new EsErrors.ResponseError({ statusCode: 503, meta: {} as any, warnings: [] }),
new EsErrors.ResponseError({ statusCode: 408, meta: {} as any, warnings: [] }),
new EsErrors.ResponseError({ statusCode: 410, meta: {} as any, warnings: [] }),
];
for (const error of errors) {
const esCallMock = jest.fn().mockRejectedValueOnce(error).mockResolvedValue('success');
expect(await retryTransientEsErrors(esCallMock)).toEqual('success');
expect(esCallMock).toHaveBeenCalledTimes(2);
}
});
it('does not retry unsupported errors', async () => {
const error = new Error('foo!');
const esCallMock = jest.fn().mockRejectedValueOnce(error).mockResolvedValue('success');
await expect(retryTransientEsErrors(esCallMock)).rejects.toThrow(error);
expect(esCallMock).toHaveBeenCalledTimes(1);
});
});

View file

@ -0,0 +1,53 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { setTimeout } from 'timers/promises';
import { errors as EsErrors } from '@elastic/elasticsearch';
import type { Logger } from '@kbn/logging';
const MAX_ATTEMPTS = 5;
const retryResponseStatuses = [
503, // ServiceUnavailable
408, // RequestTimeout
410, // Gone
];
const isRetryableError = (e: any) =>
e instanceof EsErrors.NoLivingConnectionsError ||
e instanceof EsErrors.ConnectionError ||
e instanceof EsErrors.TimeoutError ||
(e instanceof EsErrors.ResponseError && retryResponseStatuses.includes(e?.statusCode!));
/**
* Retries any transient network or configuration issues encountered from Elasticsearch with an exponential backoff.
* Should only be used to wrap operations that are idempotent and can be safely executed more than once.
*/
export const retryTransientEsErrors = async <T>(
esCall: () => Promise<T>,
{ logger, attempt = 0 }: { logger?: Logger; attempt?: number } = {}
): Promise<T> => {
try {
return await esCall();
} catch (e) {
if (attempt < MAX_ATTEMPTS && isRetryableError(e)) {
const retryCount = attempt + 1;
const retryDelaySec = Math.min(Math.pow(2, retryCount), 64); // 2s, 4s, 8s, 16s, 32s, 64s, 64s, 64s ...
logger?.warn(
`Retrying Elasticsearch operation after [${retryDelaySec}s] due to error: ${e.toString()} ${
e.stack
}`
);
await setTimeout(retryDelaySec * 1000);
return retryTransientEsErrors(esCall, { logger, attempt: retryCount });
}
throw e;
}
};

View file

@ -6,6 +6,7 @@
*/
import { elasticsearchServiceMock } from 'src/core/server/mocks';
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { loggerMock } from '@kbn/logging/mocks';
import { createAppContextStartContractMock } from '../../../../mocks';
import { appContextService } from '../../../../services';
@ -44,6 +45,7 @@ describe('EPM install', () => {
const templatePriorityDatasetIsPrefixUnset = 200;
await installTemplate({
esClient,
logger: loggerMock.create(),
fields,
dataStream: dataStreamDatasetIsPrefixUnset,
packageVersion: pkg.version,
@ -84,6 +86,7 @@ describe('EPM install', () => {
const templatePriorityDatasetIsPrefixFalse = 200;
await installTemplate({
esClient,
logger: loggerMock.create(),
fields,
dataStream: dataStreamDatasetIsPrefixFalse,
packageVersion: pkg.version,
@ -124,6 +127,7 @@ describe('EPM install', () => {
const templatePriorityDatasetIsPrefixTrue = 150;
await installTemplate({
esClient,
logger: loggerMock.create(),
fields,
dataStream: dataStreamDatasetIsPrefixTrue,
packageVersion: pkg.version,
@ -174,6 +178,7 @@ describe('EPM install', () => {
const templatePriorityDatasetIsPrefixUnset = 200;
await installTemplate({
esClient,
logger: loggerMock.create(),
fields,
dataStream: dataStreamDatasetIsPrefixUnset,
packageVersion: pkg.version,

View file

@ -7,7 +7,7 @@
import { merge } from 'lodash';
import Boom from '@hapi/boom';
import type { ElasticsearchClient, SavedObjectsClientContract } from 'src/core/server';
import type { ElasticsearchClient, Logger, SavedObjectsClientContract } from 'src/core/server';
import { ElasticsearchAssetType } from '../../../../types';
import type {
@ -29,6 +29,7 @@ import {
import type { ESAssetMetadata } from '../meta';
import { getESAssetMetadata } from '../meta';
import { retryTransientEsErrors } from '../retry';
import {
generateMappings,
@ -42,14 +43,15 @@ import { buildDefaultSettings } from './default_settings';
export const installTemplates = async (
installablePackage: InstallablePackage,
esClient: ElasticsearchClient,
logger: Logger,
paths: string[],
savedObjectsClient: SavedObjectsClientContract
): Promise<IndexTemplateEntry[]> => {
// install any pre-built index template assets,
// atm, this is only the base package's global index templates
// Install component templates first, as they are used by the index templates
await installPreBuiltComponentTemplates(paths, esClient);
await installPreBuiltTemplates(paths, esClient);
await installPreBuiltComponentTemplates(paths, esClient, logger);
await installPreBuiltTemplates(paths, esClient, logger);
// remove package installation's references to index templates
await removeAssetTypesFromInstalledEs(savedObjectsClient, installablePackage.name, [
@ -65,6 +67,7 @@ export const installTemplates = async (
installTemplateForDataStream({
pkg: installablePackage,
esClient,
logger,
dataStream,
})
)
@ -84,7 +87,11 @@ export const installTemplates = async (
return installedTemplates;
};
const installPreBuiltTemplates = async (paths: string[], esClient: ElasticsearchClient) => {
const installPreBuiltTemplates = async (
paths: string[],
esClient: ElasticsearchClient,
logger: Logger
) => {
const templatePaths = paths.filter((path) => isTemplate(path));
const templateInstallPromises = templatePaths.map(async (path) => {
const { file } = getPathParts(path);
@ -96,10 +103,16 @@ const installPreBuiltTemplates = async (paths: string[], esClient: Elasticsearch
if (content.hasOwnProperty('template') || content.hasOwnProperty('composed_of')) {
// Template is v2
return esClient.indices.putIndexTemplate(esClientParams, esClientRequestOptions);
return retryTransientEsErrors(
() => esClient.indices.putIndexTemplate(esClientParams, esClientRequestOptions),
{ logger }
);
} else {
// template is V1
return esClient.indices.putTemplate(esClientParams, esClientRequestOptions);
return retryTransientEsErrors(
() => esClient.indices.putTemplate(esClientParams, esClientRequestOptions),
{ logger }
);
}
});
try {
@ -113,7 +126,8 @@ const installPreBuiltTemplates = async (paths: string[], esClient: Elasticsearch
const installPreBuiltComponentTemplates = async (
paths: string[],
esClient: ElasticsearchClient
esClient: ElasticsearchClient,
logger: Logger
) => {
const templatePaths = paths.filter((path) => isComponentTemplate(path));
const templateInstallPromises = templatePaths.map(async (path) => {
@ -126,7 +140,10 @@ const installPreBuiltComponentTemplates = async (
body: content,
};
return esClient.cluster.putComponentTemplate(esClientParams, { ignore: [404] });
return retryTransientEsErrors(
() => esClient.cluster.putComponentTemplate(esClientParams, { ignore: [404] }),
{ logger }
);
});
try {
@ -157,15 +174,18 @@ const isComponentTemplate = (path: string) => {
export async function installTemplateForDataStream({
pkg,
esClient,
logger,
dataStream,
}: {
pkg: InstallablePackage;
esClient: ElasticsearchClient;
logger: Logger;
dataStream: RegistryDataStream;
}): Promise<IndexTemplateEntry> {
const fields = await loadFieldsFromYaml(pkg, dataStream.path);
return installTemplate({
esClient,
logger,
fields,
dataStream,
packageVersion: pkg.version,
@ -186,6 +206,7 @@ interface TemplateMapEntry {
type TemplateMap = Record<string, TemplateMapEntry>;
function putComponentTemplate(
esClient: ElasticsearchClient,
logger: Logger,
params: {
body: TemplateMapEntry;
name: string;
@ -194,9 +215,9 @@ function putComponentTemplate(
): { clusterPromise: Promise<any>; name: string } {
const { name, body, create = false } = params;
return {
clusterPromise: esClient.cluster.putComponentTemplate(
{ name, body, create },
{ ignore: [404] }
clusterPromise: retryTransientEsErrors(
() => esClient.cluster.putComponentTemplate({ name, body, create }, { ignore: [404] }),
{ logger }
),
name,
};
@ -256,10 +277,12 @@ async function installDataStreamComponentTemplates(params: {
templateName: string;
registryElasticsearch: RegistryElasticsearch | undefined;
esClient: ElasticsearchClient;
logger: Logger;
packageName: string;
defaultSettings: IndexTemplate['template']['settings'];
}) {
const { templateName, registryElasticsearch, esClient, packageName, defaultSettings } = params;
const { templateName, registryElasticsearch, esClient, packageName, defaultSettings, logger } =
params;
const templates = buildComponentTemplates({
templateName,
registryElasticsearch,
@ -274,15 +297,22 @@ async function installDataStreamComponentTemplates(params: {
templateEntries.map(async ([name, body]) => {
if (isUserSettingsTemplate(name)) {
// look for existing user_settings template
const result = await esClient.cluster.getComponentTemplate({ name }, { ignore: [404] });
const result = await retryTransientEsErrors(
() => esClient.cluster.getComponentTemplate({ name }, { ignore: [404] }),
{ logger }
);
const hasUserSettingsTemplate = result.body.component_templates?.length === 1;
if (!hasUserSettingsTemplate) {
// only add if one isn't already present
const { clusterPromise } = putComponentTemplate(esClient, { body, name, create: true });
const { clusterPromise } = putComponentTemplate(esClient, logger, {
body,
name,
create: true,
});
return clusterPromise;
}
} else {
const { clusterPromise } = putComponentTemplate(esClient, { body, name });
const { clusterPromise } = putComponentTemplate(esClient, logger, { body, name });
return clusterPromise;
}
})
@ -291,19 +321,26 @@ async function installDataStreamComponentTemplates(params: {
return templateNames;
}
export async function ensureDefaultComponentTemplate(esClient: ElasticsearchClient) {
const { body: getTemplateRes } = await esClient.cluster.getComponentTemplate(
{
name: FLEET_GLOBAL_COMPONENT_TEMPLATE_NAME,
},
{
ignore: [404],
}
export async function ensureDefaultComponentTemplate(
esClient: ElasticsearchClient,
logger: Logger
) {
const { body: getTemplateRes } = await retryTransientEsErrors(
() =>
esClient.cluster.getComponentTemplate(
{
name: FLEET_GLOBAL_COMPONENT_TEMPLATE_NAME,
},
{
ignore: [404],
}
),
{ logger }
);
const existingTemplate = getTemplateRes?.component_templates?.[0];
if (!existingTemplate) {
await putComponentTemplate(esClient, {
await putComponentTemplate(esClient, logger, {
name: FLEET_GLOBAL_COMPONENT_TEMPLATE_NAME,
body: FLEET_GLOBAL_COMPONENT_TEMPLATE_CONTENT,
create: true,
@ -315,12 +352,14 @@ export async function ensureDefaultComponentTemplate(esClient: ElasticsearchClie
export async function installTemplate({
esClient,
logger,
fields,
dataStream,
packageVersion,
packageName,
}: {
esClient: ElasticsearchClient;
logger: Logger;
fields: Field[];
dataStream: RegistryDataStream;
packageVersion: string;
@ -342,13 +381,17 @@ export async function installTemplate({
}
// Datastream now throw an error if the aliases field is present so ensure that we remove that field.
const { body: getTemplateRes } = await esClient.indices.getIndexTemplate(
{
name: templateName,
},
{
ignore: [404],
}
const { body: getTemplateRes } = await retryTransientEsErrors(
() =>
esClient.indices.getIndexTemplate(
{
name: templateName,
},
{
ignore: [404],
}
),
{ logger }
);
const existingIndexTemplate = getTemplateRes?.index_templates?.[0];
@ -369,7 +412,10 @@ export async function installTemplate({
},
};
await esClient.indices.putIndexTemplate(updateIndexTemplateParams, { ignore: [404] });
await retryTransientEsErrors(
() => esClient.indices.putIndexTemplate(updateIndexTemplateParams, { ignore: [404] }),
{ logger }
);
}
const defaultSettings = buildDefaultSettings({
@ -384,6 +430,7 @@ export async function installTemplate({
templateName,
registryElasticsearch: dataStream.elasticsearch,
esClient,
logger,
packageName,
defaultSettings,
});
@ -406,7 +453,10 @@ export async function installTemplate({
body: template,
};
await esClient.indices.putIndexTemplate(esClientParams, { ignore: [404] });
await retryTransientEsErrors(
() => esClient.indices.putIndexTemplate(esClientParams, { ignore: [404] }),
{ logger }
);
return {
templateName,

View file

@ -5,7 +5,7 @@
* 2.0.
*/
import type { ElasticsearchClient } from 'kibana/server';
import type { ElasticsearchClient, Logger } from 'kibana/server';
import type { Field, Fields } from '../../fields/field';
import type {
@ -18,6 +18,7 @@ import { appContextService } from '../../../';
import { getRegistryDataStreamAssetBaseName } from '../index';
import { FLEET_GLOBAL_COMPONENT_TEMPLATE_NAME } from '../../../../constants';
import { getESAssetMetadata } from '../meta';
import { retryTransientEsErrors } from '../retry';
interface Properties {
[key: string]: any;
@ -408,13 +409,14 @@ function getBaseTemplate(
export const updateCurrentWriteIndices = async (
esClient: ElasticsearchClient,
logger: Logger,
templates: IndexTemplateEntry[]
): Promise<void> => {
if (!templates.length) return;
const allIndices = await queryDataStreamsFromTemplates(esClient, templates);
if (!allIndices.length) return;
return updateAllDataStreams(allIndices, esClient);
return updateAllDataStreams(allIndices, esClient, logger);
};
function isCurrentDataStream(item: CurrentDataStream[] | undefined): item is CurrentDataStream[] {
@ -448,11 +450,12 @@ const getDataStreams = async (
const updateAllDataStreams = async (
indexNameWithTemplates: CurrentDataStream[],
esClient: ElasticsearchClient
esClient: ElasticsearchClient,
logger: Logger
): Promise<void> => {
const updatedataStreamPromises = indexNameWithTemplates.map(
({ dataStreamName, indexTemplate }) => {
return updateExistingDataStream({ dataStreamName, esClient, indexTemplate });
return updateExistingDataStream({ dataStreamName, esClient, logger, indexTemplate });
}
);
await Promise.all(updatedataStreamPromises);
@ -460,10 +463,12 @@ const updateAllDataStreams = async (
const updateExistingDataStream = async ({
dataStreamName,
esClient,
logger,
indexTemplate,
}: {
dataStreamName: string;
esClient: ElasticsearchClient;
logger: Logger;
indexTemplate: IndexTemplate;
}) => {
const { settings, mappings } = indexTemplate.template;
@ -476,14 +481,19 @@ const updateExistingDataStream = async ({
// try to update the mappings first
try {
await esClient.indices.putMapping({
index: dataStreamName,
body: mappings,
write_index_only: true,
});
await retryTransientEsErrors(
() =>
esClient.indices.putMapping({
index: dataStreamName,
body: mappings,
write_index_only: true,
}),
{ logger }
);
// if update fails, rollover data stream
} catch (err) {
try {
// Do no wrap rollovers in retryTransientEsErrors since it is not idempotent
const path = `/${dataStreamName}/_rollover`;
await esClient.transport.request({
method: 'POST',
@ -498,10 +508,14 @@ const updateExistingDataStream = async ({
// for now, only update the pipeline
if (!settings.index.default_pipeline) return;
try {
await esClient.indices.putSettings({
index: dataStreamName,
body: { default_pipeline: settings.index.default_pipeline },
});
await retryTransientEsErrors(
() =>
esClient.indices.putSettings({
index: dataStreamName,
body: { default_pipeline: settings.index.default_pipeline },
}),
{ logger }
);
} catch (err) {
throw new Error(`could not update index template settings for ${dataStreamName}`);
}

View file

@ -5,7 +5,7 @@
* 2.0.
*/
import type { ElasticsearchClient, SavedObjectsClientContract } from 'kibana/server';
import type { ElasticsearchClient, Logger, SavedObjectsClientContract } from 'kibana/server';
import { errors } from '@elastic/elasticsearch';
import { saveInstalledEsRefs } from '../../packages/install';
@ -13,10 +13,11 @@ import { getPathParts } from '../../archive';
import { ElasticsearchAssetType } from '../../../../../common/types/models';
import type { EsAssetReference, InstallablePackage } from '../../../../../common/types/models';
import { getInstallation } from '../../packages';
import { appContextService } from '../../../app_context';
import { getESAssetMetadata } from '../meta';
import { retryTransientEsErrors } from '../retry';
import { deleteTransforms, deleteTransformRefs } from './remove';
import { getAsset } from './common';
@ -29,9 +30,9 @@ export const installTransform = async (
installablePackage: InstallablePackage,
paths: string[],
esClient: ElasticsearchClient,
savedObjectsClient: SavedObjectsClientContract
savedObjectsClient: SavedObjectsClientContract,
logger: Logger
) => {
const logger = appContextService.getLogger();
const installation = await getInstallation({
savedObjectsClient,
pkgName: installablePackage.name,
@ -87,7 +88,7 @@ export const installTransform = async (
});
const installationPromises = transforms.map(async (transform) => {
return handleTransformInstall({ esClient, transform });
return handleTransformInstall({ esClient, logger, transform });
});
installedTransforms = await Promise.all(installationPromises).then((results) => results.flat());
@ -118,18 +119,24 @@ const isTransform = (path: string) => {
async function handleTransformInstall({
esClient,
logger,
transform,
}: {
esClient: ElasticsearchClient;
logger: Logger;
transform: TransformInstallation;
}): Promise<EsAssetReference> {
try {
// defer validation on put if the source index is not available
await esClient.transform.putTransform({
transform_id: transform.installationName,
defer_validation: true,
body: transform.content,
});
await retryTransientEsErrors(
() =>
// defer validation on put if the source index is not available
esClient.transform.putTransform({
transform_id: transform.installationName,
defer_validation: true,
body: transform.content,
}),
{ logger }
);
} catch (err) {
// swallow the error if the transform already exists.
const isAlreadyExistError =

View file

@ -21,6 +21,7 @@ jest.mock('./common', () => {
import { errors } from '@elastic/elasticsearch';
import type { DeeplyMockedKeys } from '@kbn/utility-types/jest';
import type { ElasticsearchClient, SavedObject, SavedObjectsClientContract } from 'kibana/server';
import { loggerMock } from '@kbn/logging/mocks';
import { ElasticsearchAssetType } from '../../../../types';
import type { Installation, RegistryPackage } from '../../../../types';
@ -157,7 +158,8 @@ describe('test transform install', () => {
'endpoint-0.16.0-dev.0/elasticsearch/transform/metadata_current/default.json',
],
esClient,
savedObjectsClient
savedObjectsClient,
loggerMock.create()
);
expect(esClient.transform.getTransform.mock.calls).toEqual([
@ -329,7 +331,8 @@ describe('test transform install', () => {
} as unknown as RegistryPackage,
['endpoint-0.16.0-dev.0/elasticsearch/transform/metadata_current/default.json'],
esClient,
savedObjectsClient
savedObjectsClient,
loggerMock.create()
);
const meta = getESAssetMetadata({ packageName: 'endpoint' });
@ -441,7 +444,8 @@ describe('test transform install', () => {
} as unknown as RegistryPackage,
[],
esClient,
savedObjectsClient
savedObjectsClient,
loggerMock.create()
);
expect(esClient.transform.getTransform.mock.calls).toEqual([
@ -556,7 +560,8 @@ describe('test transform install', () => {
} as unknown as RegistryPackage,
['endpoint-0.16.0-dev.0/elasticsearch/transform/metadata_current/default.json'],
esClient,
savedObjectsClient
savedObjectsClient,
loggerMock.create()
);
const meta = getESAssetMetadata({ packageName: 'endpoint' });

View file

@ -7,6 +7,7 @@
import type { SavedObjectsClientContract, ElasticsearchClient } from 'src/core/server';
import { savedObjectsClientMock, elasticsearchServiceMock } from 'src/core/server/mocks';
import { loggerMock } from '@kbn/logging/mocks';
import { appContextService } from '../../app_context';
import { createAppContextStartContractMock } from '../../../mocks';
@ -66,6 +67,7 @@ describe('_installPackage', () => {
const installationPromise = _installPackage({
savedObjectsClient: soClient,
esClient,
logger: loggerMock.create(),
paths: [],
packageInfo: {
title: 'title',

View file

@ -5,7 +5,12 @@
* 2.0.
*/
import type { ElasticsearchClient, SavedObject, SavedObjectsClientContract } from 'src/core/server';
import type {
ElasticsearchClient,
Logger,
SavedObject,
SavedObjectsClientContract,
} from 'src/core/server';
import {
MAX_TIME_COMPLETE_INSTALL,
@ -44,6 +49,7 @@ import { deleteKibanaSavedObjectsAssets } from './remove';
export async function _installPackage({
savedObjectsClient,
esClient,
logger,
installedPkg,
paths,
packageInfo,
@ -52,6 +58,7 @@ export async function _installPackage({
}: {
savedObjectsClient: SavedObjectsClientContract;
esClient: ElasticsearchClient;
logger: Logger;
installedPkg?: SavedObject<Installation>;
paths: string[];
packageInfo: InstallablePackage;
@ -131,41 +138,51 @@ export async function _installPackage({
// currently only the base package has an ILM policy
// at some point ILM policies can be installed/modified
// per data stream and we should then save them
await installILMPolicy(packageInfo, paths, esClient);
await installILMPolicy(packageInfo, paths, esClient, logger);
const installedDataStreamIlm = await installIlmForDataStream(
packageInfo,
paths,
esClient,
savedObjectsClient
savedObjectsClient,
logger
);
// installs ml models
const installedMlModel = await installMlModel(packageInfo, paths, esClient, savedObjectsClient);
const installedMlModel = await installMlModel(
packageInfo,
paths,
esClient,
savedObjectsClient,
logger
);
// installs versionized pipelines without removing currently installed ones
const installedPipelines = await installPipelines(
packageInfo,
paths,
esClient,
savedObjectsClient
savedObjectsClient,
logger
);
// install or update the templates referencing the newly installed pipelines
const installedTemplates = await installTemplates(
packageInfo,
esClient,
logger,
paths,
savedObjectsClient
);
// update current backing indices of each data stream
await updateCurrentWriteIndices(esClient, installedTemplates);
await updateCurrentWriteIndices(esClient, logger, installedTemplates);
const installedTransforms = await installTransform(
packageInfo,
paths,
esClient,
savedObjectsClient
savedObjectsClient,
logger
);
// If this is an update or retrying an update, delete the previous version's pipelines

View file

@ -308,6 +308,7 @@ async function installPackageFromRegistry({
return _installPackage({
savedObjectsClient,
esClient,
logger,
installedPkg,
paths,
packageInfo,
@ -367,6 +368,7 @@ async function installPackageByUpload({
archiveBuffer,
contentType,
}: InstallUploadedArchiveParams): Promise<InstallResult> {
const logger = appContextService.getLogger();
// if an error happens during getInstallType, report that we don't know
let installType: InstallType = 'unknown';
const telemetryEvent: PackageUpdateEvent = getTelemetryEvent('', '');
@ -409,6 +411,7 @@ async function installPackageByUpload({
return _installPackage({
savedObjectsClient,
esClient,
logger,
installedPkg,
paths,
packageInfo,

View file

@ -139,8 +139,8 @@ export async function ensureFleetGlobalEsAssets(
// Ensure Global Fleet ES assets are installed
logger.debug('Creating Fleet component template and ingest pipeline');
const globalAssetsRes = await Promise.all([
ensureDefaultComponentTemplate(esClient),
ensureFleetFinalPipelineIsInstalled(esClient),
ensureDefaultComponentTemplate(esClient, logger),
ensureFleetFinalPipelineIsInstalled(esClient, logger),
]);
if (globalAssetsRes.some((asset) => asset.isCreated)) {