[Fleet] Fix ES index patterns for custom package (#176010)

This commit is contained in:
Nicolas Chaulet 2024-02-02 10:50:37 -05:00 committed by GitHub
parent e2fc23f57e
commit 1985dd07bc
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
23 changed files with 419 additions and 309 deletions

View file

@ -55,9 +55,6 @@ const createMockFleetStartContract = (): DeeplyMockedKeys<FleetStartContract> =>
fromRequest: jest.fn(async (_) => createFleetAuthzMock()),
},
fleetSetupCompleted: jest.fn().mockResolvedValue(undefined),
esIndexPatternService: {
getESIndexPattern: jest.fn().mockResolvedValue(undefined),
},
// @ts-expect-error 2322
agentService: createMockAgentService(),
// @ts-expect-error 2322

View file

@ -62,9 +62,6 @@ const createMockFleetStartContract = (): DeeplyMockedKeys<FleetStartContract> =>
fromRequest: jest.fn(async (_) => createFleetAuthzMock()),
},
fleetSetupCompleted: jest.fn().mockResolvedValue(undefined),
esIndexPatternService: {
getESIndexPattern: jest.fn().mockResolvedValue(undefined),
},
// @ts-expect-error 2322
agentService: createMockAgentService(),
// @ts-expect-error 2322

View file

