[Fleet] Chunk asset installation during package install (#189045)

**Resolves: https://github.com/elastic/kibana/pull/189043**

## Summary

This PR limits the number of saved objects installed in a single
request. During saved object installation, many auxiliary objects are
created in memory. Chunking allows the garbage collector to reclaim
in-memory objects that are no longer needed for the response.

**Memory consumption before**

![Screenshot 2024-07-12 at 12 44
58](https://github.com/user-attachments/assets/2c7bb609-f107-46bd-bd98-43a0c58107e8)


**After**
![Screenshot 2024-07-18 at 11 53
30](https://github.com/user-attachments/assets/ff6529dd-033e-4c2b-9630-0a2ea3927c21)
This commit is contained in:
Dmitrii Shevchenko 2024-07-25 09:09:28 +02:00 committed by GitHub
parent 425d6b10a7
commit b7e66ebf8a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -17,7 +17,7 @@ import type {
Logger,
} from '@kbn/core/server';
import { createListStream } from '@kbn/utils';
import { partition } from 'lodash';
import { partition, chunk } from 'lodash';
import { getAssetFromAssetsMap, getPathParts } from '../../archive';
import { KibanaAssetType, KibanaSavedObjectType } from '../../../../types';
@ -36,6 +36,8 @@ import { withPackageSpan } from '../../packages/utils';
import { tagKibanaAssets } from './tag_assets';
import { getSpaceAwareSaveobjectsClients } from './saved_objects';
const MAX_ASSETS_TO_INSTALL_IN_PARALLEL = 1000;
type SavedObjectsImporterContract = Pick<ISavedObjectsImporter, 'import' | 'resolveImportErrors'>;
const formatImportErrorsForLog = (errors: SavedObjectsImportFailure[]) =>
JSON.stringify(
@ -144,11 +146,33 @@ export async function installKibanaAssets(options: {
await makeManagedIndexPatternsGlobal(savedObjectsClient);
const installedAssets = await installKibanaSavedObjects({
logger,
savedObjectsImporter,
kibanaAssets: assetsToInstall,
});
let installedAssets: SavedObjectsImportSuccess[] = [];
if (
assetsToInstall.length > MAX_ASSETS_TO_INSTALL_IN_PARALLEL &&
!hasReferences(assetsToInstall)
) {
// If the package size is too large, we need to install in chunks to avoid
// memory issues as the SO import creates a lot of objects in memory
// NOTE: if there are references, we can't chunk the install because
// referenced objects might end up in different chunks leading to import
// errors.
for (const assetChunk of chunk(assetsToInstall, MAX_ASSETS_TO_INSTALL_IN_PARALLEL)) {
const result = await installKibanaSavedObjects({
logger,
savedObjectsImporter,
kibanaAssets: assetChunk,
});
installedAssets = installedAssets.concat(result);
}
} else {
installedAssets = await installKibanaSavedObjects({
logger,
savedObjectsImporter,
kibanaAssets: assetsToInstall,
});
}
return installedAssets;
}
@ -395,95 +419,94 @@ export async function installKibanaSavedObjects({
savedObjectsImporter: SavedObjectsImporterContract;
logger: Logger;
}) {
const toBeSavedObjects = await Promise.all(
kibanaAssets.map((asset) => createSavedObjectKibanaAsset(asset))
);
if (!kibanaAssets.length) {
return [];
}
const toBeSavedObjects = kibanaAssets.map((asset) => createSavedObjectKibanaAsset(asset));
let allSuccessResults: SavedObjectsImportSuccess[] = [];
if (toBeSavedObjects.length === 0) {
return [];
} else {
const {
successResults: importSuccessResults = [],
errors: importErrors = [],
success,
} = await retryImportOnConflictError(() =>
savedObjectsImporter.import({
overwrite: true,
readStream: createListStream(toBeSavedObjects),
createNewCopies: false,
refresh: false,
managed: true,
})
const {
successResults: importSuccessResults = [],
errors: importErrors = [],
success,
} = await retryImportOnConflictError(() => {
const readStream = createListStream(toBeSavedObjects);
return savedObjectsImporter.import({
overwrite: true,
readStream,
createNewCopies: false,
refresh: false,
managed: true,
});
});
if (success) {
allSuccessResults = importSuccessResults;
}
const [referenceErrors, otherErrors] = partition(
importErrors,
(e) => e?.error?.type === 'missing_references'
);
if (otherErrors?.length) {
throw new KibanaSOReferenceError(
`Encountered ${otherErrors.length} errors creating saved objects: ${formatImportErrorsForLog(
otherErrors
)}`
);
}
if (success) {
allSuccessResults = importSuccessResults;
}
const [referenceErrors, otherErrors] = partition(
importErrors,
(e) => e?.error?.type === 'missing_references'
);
if (otherErrors?.length) {
throw new KibanaSOReferenceError(
`Encountered ${
otherErrors.length
} errors creating saved objects: ${formatImportErrorsForLog(otherErrors)}`
);
}
/*
/*
A reference error here means that a saved object reference in the references
array cannot be found. This is an error in the package itself but not a fatal
one. For example a dashboard may still refer to the legacy `metricbeat-*` index
pattern. We ignore reference errors here so that legacy version of a package
can still be installed, but if a warning is logged it should be reported to
the integrations team. */
if (referenceErrors.length) {
logger.debug(
() =>
`Resolving ${
referenceErrors.length
} reference errors creating saved objects: ${formatImportErrorsForLog(referenceErrors)}`
);
if (referenceErrors.length) {
logger.debug(
() =>
`Resolving ${
referenceErrors.length
} reference errors creating saved objects: ${formatImportErrorsForLog(referenceErrors)}`
);
const retries = toBeSavedObjects.map(({ id, type }) => {
if (referenceErrors.find(({ id: idToSearch }) => idToSearch === id)) {
return {
id,
type,
ignoreMissingReferences: true,
replaceReferences: [],
overwrite: true,
};
}
return { id, type, overwrite: true, replaceReferences: [] };
const retries = toBeSavedObjects.map(({ id, type }) => {
if (referenceErrors.find(({ id: idToSearch }) => idToSearch === id)) {
return {
id,
type,
ignoreMissingReferences: true,
replaceReferences: [],
overwrite: true,
};
}
return { id, type, overwrite: true, replaceReferences: [] };
});
const { successResults: resolveSuccessResults = [], errors: resolveErrors = [] } =
await savedObjectsImporter.resolveImportErrors({
readStream: createListStream(toBeSavedObjects),
createNewCopies: false,
managed: true,
retries,
});
const { successResults: resolveSuccessResults = [], errors: resolveErrors = [] } =
await savedObjectsImporter.resolveImportErrors({
readStream: createListStream(toBeSavedObjects),
createNewCopies: false,
managed: true,
retries,
});
if (resolveErrors?.length) {
throw new KibanaSOReferenceError(
`Encountered ${
resolveErrors.length
} errors resolving reference errors: ${formatImportErrorsForLog(resolveErrors)}`
);
}
allSuccessResults = allSuccessResults.concat(resolveSuccessResults);
if (resolveErrors?.length) {
throw new KibanaSOReferenceError(
`Encountered ${
resolveErrors.length
} errors resolving reference errors: ${formatImportErrorsForLog(resolveErrors)}`
);
}
return allSuccessResults;
allSuccessResults = allSuccessResults.concat(resolveSuccessResults);
}
return allSuccessResults;
}
// Filter out any reserved index patterns
@ -498,3 +521,7 @@ export function toAssetReference({ id, type }: SavedObject) {
return reference;
}
/**
 * Returns true when any asset in the list declares at least one saved
 * object reference. Used to decide whether assets can be safely split
 * into chunks for installation (referenced objects must stay together).
 */
function hasReferences(assetsToInstall: ArchiveAsset[]) {
  for (const candidate of assetsToInstall) {
    if (candidate.references?.length) {
      return true;
    }
  }
  return false;
}