@ -33,6 +33,8 @@ export const DATASET_VAR_NAME = 'data_stream.dataset';
export const CUSTOM_INTEGRATION_PACKAGE_SPEC_VERSION = '2.9.0';
export const GENERIC_DATASET_NAME = 'generic';
/*
Package rules:
| | autoUpdatePackages |

View file

@ -236,7 +236,7 @@ describe('getNormalizedDataStreams', () => {
title: expect.any(String),
release: 'ga',
package: 'nginx',
path: 'nginx',
path: 'nginx.bar',
streams: [
{
input: 'string',

View file

@ -80,13 +80,15 @@ export const getNormalizedDataStreams = (
}
return policyTemplates.map((policyTemplate) => {
const dataset = datasetName || createDefaultDatasetName(packageInfo, policyTemplate);
const dataStream: RegistryDataStream = {
type: policyTemplate.type,
dataset: datasetName || createDefaultDatasetName(packageInfo, policyTemplate),
dataset,
title: policyTemplate.title + ' Dataset',
release: packageInfo.release || 'ga',
package: packageInfo.name,
path: packageInfo.name,
path: dataset,
elasticsearch: packageInfo.elasticsearch || {},
streams: [
{

View file

@ -12,14 +12,13 @@ import { i18n } from '@kbn/i18n';
import { FormattedMessage } from '@kbn/i18n-react';
import type { DataStream } from '../../../../../../../../../common/types';
import { GENERIC_DATASET_NAME } from '../../../../../../../../../common/constants';
interface SelectedDataset {
dataset: string;
package: string;
}
const GENERIC_DATASET_NAME = 'generic';
export const DatasetComboBox: React.FC<{
value?: SelectedDataset | string;
onChange: (newValue: SelectedDataset) => void;

View file

@ -11,7 +11,6 @@ export { buildAgentStatusRuntimeField } from './services/agents/build_status_run
export type {
AgentService,
AgentClient,
ESIndexPatternService,
PackageService,
PackageClient,
AgentPolicyServiceInterface,

View file

@ -90,17 +90,11 @@ import { registerEncryptedSavedObjects, registerSavedObjects } from './saved_obj
import { registerRoutes } from './routes';
import type { ExternalCallback, FleetRequestHandlerContext } from './types';
import type {
AgentPolicyServiceInterface,
AgentService,
ESIndexPatternService,
PackageService,
} from './services';
import type { AgentPolicyServiceInterface, AgentService, PackageService } from './services';
import {
agentPolicyService,
AgentServiceImpl,
appContextService,
ESIndexPatternSavedObjectService,
FleetUsageSender,
licenseService,
packagePolicyService,
@ -204,7 +198,6 @@ export interface FleetStartContract {
authz: {
fromRequest(request: KibanaRequest): Promise<FleetAuthz>;
};
esIndexPatternService: ESIndexPatternService;
packageService: PackageService;
agentService: AgentService;
/**
@ -611,7 +604,6 @@ export class FleetPlugin
fromRequest: getAuthzFromRequest,
},
fleetSetupCompleted: () => fleetSetupPromise,
esIndexPatternService: new ESIndexPatternSavedObjectService(),
packageService: this.setupPackageService(
core.elasticsearch.client.asInternalUser,
internalSoClient

View file

@ -12,7 +12,7 @@ import {
type PackageInstallContext,
} from '../../../../../common/types/models';
import type { EsAssetReference, RegistryDataStream } from '../../../../../common/types/models';
import { updateEsAssetReferences } from '../../packages/install';
import { updateEsAssetReferences } from '../../packages/es_assets_reference';
import { getAssetFromAssetsMap } from '../../archive';
import { getESAssetMetadata } from '../meta';

View file

@ -11,7 +11,7 @@ import type { EsAssetReference } from '../../../../types';
import { ElasticsearchAssetType } from '../../../../types';
import { getAssetFromAssetsMap, getPathParts } from '../../archive';
import { updateEsAssetReferences } from '../../packages/install';
import { updateEsAssetReferences } from '../../packages/es_assets_reference';
import { getESAssetMetadata } from '../meta';
import { retryTransientEsErrors } from '../retry';
import { PackageInvalidArchiveError } from '../../../../errors';

View file

@ -11,7 +11,7 @@ import { appContextService } from '../../..';
import { ElasticsearchAssetType } from '../../../../types';
import { FleetError } from '../../../../errors';
import type { EsAssetReference } from '../../../../../common/types';
import { updateEsAssetReferences } from '../../packages/install';
import { updateEsAssetReferences } from '../../packages/es_assets_reference';
export const deletePreviousPipelines = async (
esClient: ElasticsearchClient,

View file

@ -17,7 +17,7 @@ import type { EsAssetReference } from '../../../../../common/types/models';
import { retryTransientEsErrors } from '../retry';
import { updateEsAssetReferences } from '../../packages/install';
import { updateEsAssetReferences } from '../../packages/es_assets_reference';
interface MlModelInstallation {
installationName: string;

View file

@ -28,7 +28,7 @@ import {
import { isFields, processFields } from '../../fields/field';
import { generateMappings } from '../template/template';
import { getESAssetMetadata } from '../meta';
import { updateEsAssetReferences } from '../../packages/install';
import { updateEsAssetReferences } from '../../packages/es_assets_reference';
import { getAssetFromAssetsMap, getPathParts } from '../../archive';
import {
ElasticsearchAssetType,

View file

@ -12,7 +12,7 @@ import type { SavedObjectsClientContract } from '@kbn/core-saved-objects-api-ser
import { sortBy, uniqBy } from 'lodash';
import type { SecondaryAuthorizationHeader } from '../../../../../common/types/models/transform_api_key';
import { updateEsAssetReferences } from '../../packages/install';
import { updateEsAssetReferences } from '../../packages/es_assets_reference';
import type { Installation } from '../../../../../common';
import { ElasticsearchAssetType, PACKAGES_SAVED_OBJECT_TYPE } from '../../../../../common';

View file

@ -31,6 +31,7 @@ jest.mock('../kibana/assets/install');
jest.mock('../kibana/index_pattern/install');
jest.mock('./install');
jest.mock('./get');
jest.mock('./install_index_template_pipeline');
jest.mock('../archive/storage');
jest.mock('../elasticsearch/ilm/install');
@ -41,7 +42,8 @@ import { installKibanaAssetsAndReferences } from '../kibana/assets/install';
import { MAX_TIME_COMPLETE_INSTALL } from '../../../../common/constants';
import { installIndexTemplatesAndPipelines, restartInstallation } from './install';
import { restartInstallation } from './install';
import { installIndexTemplatesAndPipelines } from './install_index_template_pipeline';
import { _installPackage } from './_install_package';

View file

@ -50,13 +50,10 @@ import { appContextService, packagePolicyService } from '../..';
import { auditLoggingService } from '../../audit_logging';
import {
createInstallation,
restartInstallation,
installIndexTemplatesAndPipelines,
} from './install';
import { createInstallation, restartInstallation } from './install';
import { withPackageSpan } from './utils';
import { clearLatestFailedAttempts } from './install_errors_helpers';
import { installIndexTemplatesAndPipelines } from './install_index_template_pipeline';
// this is only exported for testing
// use a leading underscore to indicate it's not the supported path

View file

@ -0,0 +1,149 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { SavedObjectsClientContract } from '@kbn/core/server';
import { SavedObjectsErrorHelpers } from '@kbn/core/server';
import pRetry from 'p-retry';
import { uniqBy } from 'lodash';
import type { EsAssetReference, Installation } from '../../../types';
import { PACKAGES_SAVED_OBJECT_TYPE } from '../../../constants';
import { auditLoggingService } from '../../audit_logging';
/**
* Utility function for updating the installed_es field of a package
*/
export const updateEsAssetReferences = async (
savedObjectsClient: SavedObjectsClientContract,
pkgName: string,
currentAssets: EsAssetReference[],
{
assetsToAdd = [],
assetsToRemove = [],
refresh = false,
}: {
assetsToAdd?: EsAssetReference[];
assetsToRemove?: EsAssetReference[];
/**
* Whether or not the update should force a refresh on the SO index.
* Defaults to `false` for faster updates, should only be `wait_for` if the update needs to be queried back from ES
* immediately.
*/
refresh?: 'wait_for' | false;
}
): Promise<EsAssetReference[]> => {
const withAssetsRemoved = currentAssets.filter(({ type, id }) => {
if (
assetsToRemove.some(
({ type: removeType, id: removeId }) => removeType === type && removeId === id
)
) {
return false;
}
return true;
});
const deduplicatedAssets = uniqBy(
[...withAssetsRemoved, ...assetsToAdd],
({ type, id }) => `${type}-${id}`
);
auditLoggingService.writeCustomSoAuditLog({
action: 'update',
id: pkgName,
savedObjectType: PACKAGES_SAVED_OBJECT_TYPE,
});
const {
attributes: { installed_es: updatedAssets },
} =
// Because Kibana assets are installed in parallel with ES assets with refresh: false, we almost always run into an
// issue that causes a conflict error due to this issue: https://github.com/elastic/kibana/issues/126240. This is safe
// to retry constantly until it succeeds to optimize this critical user journey path as much as possible.
await pRetry(
() =>
savedObjectsClient.update<Installation>(
PACKAGES_SAVED_OBJECT_TYPE,
pkgName,
{
installed_es: deduplicatedAssets,
},
{
refresh,
}
),
// Use a lower number of retries for ES assets since they're installed in serial and can only conflict with
// the single Kibana update call.
{ retries: 5 }
);
return updatedAssets ?? [];
};
/**
* Utility function for adding assets the installed_es field of a package
* uses optimistic concurrency control to prevent missed updates
*/
export const optimisticallyAddEsAssetReferences = async (
savedObjectsClient: SavedObjectsClientContract,
pkgName: string,
assetsToAdd: EsAssetReference[],
esIndexPatterns?: Record<string, string>
): Promise<EsAssetReference[]> => {
const addEsAssets = async () => {
// TODO: Should this be replaced by a `get()` call from epm/get.ts?
const so = await savedObjectsClient.get<Installation>(PACKAGES_SAVED_OBJECT_TYPE, pkgName);
auditLoggingService.writeCustomSoAuditLog({
action: 'get',
id: pkgName,
savedObjectType: PACKAGES_SAVED_OBJECT_TYPE,
});
const installedEs = so.attributes.installed_es ?? [];
const deduplicatedAssets = uniqBy(
[...installedEs, ...assetsToAdd],
({ type, id }) => `${type}-${id}`
);
const deduplicatedIndexPatterns = Object.assign(
{},
so.attributes.es_index_patterns ?? {},
esIndexPatterns
);
auditLoggingService.writeCustomSoAuditLog({
action: 'update',
id: pkgName,
savedObjectType: PACKAGES_SAVED_OBJECT_TYPE,
});
const {
attributes: { installed_es: updatedAssets },
} = await savedObjectsClient.update<Installation>(
PACKAGES_SAVED_OBJECT_TYPE,
pkgName,
{
installed_es: deduplicatedAssets,
es_index_patterns: deduplicatedIndexPatterns,
},
{
version: so.version,
}
);
return updatedAssets ?? [];
};
const onlyRetryConflictErrors = (err: Error) => {
if (!SavedObjectsErrorHelpers.isConflictError(err)) {
throw err;
}
};
return pRetry(addEsAssets, { retries: 10, onFailedAttempt: onlyRetryConflictErrors });
};

View file

@ -16,16 +16,27 @@ import { sendTelemetryEvents } from '../../upgrade_sender';
import { licenseService } from '../../license';
import { auditLoggingService } from '../../audit_logging';
import { appContextService } from '../../app_context';
import { ConcurrentInstallOperationError, FleetError } from '../../../errors';
import { ConcurrentInstallOperationError, FleetError, PackageNotFoundError } from '../../../errors';
import * as Registry from '../registry';
import { dataStreamService } from '../../data_streams';
import { createInstallation, handleInstallPackageFailure, installPackage } from './install';
import {
createInstallation,
handleInstallPackageFailure,
installAssetsForInputPackagePolicy,
installPackage,
} from './install';
import * as install from './_install_package';
import { getBundledPackageByPkgKey } from './bundled_packages';
import * as obj from '.';
import { getInstalledPackageWithAssets, getInstallationObject } from './get';
import { optimisticallyAddEsAssetReferences } from './es_assets_reference';
jest.mock('../../data_streams');
jest.mock('./get');
jest.mock('./install_index_template_pipeline');
jest.mock('./es_assets_reference');
jest.mock('../../app_context', () => {
const logger = { error: jest.fn(), debug: jest.fn(), warn: jest.fn(), info: jest.fn() };
const mockedSavedObjectTagging = {
@ -228,8 +239,9 @@ describe('install', () => {
it('should send telemetry on update success', async () => {
jest
.spyOn(obj, 'getInstallationObject')
.mockImplementationOnce(() => Promise.resolve({ attributes: { version: '1.2.0' } } as any));
.mocked(getInstallationObject)
.mockResolvedValueOnce({ attributes: { version: '1.2.0' } } as any);
jest.spyOn(licenseService, 'hasAtLeast').mockReturnValue(true);
await installPackage({
spaceId: DEFAULT_SPACE_ID,
@ -319,16 +331,14 @@ describe('install', () => {
});
it('should do nothing if same version is installed', async () => {
jest.spyOn(obj, 'getInstallationObject').mockImplementationOnce(() =>
Promise.resolve({
attributes: {
version: '1.2.0',
install_status: 'installed',
installed_es: [],
installed_kibana: [],
},
} as any)
);
jest.mocked(getInstallationObject).mockResolvedValueOnce({
attributes: {
version: '1.2.0',
install_status: 'installed',
installed_es: [],
installed_kibana: [],
},
} as any);
jest.spyOn(licenseService, 'hasAtLeast').mockReturnValue(true);
const response = await installPackage({
spaceId: DEFAULT_SPACE_ID,
@ -365,8 +375,9 @@ describe('install', () => {
.mocked(appContextService.getInternalUserSOClientForSpaceId)
.mockReturnValue(mockedTaggingSo);
jest
.spyOn(obj, 'getInstallationObject')
.mockImplementationOnce(() => Promise.resolve({ attributes: { version: '1.2.0' } } as any));
.mocked(getInstallationObject)
.mockResolvedValueOnce({ attributes: { version: '1.2.0' } } as any);
jest.spyOn(licenseService, 'hasAtLeast').mockReturnValue(true);
await installPackage({
spaceId: 'test',
@ -395,8 +406,8 @@ describe('install', () => {
describe('upload', () => {
it('should send telemetry on update', async () => {
jest
.spyOn(obj, 'getInstallationObject')
.mockImplementationOnce(() => Promise.resolve({ attributes: { version: '1.2.0' } } as any));
.mocked(getInstallationObject)
.mockResolvedValueOnce({ attributes: { version: '1.2.0' } } as any);
jest.spyOn(licenseService, 'hasAtLeast').mockReturnValue(true);
await installPackage({
spaceId: DEFAULT_SPACE_ID,
@ -463,6 +474,97 @@ describe('install', () => {
});
});
});
describe('installAssetsForInputPackagePolicy', () => {
beforeEach(() => {
jest.mocked(optimisticallyAddEsAssetReferences).mockReset();
});
it('should do nothing for non input package', async () => {
const mockedLogger = jest.mocked(appContextService.getLogger());
await installAssetsForInputPackagePolicy({
pkgInfo: {
type: 'integration',
} as any,
soClient: savedObjectsClientMock.create(),
esClient: {} as ElasticsearchClient,
force: false,
logger: mockedLogger,
packagePolicy: {} as any,
});
});
const TEST_PKG_INFO_INPUT = {
type: 'input',
name: 'test',
version: '1.0.0',
policy_templates: [
{
name: 'log',
type: 'log',
},
],
};
it('should throw for input package if package is not installed', async () => {
jest.mocked(dataStreamService).getMatchingDataStreams.mockResolvedValue([]);
jest.mocked(getInstalledPackageWithAssets).mockResolvedValue(undefined);
const mockedLogger = jest.mocked(appContextService.getLogger());
await expect(() =>
installAssetsForInputPackagePolicy({
pkgInfo: TEST_PKG_INFO_INPUT as any,
soClient: savedObjectsClientMock.create(),
esClient: {} as ElasticsearchClient,
force: false,
logger: mockedLogger,
packagePolicy: {
inputs: [{ type: 'log', streams: [{ type: 'log', vars: { dataset: 'test.tata' } }] }],
} as any,
})
).rejects.toThrowError(PackageNotFoundError);
});
it('should install es index patterns for input package if package is installed', async () => {
jest.mocked(dataStreamService).getMatchingDataStreams.mockResolvedValue([]);
jest.mocked(getInstalledPackageWithAssets).mockResolvedValue({
installation: {
name: 'test',
version: '1.0.0',
},
packageInfo: TEST_PKG_INFO_INPUT,
assetsMap: new Map(),
paths: [],
} as any);
const mockedLogger = jest.mocked(appContextService.getLogger());
await installAssetsForInputPackagePolicy({
pkgInfo: TEST_PKG_INFO_INPUT as any,
soClient: savedObjectsClientMock.create(),
esClient: {} as ElasticsearchClient,
force: false,
logger: mockedLogger,
packagePolicy: {
inputs: [
{
name: 'log',
type: 'log',
streams: [{ type: 'log', vars: { 'data_stream.dataset': { value: 'test.tata' } } }],
},
],
} as any,
});
expect(jest.mocked(optimisticallyAddEsAssetReferences)).toBeCalledWith(
expect.anything(),
expect.anything(),
expect.anything(),
{
'test.tata': 'log-test.tata-*',
}
);
});
});
describe('handleInstallPackageFailure', () => {
const mockedLogger = jest.mocked(appContextService.getLogger());
beforeEach(() => {
@ -541,6 +643,7 @@ describe('handleInstallPackageFailure', () => {
},
},
} as any;
jest.mocked(getInstallationObject).mockResolvedValueOnce(installedPkg);
await handleInstallPackageFailure({
savedObjectsClient,
error: new FleetError('test 123'),
@ -585,6 +688,7 @@ describe('handleInstallPackageFailure', () => {
},
},
} as any;
await handleInstallPackageFailure({
savedObjectsClient,
error: new Error('test 123'),

View file

@ -18,7 +18,6 @@ import type {
import { SavedObjectsErrorHelpers } from '@kbn/core/server';
import { DEFAULT_SPACE_ID } from '@kbn/spaces-plugin/common/constants';
import pRetry from 'p-retry';
import { uniqBy } from 'lodash';
import type { LicenseType } from '@kbn/licensing-plugin/server';
import type { PackageDataStreamTypes, PackageInstallContext } from '../../../../common/types';
@ -30,7 +29,6 @@ import type {
ArchivePackage,
BulkInstallPackageInfo,
EpmPackageInstallStatus,
EsAssetReference,
InstallablePackage,
Installation,
InstallResult,
@ -40,12 +38,12 @@ import type {
NewPackagePolicy,
PackageInfo,
PackageVerificationResult,
RegistryDataStream,
} from '../../../types';
import {
AUTO_UPGRADE_POLICIES_PACKAGES,
CUSTOM_INTEGRATION_PACKAGE_SPEC_VERSION,
DATASET_VAR_NAME,
GENERIC_DATASET_NAME,
} from '../../../../common/constants';
import {
FleetError,
@ -69,19 +67,16 @@ import { toAssetReference } from '../kibana/assets/install';
import type { ArchiveAsset } from '../kibana/assets/install';
import type { PackageUpdateEvent } from '../../upgrade_sender';
import { sendTelemetryEvents, UpdateEventType } from '../../upgrade_sender';
import { prepareToInstallPipelines } from '../elasticsearch/ingest_pipeline';
import { prepareToInstallTemplates } from '../elasticsearch/template/install';
import { auditLoggingService } from '../../audit_logging';
import { getFilteredInstallPackages } from '../filtered_packages';
import { formatVerificationResultForSO } from './package_verification';
import { getInstallation, getInstallationObject } from '.';
import { getInstallation, getInstallationObject } from './get';
import { removeInstallation } from './remove';
import { getInstalledPackageWithAssets, getPackageSavedObjects } from './get';
import { _installPackage } from './_install_package';
import { removeOldAssets } from './cleanup';
import { getBundledPackageByPkgKey } from './bundled_packages';
import { withPackageSpan } from './utils';
import { convertStringToTitle, generateDescription } from './custom_integrations/utils';
import { INITIAL_VERSION } from './custom_integrations/constants';
import { createAssets } from './custom_integrations';
@ -89,6 +84,8 @@ import { generateDatastreamEntries } from './custom_integrations/assets/dataset/
import { checkForNamingCollision } from './custom_integrations/validation/check_naming_collision';
import { checkDatasetsNameFormat } from './custom_integrations/validation/check_dataset_name_format';
import { addErrorToLatestFailedAttempts } from './install_errors_helpers';
import { installIndexTemplatesAndPipelines } from './install_index_template_pipeline';
import { optimisticallyAddEsAssetReferences } from './es_assets_reference';
export async function isPackageInstalled(options: {
savedObjectsClient: SavedObjectsClientContract;
@ -379,6 +376,7 @@ async function installPackageFromRegistry({
try {
// get the currently installed package
const installedPkg = await getInstallationObject({ savedObjectsClient, pkgName });
installType = getInstallType({ pkgVersion, installedPkg });
@ -986,7 +984,9 @@ export async function createInstallation(options: {
}) {
const { savedObjectsClient, packageInfo, installSource, verificationResult } = options;
const { name: pkgName, version: pkgVersion } = packageInfo;
const toSaveESIndexPatterns = generateESIndexPatterns(packageInfo.data_streams);
const toSaveESIndexPatterns = generateESIndexPatterns(
getNormalizedDataStreams(packageInfo, GENERIC_DATASET_NAME)
);
// For "stack-aligned" packages, default the `keep_policies_up_to_date` setting to true. For all other
// packages, default it to undefined. Use undefined rather than false to allow us to differentiate
@ -1064,131 +1064,6 @@ export const saveKibanaAssetsRefs = async (
return assetRefs;
};
/**
* Utility function for updating the installed_es field of a package
*/
export const updateEsAssetReferences = async (
savedObjectsClient: SavedObjectsClientContract,
pkgName: string,
currentAssets: EsAssetReference[],
{
assetsToAdd = [],
assetsToRemove = [],
refresh = false,
}: {
assetsToAdd?: EsAssetReference[];
assetsToRemove?: EsAssetReference[];
/**
* Whether or not the update should force a refresh on the SO index.
* Defaults to `false` for faster updates, should only be `wait_for` if the update needs to be queried back from ES
* immediately.
*/
refresh?: 'wait_for' | false;
}
): Promise<EsAssetReference[]> => {
const withAssetsRemoved = currentAssets.filter(({ type, id }) => {
if (
assetsToRemove.some(
({ type: removeType, id: removeId }) => removeType === type && removeId === id
)
) {
return false;
}
return true;
});
const deduplicatedAssets = uniqBy(
[...withAssetsRemoved, ...assetsToAdd],
({ type, id }) => `${type}-${id}`
);
auditLoggingService.writeCustomSoAuditLog({
action: 'update',
id: pkgName,
savedObjectType: PACKAGES_SAVED_OBJECT_TYPE,
});
const {
attributes: { installed_es: updatedAssets },
} =
// Because Kibana assets are installed in parallel with ES assets with refresh: false, we almost always run into an
// issue that causes a conflict error due to this issue: https://github.com/elastic/kibana/issues/126240. This is safe
// to retry constantly until it succeeds to optimize this critical user journey path as much as possible.
await pRetry(
() =>
savedObjectsClient.update<Installation>(
PACKAGES_SAVED_OBJECT_TYPE,
pkgName,
{
installed_es: deduplicatedAssets,
},
{
refresh,
}
),
// Use a lower number of retries for ES assets since they're installed in serial and can only conflict with
// the single Kibana update call.
{ retries: 5 }
);
return updatedAssets ?? [];
};
/**
* Utility function for adding assets the installed_es field of a package
* uses optimistic concurrency control to prevent missed updates
*/
export const optimisticallyAddEsAssetReferences = async (
savedObjectsClient: SavedObjectsClientContract,
pkgName: string,
assetsToAdd: EsAssetReference[]
): Promise<EsAssetReference[]> => {
const addEsAssets = async () => {
// TODO: Should this be replaced by a `get()` call from epm/get.ts?
const so = await savedObjectsClient.get<Installation>(PACKAGES_SAVED_OBJECT_TYPE, pkgName);
auditLoggingService.writeCustomSoAuditLog({
action: 'get',
id: pkgName,
savedObjectType: PACKAGES_SAVED_OBJECT_TYPE,
});
const installedEs = so.attributes.installed_es ?? [];
const deduplicatedAssets = uniqBy(
[...installedEs, ...assetsToAdd],
({ type, id }) => `${type}-${id}`
);
auditLoggingService.writeCustomSoAuditLog({
action: 'update',
id: pkgName,
savedObjectType: PACKAGES_SAVED_OBJECT_TYPE,
});
const {
attributes: { installed_es: updatedAssets },
} = await savedObjectsClient.update<Installation>(
PACKAGES_SAVED_OBJECT_TYPE,
pkgName,
{
installed_es: deduplicatedAssets,
},
{
version: so.version,
}
);
return updatedAssets ?? [];
};
const onlyRetryConflictErrors = (err: Error) => {
if (!SavedObjectsErrorHelpers.isConflictError(err)) {
throw err;
}
};
return pRetry(addEsAssets, { retries: 10, onFailedAttempt: onlyRetryConflictErrors });
};
export async function ensurePackagesCompletedInstall(
savedObjectsClient: SavedObjectsClientContract,
esClient: ElasticsearchClient
@ -1223,92 +1098,6 @@ export async function ensurePackagesCompletedInstall(
return installingPackages;
}
export async function installIndexTemplatesAndPipelines({
installedPkg,
packageInstallContext,
esReferences,
savedObjectsClient,
esClient,
logger,
onlyForDataStreams,
}: {
installedPkg?: Installation;
packageInstallContext: PackageInstallContext;
esReferences: EsAssetReference[];
savedObjectsClient: SavedObjectsClientContract;
esClient: ElasticsearchClient;
logger: Logger;
onlyForDataStreams?: RegistryDataStream[];
}) {
/**
* In order to install assets in parallel, we need to split the preparation step from the installation step. This
* allows us to know which asset references are going to be installed so that we can save them on the packages
* SO before installation begins. In the case of a failure during installing any individual asset, we'll have the
* references necessary to remove any assets in that were successfully installed during the rollback phase.
*
* This split of prepare/install could be extended to all asset types. Besides performance, it also allows us to
* more easily write unit tests against the asset generation code without needing to mock ES responses.
*/
const experimentalDataStreamFeatures = installedPkg?.experimental_data_stream_features ?? [];
const preparedIngestPipelines = prepareToInstallPipelines(
packageInstallContext,
onlyForDataStreams
);
const preparedIndexTemplates = prepareToInstallTemplates(
packageInstallContext,
esReferences,
experimentalDataStreamFeatures,
onlyForDataStreams
);
// Update the references for the templates and ingest pipelines together. Need to be done together to avoid race
// conditions on updating the installed_es field at the same time
// These must be saved before we actually attempt to install the templates or pipelines so that we know what to
// cleanup in the case that a single asset fails to install.
let newEsReferences: EsAssetReference[] = [];
if (onlyForDataStreams) {
// if onlyForDataStreams is present that means we are in create package policy flow
// not install flow, meaning we do not have a lock on the installation SO
// so we need to use optimistic concurrency control
newEsReferences = await optimisticallyAddEsAssetReferences(
savedObjectsClient,
packageInstallContext.packageInfo.name,
[...preparedIngestPipelines.assetsToAdd, ...preparedIndexTemplates.assetsToAdd]
);
} else {
newEsReferences = await updateEsAssetReferences(
savedObjectsClient,
packageInstallContext.packageInfo.name,
esReferences,
{
assetsToRemove: preparedIndexTemplates.assetsToRemove,
assetsToAdd: [
...preparedIngestPipelines.assetsToAdd,
...preparedIndexTemplates.assetsToAdd,
],
}
);
}
// Install index templates and ingest pipelines in parallel since they typically take the longest
const [installedTemplates] = await Promise.all([
withPackageSpan('Install index templates', () =>
preparedIndexTemplates.install(esClient, logger)
),
// installs versionized pipelines without removing currently installed ones
withPackageSpan('Install ingest pipelines', () =>
preparedIngestPipelines.install(esClient, logger)
),
]);
return {
esReferences: newEsReferences,
installedTemplates,
};
}
export async function installAssetsForInputPackagePolicy(opts: {
pkgInfo: PackageInfo;
logger: Logger;
@ -1347,7 +1136,7 @@ export async function installAssetsForInputPackagePolicy(opts: {
);
} else {
logger.info(
`Data stream ${dataStream.name} already exists, skipping index template creation for ${packagePolicy.id}`
`Data stream for dataset ${datasetName} already exists, skipping index template creation for ${packagePolicy.id}`
);
return;
}
@ -1414,6 +1203,13 @@ export async function installAssetsForInputPackagePolicy(opts: {
logger,
onlyForDataStreams: [dataStream],
});
// Upate ES index patterns
await optimisticallyAddEsAssetReferences(
soClient,
installedPkgWithAssets.installation.name,
[],
generateESIndexPatterns([dataStream])
);
}
interface NoPkgArgs {

View file

@ -0,0 +1,110 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { ElasticsearchClient, SavedObjectsClientContract, Logger } from '@kbn/core/server';
import type { PackageInstallContext } from '../../../../common/types';
import type { EsAssetReference, Installation, RegistryDataStream } from '../../../types';
import { prepareToInstallPipelines } from '../elasticsearch/ingest_pipeline';
import { prepareToInstallTemplates } from '../elasticsearch/template/install';
import { withPackageSpan } from './utils';
import { optimisticallyAddEsAssetReferences, updateEsAssetReferences } from './es_assets_reference';
export async function installIndexTemplatesAndPipelines({
installedPkg,
packageInstallContext,
esReferences,
savedObjectsClient,
esClient,
logger,
onlyForDataStreams,
}: {
installedPkg?: Installation;
packageInstallContext: PackageInstallContext;
esReferences: EsAssetReference[];
savedObjectsClient: SavedObjectsClientContract;
esClient: ElasticsearchClient;
logger: Logger;
onlyForDataStreams?: RegistryDataStream[];
}) {
/**
* In order to install assets in parallel, we need to split the preparation step from the installation step. This
* allows us to know which asset references are going to be installed so that we can save them on the packages
* SO before installation begins. In the case of a failure during installing any individual asset, we'll have the
* references necessary to remove any assets in that were successfully installed during the rollback phase.
*
* This split of prepare/install could be extended to all asset types. Besides performance, it also allows us to
* more easily write unit tests against the asset generation code without needing to mock ES responses.
*/
const experimentalDataStreamFeatures = installedPkg?.experimental_data_stream_features ?? [];
const preparedIngestPipelines = prepareToInstallPipelines(
packageInstallContext,
onlyForDataStreams
);
const preparedIndexTemplates = prepareToInstallTemplates(
packageInstallContext,
esReferences,
experimentalDataStreamFeatures,
onlyForDataStreams
);
// Update the references for the templates and ingest pipelines together. Need to be done together to avoid race
// conditions on updating the installed_es field at the same time
// These must be saved before we actually attempt to install the templates or pipelines so that we know what to
// cleanup in the case that a single asset fails to install.
let newEsReferences: EsAssetReference[] = [];
if (onlyForDataStreams) {
// if onlyForDataStreams is present that means we are in create package policy flow
// not install flow, meaning we do not have a lock on the installation SO
// so we need to use optimistic concurrency control
newEsReferences = await optimisticallyAddEsAssetReferences(
savedObjectsClient,
packageInstallContext.packageInfo.name,
[...preparedIngestPipelines.assetsToAdd, ...preparedIndexTemplates.assetsToAdd]
);
} else {
newEsReferences = await updateEsAssetReferences(
savedObjectsClient,
packageInstallContext.packageInfo.name,
esReferences,
{
assetsToRemove: preparedIndexTemplates.assetsToRemove,
assetsToAdd: [
...preparedIngestPipelines.assetsToAdd,
...preparedIndexTemplates.assetsToAdd,
],
}
);
}
// Install index templates and ingest pipelines in parallel since they typically take the longest
const [installedTemplates] = await Promise.all([
withPackageSpan('Install index templates', () =>
preparedIndexTemplates.install(esClient, logger)
),
// installs versionized pipelines without removing currently installed ones
withPackageSpan('Install ingest pipelines', () =>
preparedIngestPipelines.install(esClient, logger)
),
]);
return {
esReferences: newEsReferences,
installedTemplates,
};
}

View file

@ -1,23 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { SavedObjectsClientContract } from '@kbn/core/server';
import type { ESIndexPatternService } from '..';
import { getInstallation } from './epm/packages';
export class ESIndexPatternSavedObjectService implements ESIndexPatternService {
public async getESIndexPattern(
savedObjectsClient: SavedObjectsClientContract,
pkgName: string,
datasetPath: string
): Promise<string | undefined> {
const installation = await getInstallation({ savedObjectsClient, pkgName });
return installation?.es_index_patterns[datasetPath];
}
}

View file

@ -5,25 +5,11 @@
* 2.0.
*/
import type { SavedObjectsClientContract } from '@kbn/core/server';
import type { agentPolicyService } from './agent_policy';
import * as settingsService from './settings';
export { ESIndexPatternSavedObjectService } from './es_index_pattern';
export { getRegistryUrl } from './epm/registry/registry_url';
/**
* Service to return the index pattern of EPM packages
*/
export interface ESIndexPatternService {
getESIndexPattern(
savedObjectsClient: SavedObjectsClientContract,
pkgName: string,
datasetPath: string
): Promise<string | undefined>;
}
/**
* Service that provides exported function that return information about EPM packages
*/

View file

@ -65,6 +65,7 @@ describe('setupFleet', () => {
(upgradeManagedPackagePolicies as jest.Mock).mockResolvedValue([]);
soClient.get.mockResolvedValue({ attributes: {} } as any);
soClient.find.mockResolvedValue({ saved_objects: [] } as any);
soClient.bulkGet.mockResolvedValue({ saved_objects: [] } as any);
});