[Fleet][Spacetime] Replace promise.all with pMap across Fleet (#201628)

Part of https://github.com/elastic/ingest-dev/issues/4517
Closes https://github.com/elastic/kibana/issues/167160


## Summary

Replace `promise.all` with `pMap` across Fleet. The reason for this
refactor is introducing concurrency control in those cases when we are
running multiple requests towards ESclient.
Most of the refactor in this PR occurs around the packages install area,
as packages can have high numbers of assets, data streams or backing
indices so I focused on addressing those.


### Checklist

Check the PR satisfies following conditions. 

- [ ]
[Documentation](https://www.elastic.co/guide/en/kibana/master/development-documentation.html)
was added for features that require explanation or tutorials
- [ ] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios
- [ ] The PR description includes the appropriate Release Notes section,
and the correct `release_note:*` label is applied per the
[guidelines](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)

### Identify risks
As this PR can have impacts in key areas of Fleet (package installs,
policies etc), I'm not going to merge right away but I'll take some time
to test properly and I'll merge next week.

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
This commit is contained in:
Cristina Amico 2024-12-02 12:45:50 +01:00 committed by GitHub
parent 742854f8bc
commit 76c2f31a3a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
31 changed files with 479 additions and 282 deletions

View file

@ -124,3 +124,4 @@ export {
export { FILE_STORAGE_DATA_AGENT_INDEX } from './fleet_es_assets';
export { FILE_STORAGE_METADATA_AGENT_INDEX } from './fleet_es_assets';
export * from '../../common/constants/mappings';
export * from './max_concurrency_constants';

View file

@ -0,0 +1,27 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
// Constants used across the code to limit concurrency of pMap operations
export const MAX_CONCURRENT_AGENT_POLICIES_OPERATIONS = 50;
export const MAX_CONCURRENT_AGENT_POLICIES_OPERATIONS_20 = 20;
export const MAX_CONCURRENT_AGENT_POLICIES_OPERATIONS_10 = 10;
export const MAX_CONCURRENT_DATASTREAM_OPERATIONS = 50;
export const MAX_CONCURRENT_FLEET_PROXIES_OPERATIONS = 20;
export const MAX_CONCURRENT_EPM_PACKAGES_INSTALLATIONS = 10;
export const MAX_CONCURRENT_PACKAGE_ASSETS = 5;
export const MAX_CONCURRENT_CREATE_ACTIONS = 50;
export const MAX_CONCURRENT_DATASTREAMS_ILM_OPERATIONS = 50;
export const MAX_CONCURRENT_ILM_POLICIES_OPERATIONS = 50;
export const MAX_CONCURRENT_PIPELINES_DELETIONS = 50;
export const MAX_CONCURRENT_ML_MODELS_OPERATIONS = 50;
export const MAX_CONCURRENT_COMPONENT_TEMPLATES = 50;
export const MAX_CONCURRENT_TRANSFORMS_OPERATIONS = 20;
export const MAX_CONCURRENT_INDEX_PATTERN_OPERATIONS = 50;
export const MAX_CONCURRENT_ES_ASSETS_OPERATIONS = 50;
export const MAX_CONCURRENT_AGENT_FILES_UPLOADS = 50;
export const MAX_CONCURRENT_BACKFILL_OUTPUTS_PRESETS = 20;
export const MAX_CONCURRENT_CLEAN_OLD_FILE_INDICES = 2;

View file

@ -19,7 +19,11 @@ import { HTTPAuthorizationHeader } from '../../../common/http_authorization_head
import { fullAgentPolicyToYaml } from '../../../common/services';
import { appContextService, agentPolicyService } from '../../services';
import { type AgentClient, getLatestAvailableAgentVersion } from '../../services/agents';
import { AGENTS_PREFIX, UNPRIVILEGED_AGENT_KUERY } from '../../constants';
import {
AGENTS_PREFIX,
MAX_CONCURRENT_AGENT_POLICIES_OPERATIONS_10,
UNPRIVILEGED_AGENT_KUERY,
} from '../../constants';
import type {
GetAgentPoliciesRequestSchema,
GetOneAgentPolicyRequestSchema,
@ -82,7 +86,7 @@ export async function populateAssignedAgentsCount(
.then(({ total }) => (agentPolicy.unprivileged_agents = total));
return Promise.all([totalAgents, unprivilegedAgents]);
},
{ concurrency: 10 }
{ concurrency: MAX_CONCURRENT_AGENT_POLICIES_OPERATIONS_10 }
);
}

View file

@ -17,6 +17,7 @@ import type { GetDataStreamsResponse } from '../../../common/types';
import { getPackageSavedObjects } from '../../services/epm/packages/get';
import type { MeteringStats } from '../../services/data_streams';
import { dataStreamService } from '../../services/data_streams';
import { MAX_CONCURRENT_DATASTREAM_OPERATIONS } from '../../constants';
import { appContextService } from '../../services';
import { getDataStreamsQueryMetadata } from './get_data_streams_query_metadata';
@ -233,7 +234,7 @@ export const getListHandler: RequestHandler = async (context, request, response)
// After filtering out data streams that are missing dataset/namespace/type/package fields
body.data_streams = (
await pMap(dataStreamNames, (dataStreamName) => queryDataStreamInfo(dataStreamName), {
concurrency: 50,
concurrency: MAX_CONCURRENT_DATASTREAM_OPERATIONS,
})
)
.filter(({ dataset, namespace, type }) => dataset && namespace && type)

View file

@ -31,6 +31,7 @@ import type {
DownloadSource,
} from '../../types';
import { agentPolicyService } from '../../services';
import { MAX_CONCURRENT_AGENT_POLICIES_OPERATIONS_20 } from '../../constants';
async function bumpRelatedPolicies(
soClient: SavedObjectsClientContract,
@ -49,7 +50,7 @@ async function bumpRelatedPolicies(
outputs,
(output) => agentPolicyService.bumpAllAgentPoliciesForOutput(esClient, output.id),
{
concurrency: 20,
concurrency: MAX_CONCURRENT_AGENT_POLICIES_OPERATIONS_20,
}
);
await pMap(
@ -57,7 +58,7 @@ async function bumpRelatedPolicies(
(fleetServerHost) =>
agentPolicyService.bumpAllAgentPoliciesForFleetServerHosts(esClient, fleetServerHost.id),
{
concurrency: 20,
concurrency: MAX_CONCURRENT_AGENT_POLICIES_OPERATIONS_20,
}
);
@ -66,7 +67,7 @@ async function bumpRelatedPolicies(
(downloadSource) =>
agentPolicyService.bumpAllAgentPoliciesForDownloadSource(esClient, downloadSource.id),
{
concurrency: 20,
concurrency: MAX_CONCURRENT_AGENT_POLICIES_OPERATIONS_20,
}
);
}

View file

@ -97,6 +97,11 @@ import type { FullAgentConfigMap } from '../../common/types/models/agent_cm';
import { fullAgentConfigMapToYaml } from '../../common/services/agent_cm_to_yaml';
import {
MAX_CONCURRENT_AGENT_POLICIES_OPERATIONS,
MAX_CONCURRENT_AGENT_POLICIES_OPERATIONS_20,
} from '../constants';
import { appContextService } from '.';
import { mapAgentPolicySavedObjectToAgentPolicy } from './agent_policies/utils';
@ -550,7 +555,7 @@ class AgentPolicyService {
}
return agentPolicy;
},
{ concurrency: 50 }
{ concurrency: MAX_CONCURRENT_AGENT_POLICIES_OPERATIONS }
);
const result = agentPolicies.filter(
@ -657,7 +662,7 @@ class AgentPolicyService {
return agentPolicy;
},
{ concurrency: 50 }
{ concurrency: MAX_CONCURRENT_AGENT_POLICIES_OPERATIONS }
);
}
@ -955,7 +960,7 @@ class AgentPolicyService {
);
},
{
concurrency: 50,
concurrency: MAX_CONCURRENT_AGENT_POLICIES_OPERATIONS,
}
);
await pMap(
@ -970,7 +975,7 @@ class AgentPolicyService {
});
},
{
concurrency: 50,
concurrency: MAX_CONCURRENT_AGENT_POLICIES_OPERATIONS,
}
);
}
@ -1014,7 +1019,7 @@ class AgentPolicyService {
}
),
{
concurrency: 50,
concurrency: MAX_CONCURRENT_AGENT_POLICIES_OPERATIONS,
}
);
}
@ -1083,7 +1088,7 @@ class AgentPolicyService {
this.triggerAgentPolicyUpdatedEvent(esClient, 'updated', policy.id, {
spaceId: policy.namespaces?.[0],
}),
{ concurrency: 50 }
{ concurrency: MAX_CONCURRENT_AGENT_POLICIES_OPERATIONS }
);
}
@ -1352,7 +1357,7 @@ class AgentPolicyService {
agentPolicy: agentPolicies?.find((policy) => policy.id === agentPolicyId),
}),
{
concurrency: 20,
concurrency: MAX_CONCURRENT_AGENT_POLICIES_OPERATIONS_20,
}
);
@ -1423,27 +1428,29 @@ class AgentPolicyService {
);
}
await Promise.all(
fleetServerPolicies
.filter((fleetServerPolicy) => {
const policy = policiesMap[fleetServerPolicy.policy_id];
return (
!policy.schema_version || lt(policy.schema_version, FLEET_AGENT_POLICIES_SCHEMA_VERSION)
);
})
.map((fleetServerPolicy) =>
// There are some potential performance concerns around using `agentPolicyService.update` in this context.
// This could potentially be a bottleneck in environments with several thousand agent policies being deployed here.
agentPolicyService.update(
soClient,
esClient,
fleetServerPolicy.policy_id,
{
schema_version: FLEET_AGENT_POLICIES_SCHEMA_VERSION,
},
{ force: true }
)
)
const filteredFleetServerPolicies = fleetServerPolicies.filter((fleetServerPolicy) => {
const policy = policiesMap[fleetServerPolicy.policy_id];
return (
!policy.schema_version || lt(policy.schema_version, FLEET_AGENT_POLICIES_SCHEMA_VERSION)
);
});
await pMap(
filteredFleetServerPolicies,
(fleetServerPolicy) =>
// There are some potential performance concerns around using `agentPolicyService.update` in this context.
// This could potentially be a bottleneck in environments with several thousand agent policies being deployed here.
agentPolicyService.update(
soClient,
esClient,
fleetServerPolicy.policy_id,
{
schema_version: FLEET_AGENT_POLICIES_SCHEMA_VERSION,
},
{ force: true }
),
{
concurrency: MAX_CONCURRENT_AGENT_POLICIES_OPERATIONS,
}
);
}
@ -1589,7 +1596,7 @@ class AgentPolicyService {
}
),
{
concurrency: 50,
concurrency: MAX_CONCURRENT_AGENT_POLICIES_OPERATIONS,
}
);
}
@ -1890,7 +1897,7 @@ class AgentPolicyService {
return { integrationPolicyName: pkgPolicy?.name, id: pkgPolicy?.output_id ?? '' };
},
{
concurrency: 20,
concurrency: MAX_CONCURRENT_AGENT_POLICIES_OPERATIONS_20,
}
);
}
@ -1923,7 +1930,7 @@ class AgentPolicyService {
return { agentPolicyId: agentPolicy.id, ...output };
},
{
concurrency: 50,
concurrency: MAX_CONCURRENT_AGENT_POLICIES_OPERATIONS,
}
);
return allOutputs;

View file

@ -8,6 +8,7 @@
import { v4 as uuidv4 } from 'uuid';
import type { ElasticsearchClient, SavedObjectsClientContract } from '@kbn/core/server';
import apm from 'elastic-apm-node';
import pMap from 'p-map';
import { partition } from 'lodash';
@ -33,6 +34,8 @@ import { getAgentIdsForAgentPolicies } from '../agent_policies/agent_policies_to
import { getCurrentNamespace } from '../spaces/get_current_namespace';
import { addNamespaceFilteringToQuery } from '../spaces/query_namespaces_filtering';
import { MAX_CONCURRENT_CREATE_ACTIONS } from '../../constants';
import { bulkUpdateAgents } from './crud';
const ONE_MONTH_IN_MS = 2592000000;
@ -110,40 +113,47 @@ export async function bulkCreateAgentActions(
}
const messageSigningService = appContextService.getMessageSigningService();
const fleetServerAgentActions = await pMap(
actions,
async (action) => {
const body: FleetServerAgentAction = {
'@timestamp': new Date().toISOString(),
expiration: action.expiration ?? new Date(Date.now() + ONE_MONTH_IN_MS).toISOString(),
start_time: action.start_time,
rollout_duration_seconds: action.rollout_duration_seconds,
agents: action.agents,
action_id: action.id,
data: action.data,
type: action.type,
traceparent: apm.currentTraceparent,
};
if (SIGNED_ACTIONS.has(action.type) && messageSigningService) {
const signedBody = await messageSigningService.sign(body);
body.signed = {
data: signedBody.data.toString('base64'),
signature: signedBody.signature,
};
}
return [
{
create: {
_id: action.id,
},
},
body,
].flat();
},
{
concurrency: MAX_CONCURRENT_CREATE_ACTIONS,
}
);
await esClient.bulk({
index: AGENT_ACTIONS_INDEX,
body: await Promise.all(
actions.flatMap(async (action) => {
const body: FleetServerAgentAction = {
'@timestamp': new Date().toISOString(),
expiration: action.expiration ?? new Date(Date.now() + ONE_MONTH_IN_MS).toISOString(),
start_time: action.start_time,
rollout_duration_seconds: action.rollout_duration_seconds,
agents: action.agents,
action_id: action.id,
data: action.data,
type: action.type,
traceparent: apm.currentTraceparent,
};
if (SIGNED_ACTIONS.has(action.type) && messageSigningService) {
const signedBody = await messageSigningService.sign(body);
body.signed = {
data: signedBody.data.toString('base64'),
signature: signedBody.signature,
};
}
return [
{
create: {
_id: action.id,
},
},
body,
];
})
),
body: fleetServerAgentActions,
});
for (const action of actions) {

View file

@ -6,6 +6,7 @@
*/
import type { ElasticsearchClient, Logger, SavedObjectsClientContract } from '@kbn/core/server';
import pMap from 'p-map';
import {
ElasticsearchAssetType,
@ -18,6 +19,8 @@ import { getAssetFromAssetsMap } from '../../archive';
import { getESAssetMetadata } from '../meta';
import { retryTransientEsErrors } from '../retry';
import { MAX_CONCURRENT_DATASTREAMS_ILM_OPERATIONS } from '../../../../constants';
import { deleteIlms } from './remove';
interface IlmInstallation {
@ -111,11 +114,15 @@ export const installIlmForDataStream = async (
}
);
const installationPromises = ilmInstallations.map(async (ilmInstallation) => {
return handleIlmInstall({ esClient, ilmInstallation, logger });
});
installedIlms = await Promise.all(installationPromises).then((results) => results.flat());
installedIlms = await pMap(
ilmInstallations,
async (ilmInstallation) => {
return handleIlmInstall({ esClient, ilmInstallation, logger });
},
{
concurrency: MAX_CONCURRENT_DATASTREAMS_ILM_OPERATIONS,
}
).then((results) => results.flat());
}
return { installedIlms, esReferences };

View file

@ -7,9 +7,17 @@
import type { ElasticsearchClient } from '@kbn/core/server';
import pMap from 'p-map';
import { appContextService } from '../../../app_context';
import { MAX_CONCURRENT_ILM_POLICIES_OPERATIONS } from '../../../../constants';
export const deleteIlms = async (esClient: ElasticsearchClient, ilmPolicyIds: string[]) => {
await Promise.all(
ilmPolicyIds.map(async (ilmPolicyId) => {
const logger = appContextService.getLogger();
await pMap(
ilmPolicyIds,
async (ilmPolicyId) => {
await esClient.transport.request(
{
method: 'DELETE',
@ -19,6 +27,10 @@ export const deleteIlms = async (esClient: ElasticsearchClient, ilmPolicyIds: st
ignore: [404, 400],
}
);
})
logger.debug(`Deleted ilm policy with id: ${ilmPolicyId}`);
},
{
concurrency: MAX_CONCURRENT_ILM_POLICIES_OPERATIONS,
}
);
};

View file

@ -6,6 +6,7 @@
*/
import type { ElasticsearchClient, Logger, SavedObjectsClientContract } from '@kbn/core/server';
import pMap from 'p-map';
import type { EsAssetReference } from '../../../../types';
@ -16,6 +17,7 @@ import { getESAssetMetadata } from '../meta';
import { retryTransientEsErrors } from '../retry';
import { PackageInvalidArchiveError } from '../../../../errors';
import type { PackageInstallContext } from '../../../../../common/types';
import { MAX_CONCURRENT_ILM_POLICIES_OPERATIONS } from '../../../../constants';
export async function installILMPolicy(
packageInstallContext: PackageInstallContext,
@ -48,8 +50,9 @@ export async function installILMPolicy(
})),
});
await Promise.all(
ilmPolicies.map(async (policy) => {
await pMap(
ilmPolicies,
async (policy) => {
try {
await retryTransientEsErrors(
() =>
@ -63,7 +66,10 @@ export async function installILMPolicy(
} catch (err) {
throw new PackageInvalidArchiveError(`Couldn't install ilm policies: ${err.message}`);
}
})
},
{
concurrency: MAX_CONCURRENT_ILM_POLICIES_OPERATIONS,
}
);
return esReferences;

View file

@ -7,11 +7,14 @@
import type { ElasticsearchClient, SavedObjectsClientContract } from '@kbn/core/server';
import pMap from 'p-map';
import { appContextService } from '../../..';
import { ElasticsearchAssetType } from '../../../../types';
import { FleetError } from '../../../../errors';
import type { EsAssetReference } from '../../../../../common/types';
import { updateEsAssetReferences } from '../../packages/es_assets_reference';
import { MAX_CONCURRENT_PIPELINES_DELETIONS } from '../../../../constants';
export const deletePreviousPipelines = async (
esClient: ElasticsearchClient,
@ -26,10 +29,15 @@ export const deletePreviousPipelines = async (
type === ElasticsearchAssetType.ingestPipeline && id.includes(previousPkgVersion)
);
try {
await Promise.all(
installedPipelines.map(({ type, id }) => {
await pMap(
installedPipelines,
({ type, id }) => {
logger.debug(`Deleting pipeline with id: ${id}`);
return deletePipeline(esClient, id);
})
},
{
concurrency: MAX_CONCURRENT_PIPELINES_DELETIONS,
}
);
} catch (e) {
logger.error(e);

View file

@ -7,17 +7,24 @@
import type { ElasticsearchClient } from '@kbn/core/server';
import pMap from 'p-map';
import { appContextService } from '../../../app_context';
import { MAX_CONCURRENT_ML_MODELS_OPERATIONS } from '../../../../constants';
export const deleteMlModel = async (esClient: ElasticsearchClient, mlModelIds: string[]) => {
const logger = appContextService.getLogger();
if (mlModelIds.length) {
logger.info(`Deleting currently installed ml model ids ${mlModelIds}`);
}
await Promise.all(
mlModelIds.map(async (modelId) => {
await pMap(
mlModelIds,
async (modelId) => {
await esClient.ml.deleteTrainedModel({ model_id: modelId }, { ignore: [404] });
logger.info(`Deleted: ${modelId}`);
})
},
{
concurrency: MAX_CONCURRENT_ML_MODELS_OPERATIONS,
}
);
};

View file

@ -8,6 +8,7 @@
import { merge, concat, uniqBy, omit } from 'lodash';
import Boom from '@hapi/boom';
import type { ElasticsearchClient, Logger } from '@kbn/core/server';
import pMap from 'p-map';
import type {
IndicesCreateRequest,
@ -38,6 +39,7 @@ import {
PACKAGE_TEMPLATE_SUFFIX,
USER_SETTINGS_TEMPLATE_SUFFIX,
STACK_COMPONENT_TEMPLATES,
MAX_CONCURRENT_COMPONENT_TEMPLATES,
} from '../../../../constants';
import { getESAssetMetadata } from '../meta';
import { retryTransientEsErrors } from '../retry';
@ -103,15 +105,18 @@ export const prepareToInstallTemplates = (
await installPreBuiltComponentTemplates(packageInstallContext, esClient, logger);
await installPreBuiltTemplates(packageInstallContext, esClient, logger);
await Promise.all(
templates.map((template) =>
await pMap(
templates,
(template) =>
installComponentAndIndexTemplateForDataStream({
esClient,
logger,
componentTemplates: template.componentTemplates,
indexTemplate: template.indexTemplate,
})
)
}),
{
concurrency: MAX_CONCURRENT_COMPONENT_TEMPLATES,
}
);
return templates.map((template) => template.indexTemplate);
@ -125,32 +130,37 @@ const installPreBuiltTemplates = async (
logger: Logger
) => {
const templatePaths = packageInstallContext.paths.filter((path) => isTemplate(path));
const templateInstallPromises = templatePaths.map(async (path) => {
const { file } = getPathParts(path);
const templateName = file.substr(0, file.lastIndexOf('.'));
const content = JSON.parse(
getAssetFromAssetsMap(packageInstallContext.assetsMap, path).toString('utf8')
);
const esClientParams = { name: templateName, body: content };
const esClientRequestOptions = { ignore: [404] };
if (Object.hasOwn(content, 'template') || Object.hasOwn(content, 'composed_of')) {
// Template is v2
return retryTransientEsErrors(
() => esClient.indices.putIndexTemplate(esClientParams, esClientRequestOptions),
{ logger }
);
} else {
// template is V1
return retryTransientEsErrors(
() => esClient.indices.putTemplate(esClientParams, esClientRequestOptions),
{ logger }
);
}
});
try {
return await Promise.all(templateInstallPromises);
await pMap(
templatePaths,
async (path) => {
const { file } = getPathParts(path);
const templateName = file.substr(0, file.lastIndexOf('.'));
const content = JSON.parse(
getAssetFromAssetsMap(packageInstallContext.assetsMap, path).toString('utf8')
);
const esClientParams = { name: templateName, body: content };
const esClientRequestOptions = { ignore: [404] };
if (Object.hasOwn(content, 'template') || Object.hasOwn(content, 'composed_of')) {
// Template is v2
return retryTransientEsErrors(
() => esClient.indices.putIndexTemplate(esClientParams, esClientRequestOptions),
{ logger }
);
} else {
// template is V1
return retryTransientEsErrors(
() => esClient.indices.putTemplate(esClientParams, esClientRequestOptions),
{ logger }
);
}
},
{
concurrency: MAX_CONCURRENT_COMPONENT_TEMPLATES,
}
);
} catch (e) {
throw new Boom.Boom(`Error installing prebuilt index templates ${e.message}`, {
statusCode: 400,
@ -164,26 +174,30 @@ const installPreBuiltComponentTemplates = async (
logger: Logger
) => {
const templatePaths = packageInstallContext.paths.filter((path) => isComponentTemplate(path));
const templateInstallPromises = templatePaths.map(async (path) => {
const { file } = getPathParts(path);
const templateName = file.substr(0, file.lastIndexOf('.'));
const content = JSON.parse(
getAssetFromAssetsMap(packageInstallContext.assetsMap, path).toString('utf8')
);
const esClientParams = {
name: templateName,
body: content,
};
return retryTransientEsErrors(
() => esClient.cluster.putComponentTemplate(esClientParams, { ignore: [404] }),
{ logger }
);
});
try {
return await Promise.all(templateInstallPromises);
await pMap(
templatePaths,
async (path) => {
const { file } = getPathParts(path);
const templateName = file.substr(0, file.lastIndexOf('.'));
const content = JSON.parse(
getAssetFromAssetsMap(packageInstallContext.assetsMap, path).toString('utf8')
);
const esClientParams = {
name: templateName,
body: content,
};
return retryTransientEsErrors(
() => esClient.cluster.putComponentTemplate(esClientParams, { ignore: [404] }),
{ logger }
);
},
{
concurrency: MAX_CONCURRENT_COMPONENT_TEMPLATES,
}
);
} catch (e) {
throw new Boom.Boom(`Error installing prebuilt component templates ${e.message}`, {
statusCode: 400,
@ -450,8 +464,9 @@ async function installDataStreamComponentTemplates({
logger: Logger;
componentTemplates: TemplateMap;
}) {
await Promise.all(
Object.entries(componentTemplates).map(async ([name, body]) => {
await pMap(
Object.entries(componentTemplates),
async ([name, body]) => {
// @custom component template should be lazily created by user
if (isUserSettingsTemplate(name)) {
return;
@ -459,7 +474,10 @@ async function installDataStreamComponentTemplates({
const { clusterPromise } = putComponentTemplate(esClient, logger, { body, name });
return clusterPromise;
})
},
{
concurrency: MAX_CONCURRENT_COMPONENT_TEMPLATES,
}
);
}
@ -467,10 +485,12 @@ export async function ensureDefaultComponentTemplates(
esClient: ElasticsearchClient,
logger: Logger
) {
return Promise.all(
FLEET_COMPONENT_TEMPLATES.map(({ name, body }) =>
ensureComponentTemplate(esClient, logger, name, body)
)
return await pMap(
FLEET_COMPONENT_TEMPLATES,
({ name, body }) => ensureComponentTemplate(esClient, logger, name, body),
{
concurrency: MAX_CONCURRENT_COMPONENT_TEMPLATES,
}
);
}

View file

@ -19,6 +19,7 @@ import {
FLEET_EVENT_INGESTED_COMPONENT_TEMPLATE_NAME,
STACK_COMPONENT_TEMPLATE_LOGS_MAPPINGS,
} from '../../../../constants/fleet_es_assets';
import { MAX_CONCURRENT_DATASTREAM_OPERATIONS } from '../../../../constants';
import type { Field, Fields } from '../../fields/field';
import type {
@ -931,10 +932,15 @@ const queryDataStreamsFromTemplates = async (
esClient: ElasticsearchClient,
templates: IndexTemplateEntry[]
): Promise<CurrentDataStream[]> => {
const dataStreamPromises = templates.map((template) => {
return getDataStreams(esClient, template);
});
const dataStreamObjects = await Promise.all(dataStreamPromises);
const dataStreamObjects = await pMap(
templates,
(template) => {
return getDataStreams(esClient, template);
},
{
concurrency: MAX_CONCURRENT_DATASTREAM_OPERATIONS,
}
);
return dataStreamObjects.filter(isCurrentDataStream).flat();
};
@ -997,8 +1003,7 @@ const updateAllDataStreams = async (
});
},
{
// Limit concurrent putMapping/rollover requests to avoid overwhelming ES cluster
concurrency: 20,
concurrency: MAX_CONCURRENT_DATASTREAM_OPERATIONS,
}
);
};

View file

@ -10,6 +10,7 @@ import { errors } from '@elastic/elasticsearch';
import { load } from 'js-yaml';
import { isPopulatedObject } from '@kbn/ml-is-populated-object';
import { uniqBy } from 'lodash';
import pMap from 'p-map';
import type { HTTPAuthorizationHeader } from '../../../../../common/http_authorization_header';
@ -44,6 +45,8 @@ import { getInstallation } from '../../packages';
import { retryTransientEsErrors } from '../retry';
import { isUserSettingsTemplate } from '../template/utils';
import { MAX_CONCURRENT_TRANSFORMS_OPERATIONS } from '../../../../constants';
import { deleteTransforms } from './remove';
import { getDestinationIndexAliases } from './transform_utils';
import { loadMappingForTransform } from './mappings';
@ -573,17 +576,21 @@ const installTransformsAssets = async (
}
} else {
// Else, create & start all the transforms at once for speed
const transformsPromises = transforms.map(async (transform) => {
return handleTransformInstall({
esClient,
logger,
transform,
startTransform: transformsSpecifications.get(transform.transformModuleId)?.get('start'),
secondaryAuth: transform.runAsKibanaSystem !== false ? undefined : secondaryAuth,
});
});
installedTransforms = await Promise.all(transformsPromises).then((results) => results.flat());
installedTransforms = await pMap(
transforms,
async (transform) => {
return handleTransformInstall({
esClient,
logger,
transform,
startTransform: transformsSpecifications.get(transform.transformModuleId)?.get('start'),
secondaryAuth: transform.runAsKibanaSystem !== false ? undefined : secondaryAuth,
});
},
{
concurrency: MAX_CONCURRENT_TRANSFORMS_OPERATIONS,
}
).then((results) => results.flat());
}
// If user does not have sufficient permissions to start the transforms,

View file

@ -10,6 +10,7 @@ import type { Logger } from '@kbn/logging';
import type { SavedObjectsClientContract } from '@kbn/core-saved-objects-api-server';
import { sortBy, uniqBy } from 'lodash';
import pMap from 'p-map';
import { isPopulatedObject } from '@kbn/ml-is-populated-object';
import type { ErrorResponseBase } from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
@ -19,6 +20,7 @@ import type { Installation } from '../../../../../common';
import { ElasticsearchAssetType, PACKAGES_SAVED_OBJECT_TYPE } from '../../../../../common';
import { retryTransientEsErrors } from '../retry';
import { MAX_CONCURRENT_TRANSFORMS_OPERATIONS } from '../../../../constants';
interface FleetTransformMetadata {
fleet_transform_version?: string;
@ -119,8 +121,9 @@ export async function handleTransformReauthorizeAndStart({
);
}
const transformInfos = await Promise.all(
transforms.map(({ transformId }) =>
const transformInfos = await pMap(
transforms,
({ transformId }) =>
retryTransientEsErrors(
() =>
esClient.transform.getTransform(
@ -130,8 +133,10 @@ export async function handleTransformReauthorizeAndStart({
{ ...(secondaryAuth ? secondaryAuth : {}), ignore: [404] }
),
{ logger, additionalResponseStatuses: [400] }
)
)
),
{
concurrency: MAX_CONCURRENT_TRANSFORMS_OPERATIONS,
}
);
const transformsMetadata: FleetTransformMetadata[] = transformInfos
@ -168,17 +173,20 @@ export async function handleTransformReauthorizeAndStart({
}
} else {
// Else, create & start all the transforms at once for speed
const transformsPromises = transformsMetadata.map(async ({ transformId, ...meta }) => {
return await reauthorizeAndStartTransform({
esClient,
logger,
transformId,
secondaryAuth,
meta: { ...meta, last_authorized_by: username },
});
});
authorizedTransforms = await Promise.all(transformsPromises).then((results) => results.flat());
authorizedTransforms = await pMap(
transformsMetadata,
async ({ transformId, ...meta }) =>
reauthorizeAndStartTransform({
esClient,
logger,
transformId,
secondaryAuth,
meta: { ...meta, last_authorized_by: username },
}),
{
concurrency: MAX_CONCURRENT_TRANSFORMS_OPERATIONS,
}
).then((results) => results.flat());
}
const so = await savedObjectsClient.get<Installation>(PACKAGES_SAVED_OBJECT_TYPE, pkgName);

View file

@ -6,6 +6,7 @@
*/
import type { ElasticsearchClient, SavedObjectsClientContract } from '@kbn/core/server';
import pMap from 'p-map';
import type { SecondaryAuthorizationHeader } from '../../../../../common/types/models/transform_api_key';
import { ElasticsearchAssetType } from '../../../../types';
@ -14,6 +15,7 @@ import { PACKAGES_SAVED_OBJECT_TYPE } from '../../../../../common/constants';
import { appContextService } from '../../../app_context';
import { retryTransientEsErrors } from '../retry';
import { MAX_CONCURRENT_TRANSFORMS_OPERATIONS } from '../../../../constants';
export const stopTransforms = async (transformIds: string[], esClient: ElasticsearchClient) => {
for (const transformId of transformIds) {
@ -34,8 +36,10 @@ export const deleteTransforms = async (
if (transformIds.length) {
logger.info(`Deleting currently installed transform ids ${transformIds}`);
}
await Promise.all(
transformIds.map(async (transformId) => {
await pMap(
transformIds,
async (transformId) => {
await stopTransforms([transformId], esClient);
await retryTransientEsErrors(() =>
esClient.transform.deleteTransform(
@ -48,7 +52,10 @@ export const deleteTransforms = async (
)
);
logger.info(`Deleted: ${transformId}`);
})
},
{
concurrency: MAX_CONCURRENT_TRANSFORMS_OPERATIONS,
}
);
};

View file

@ -6,10 +6,12 @@
*/
import type { SavedObjectsClientContract } from '@kbn/core/server';
import pMap from 'p-map';
import { dataTypes, installationStatuses } from '../../../../../common/constants';
import { appContextService } from '../../..';
import { getPackageSavedObjects } from '../../packages/get';
import { MAX_CONCURRENT_INDEX_PATTERN_OPERATIONS } from '../../../../constants';
const INDEX_PATTERN_SAVED_OBJECT_TYPE = 'index-pattern';
export const indexPatternTypes = [dataTypes.Logs, dataTypes.Metrics];
@ -72,9 +74,9 @@ export async function removeUnusedIndexPatterns(savedObjectsClient: SavedObjects
// eslint-disable-next-line @typescript-eslint/naming-convention
const idsToDelete = resolvedObjects.map(({ saved_object }) => saved_object.id);
return Promise.all(
idsToDelete.map(async (id) => {
await pMap(
idsToDelete,
async (id) => {
try {
logger.debug(`deleting index pattern ${id}`);
await savedObjectsClient.delete(INDEX_PATTERN_SAVED_OBJECT_TYPE, id);
@ -83,6 +85,9 @@ export async function removeUnusedIndexPatterns(savedObjectsClient: SavedObjects
logger.debug(`Non fatal error encountered deleting index pattern ${id} : ${err}`);
}
return;
})
},
{
concurrency: MAX_CONCURRENT_INDEX_PATTERN_OPERATIONS,
}
);
}

View file

@ -39,7 +39,10 @@ import type {
PackageSpecManifest,
AssetsMap,
} from '../../../../common/types';
import { PACKAGES_SAVED_OBJECT_TYPE } from '../../../constants';
import {
PACKAGES_SAVED_OBJECT_TYPE,
MAX_CONCURRENT_EPM_PACKAGES_INSTALLATIONS,
} from '../../../constants';
import type {
ArchivePackage,
RegistryPackage,
@ -142,7 +145,7 @@ export async function getPackages(
);
}
},
{ concurrency: 10 }
{ concurrency: MAX_CONCURRENT_EPM_PACKAGES_INSTALLATIONS }
)
).filter((p): p is Installable<any> => p !== null);

View file

@ -17,10 +17,12 @@ import { SavedObjectsUtils, SavedObjectsErrorHelpers } from '@kbn/core/server';
import minVersion from 'semver/ranges/min-version';
import { chunk } from 'lodash';
import pMap from 'p-map';
import { updateIndexSettings } from '../elasticsearch/index/update_settings';
import {
MAX_CONCURRENT_ES_ASSETS_OPERATIONS,
PACKAGE_POLICY_SAVED_OBJECT_TYPE,
PACKAGES_SAVED_OBJECT_TYPE,
SO_SEARCH_LIMIT,
@ -209,29 +211,36 @@ async function bulkDeleteSavedObjects(
}
}
export function deleteESAssets(
export const deleteESAsset = async (
installedObject: EsAssetReference,
esClient: ElasticsearchClient
): Promise<void> => {
const { id, type } = installedObject;
const assetType = type as AssetType;
if (assetType === ElasticsearchAssetType.ingestPipeline) {
return deletePipeline(esClient, id);
} else if (assetType === ElasticsearchAssetType.indexTemplate) {
return deleteIndexTemplate(esClient, id);
} else if (assetType === ElasticsearchAssetType.componentTemplate) {
return deleteComponentTemplate(esClient, id);
} else if (assetType === ElasticsearchAssetType.transform) {
return deleteTransforms(esClient, [id], true);
} else if (assetType === ElasticsearchAssetType.dataStreamIlmPolicy) {
return deleteIlms(esClient, [id]);
} else if (assetType === ElasticsearchAssetType.ilmPolicy) {
return deleteIlms(esClient, [id]);
} else if (assetType === ElasticsearchAssetType.mlModel) {
return deleteMlModel(esClient, [id]);
}
};
export const deleteESAssets = (
installedObjects: EsAssetReference[],
esClient: ElasticsearchClient
): Array<Promise<unknown>> {
return installedObjects.map(async ({ id, type }) => {
const assetType = type as AssetType;
if (assetType === ElasticsearchAssetType.ingestPipeline) {
return deletePipeline(esClient, id);
} else if (assetType === ElasticsearchAssetType.indexTemplate) {
return deleteIndexTemplate(esClient, id);
} else if (assetType === ElasticsearchAssetType.componentTemplate) {
return deleteComponentTemplate(esClient, id);
} else if (assetType === ElasticsearchAssetType.transform) {
return deleteTransforms(esClient, [id], true);
} else if (assetType === ElasticsearchAssetType.dataStreamIlmPolicy) {
return deleteIlms(esClient, [id]);
} else if (assetType === ElasticsearchAssetType.ilmPolicy) {
return deleteIlms(esClient, [id]);
} else if (assetType === ElasticsearchAssetType.mlModel) {
return deleteMlModel(esClient, [id]);
}
});
}
): Array<Promise<void>> => {
return installedObjects.map((installedObject) => deleteESAsset(installedObject, esClient));
};
type Tuple = [EsAssetReference[], EsAssetReference[], EsAssetReference[], EsAssetReference[]];
export const splitESAssets = (installedEs: EsAssetReference[]) => {
@ -291,16 +300,24 @@ export async function deletePrerequisiteAssets(
try {
// must first unset any default pipeline associated with any existing indices
// by setting empty string
await Promise.all(
indexAssets.map((asset) => updateIndexSettings(esClient, asset.id, { default_pipeline: '' }))
await pMap(
indexAssets,
(asset) => updateIndexSettings(esClient, asset.id, { default_pipeline: '' }),
{
concurrency: MAX_CONCURRENT_ES_ASSETS_OPERATIONS,
}
);
// in case transform's destination index contains any pipeline,
// we should delete the transforms first
await Promise.all(deleteESAssets(transformAssets, esClient));
await pMap(transformAssets, (transformAsset) => deleteESAsset(transformAsset, esClient), {
concurrency: MAX_CONCURRENT_ES_ASSETS_OPERATIONS,
});
// then delete index templates and pipelines
await Promise.all(deleteESAssets(indexTemplatesAndPipelines, esClient));
await pMap(indexTemplatesAndPipelines, (asset) => deleteESAsset(asset, esClient), {
concurrency: MAX_CONCURRENT_ES_ASSETS_OPERATIONS,
});
} catch (err) {
// in the rollback case, partial installs are likely, so missing assets are not an error
if (!SavedObjectsErrorHelpers.isNotFoundError(err)) {

View file

@ -8,6 +8,7 @@
import type { ElasticsearchClient } from '@kbn/core/server';
import type { SearchHit, UpdateByQueryResponse } from '@elastic/elasticsearch/lib/api/types';
import type { FileStatus } from '@kbn/files-plugin/common/types';
import pMap from 'p-map';
import {
FILE_STORAGE_DATA_INDEX_PATTERN,
@ -20,6 +21,8 @@ import { getFileMetadataIndexName } from '../../../common/services';
import { ES_SEARCH_LIMIT } from '../../../common/constants';
import { MAX_CONCURRENT_AGENT_FILES_UPLOADS } from '../../constants';
import { parseFileStorageIndex } from './utils';
/**
@ -145,14 +148,15 @@ export async function fileIdsWithoutChunksByIndex(
* @param fileIdsByIndex
* @param status
*/
export function updateFilesStatus(
export async function updateFilesStatus(
esClient: ElasticsearchClient,
abortController: AbortController | undefined,
fileIdsByIndex: FileIdsByIndex,
status: FileStatus
): Promise<UpdateByQueryResponse[]> {
return Promise.all(
Object.entries(fileIdsByIndex).map(([index, fileIds]) => {
return await pMap(
Object.entries(fileIdsByIndex),
([index, fileIds]) => {
return esClient
.updateByQuery(
{
@ -174,6 +178,9 @@ export function updateFilesStatus(
Error.captureStackTrace(err);
throw err;
});
})
},
{
concurrency: MAX_CONCURRENT_AGENT_FILES_UPLOADS,
}
);
}

View file

@ -13,7 +13,11 @@ import type {
import { omit } from 'lodash';
import pMap from 'p-map';
import { FLEET_PROXY_SAVED_OBJECT_TYPE, SO_SEARCH_LIMIT } from '../constants';
import {
FLEET_PROXY_SAVED_OBJECT_TYPE,
SO_SEARCH_LIMIT,
MAX_CONCURRENT_FLEET_PROXIES_OPERATIONS,
} from '../constants';
import { FleetProxyUnauthorizedError } from '../errors';
import type {
DownloadSource,
@ -206,7 +210,7 @@ async function updateRelatedSavedObject(
...omit(fleetServerHost, 'id'),
proxy_id: null,
}),
{ concurrency: 20 }
{ concurrency: MAX_CONCURRENT_FLEET_PROXIES_OPERATIONS }
);
await pMap(
@ -216,7 +220,7 @@ async function updateRelatedSavedObject(
...omit(output, 'id'),
proxy_id: null,
} as Partial<Output>),
{ concurrency: 20 }
{ concurrency: MAX_CONCURRENT_FLEET_PROXIES_OPERATIONS }
);
await pMap(downloadSources, (downloadSource) =>

View file

@ -44,6 +44,7 @@ import {
DEFAULT_OUTPUT_ID,
OUTPUT_SAVED_OBJECT_TYPE,
OUTPUT_HEALTH_DATA_STREAM,
MAX_CONCURRENT_BACKFILL_OUTPUTS_PRESETS,
} from '../constants';
import {
SO_SEARCH_LIMIT,
@ -1165,7 +1166,7 @@ class OutputService {
await agentPolicyService.bumpAllAgentPoliciesForOutput(esClient, output.id);
},
{
concurrency: 5,
concurrency: MAX_CONCURRENT_BACKFILL_OUTPUTS_PRESETS,
}
);
}

View file

@ -100,6 +100,11 @@ import type {
} from '../types';
import type { ExternalCallback } from '..';
import {
MAX_CONCURRENT_AGENT_POLICIES_OPERATIONS,
MAX_CONCURRENT_PACKAGE_ASSETS,
} from '../constants';
import { createSoFindIterable } from './utils/create_so_find_iterable';
import type { FleetAuthzRouteConfig } from './security';
@ -178,7 +183,7 @@ async function getPkgInfoAssetsMap({
pkgInfo,
});
},
{ concurrency: 5 }
{ concurrency: MAX_CONCURRENT_PACKAGE_ASSETS }
);
return packageInfosandAssetsMap;
@ -2211,7 +2216,7 @@ class PackagePolicyClientImpl implements PackagePolicyClient {
}
},
{
concurrency: 50,
concurrency: MAX_CONCURRENT_AGENT_POLICIES_OPERATIONS,
}
);
await pMap(
@ -2231,7 +2236,7 @@ class PackagePolicyClientImpl implements PackagePolicyClient {
);
},
{
concurrency: 50,
concurrency: MAX_CONCURRENT_AGENT_POLICIES_OPERATIONS,
}
);
}

View file

@ -22,6 +22,8 @@ import { listFleetServerHostsForProxyId } from '../fleet_server_host';
import { agentPolicyService } from '../agent_policy';
import { outputService } from '../output';
import { MAX_CONCURRENT_AGENT_POLICIES_OPERATIONS_20 } from '../../constants';
export function getPreconfiguredFleetProxiesFromConfig(config?: FleetConfigType) {
const { proxies: fleetProxiesFromConfig } = config;
@ -107,7 +109,7 @@ async function createOrUpdatePreconfiguredFleetProxies(
outputs,
(output) => agentPolicyService.bumpAllAgentPoliciesForOutput(esClient, output.id),
{
concurrency: 20,
concurrency: MAX_CONCURRENT_AGENT_POLICIES_OPERATIONS_20,
}
);
await pMap(
@ -118,7 +120,7 @@ async function createOrUpdatePreconfiguredFleetProxies(
fleetServerHost.id
),
{
concurrency: 20,
concurrency: MAX_CONCURRENT_AGENT_POLICIES_OPERATIONS_20,
}
);
}

View file

@ -11,6 +11,7 @@ import utils from 'node:util';
import type { ElasticsearchClient, SavedObjectsClientContract } from '@kbn/core/server';
import { isEqual } from 'lodash';
import { dump } from 'js-yaml';
import pMap from 'p-map';
const pbkdf2Async = utils.promisify(crypto.pbkdf2);
@ -32,6 +33,8 @@ import { appContextService } from '../app_context';
import { isDifferent } from './utils';
export const MAX_CONCURRENT_OUTPUTS_OPERATIONS = 50;
export function getPreconfiguredOutputFromConfig(config?: FleetConfigType) {
const { outputs: outputsOrUndefined } = config;
@ -78,67 +81,69 @@ export async function createOrUpdatePreconfiguredOutputs(
{ ignoreNotFound: true }
);
await Promise.all(
outputs.map(async (output) => {
const existingOutput = existingOutputs.find((o) => o.id === output.id);
const updateOrConfigureOutput = async (output: PreconfiguredOutput) => {
const existingOutput = existingOutputs.find((o) => o.id === output.id);
const { id, config, ...outputData } = output;
const { id, config, ...outputData } = output;
const configYaml = config ? dump(config) : undefined;
const configYaml = config ? dump(config) : undefined;
const data: NewOutput = {
...outputData,
is_preconfigured: true,
config_yaml: configYaml ?? null,
// Set value to null to update these fields on update
ca_sha256: outputData.ca_sha256 ?? null,
ca_trusted_fingerprint: outputData.ca_trusted_fingerprint ?? null,
ssl: outputData.ssl ?? null,
} as NewOutput;
const data: NewOutput = {
...outputData,
is_preconfigured: true,
config_yaml: configYaml ?? null,
// Set value to null to update these fields on update
ca_sha256: outputData.ca_sha256 ?? null,
ca_trusted_fingerprint: outputData.ca_trusted_fingerprint ?? null,
ssl: outputData.ssl ?? null,
} as NewOutput;
if (!data.hosts || data.hosts.length === 0) {
data.hosts = outputService.getDefaultESHosts();
if (!data.hosts || data.hosts.length === 0) {
data.hosts = outputService.getDefaultESHosts();
}
const isCreate = !existingOutput;
// field in allow edit are not updated through preconfiguration
if (!isCreate && output.allow_edit) {
for (const key of output.allow_edit) {
// @ts-expect-error
data[key] = existingOutput[key];
}
}
const isCreate = !existingOutput;
const isUpdateWithNewData =
existingOutput && (await isPreconfiguredOutputDifferentFromCurrent(existingOutput, data));
// field in allow edit are not updated through preconfiguration
if (!isCreate && output.allow_edit) {
for (const key of output.allow_edit) {
// @ts-expect-error
data[key] = existingOutput[key];
if (isCreate || isUpdateWithNewData) {
const secretHashes = await hashSecrets(output);
if (isCreate) {
logger.debug(`Creating preconfigured output ${output.id}`);
await outputService.create(soClient, esClient, data, {
id,
fromPreconfiguration: true,
secretHashes,
});
} else if (isUpdateWithNewData) {
logger.debug(`Updating preconfigured output ${output.id}`);
await outputService.update(soClient, esClient, id, data, {
fromPreconfiguration: true,
secretHashes,
});
// Bump revision of all policies using that output
if (outputData.is_default || outputData.is_default_monitoring) {
await agentPolicyService.bumpAllAgentPolicies(esClient);
} else {
await agentPolicyService.bumpAllAgentPoliciesForOutput(esClient, id);
}
}
}
};
const isUpdateWithNewData =
existingOutput && (await isPreconfiguredOutputDifferentFromCurrent(existingOutput, data));
if (isCreate || isUpdateWithNewData) {
const secretHashes = await hashSecrets(output);
if (isCreate) {
logger.debug(`Creating preconfigured output ${output.id}`);
await outputService.create(soClient, esClient, data, {
id,
fromPreconfiguration: true,
secretHashes,
});
} else if (isUpdateWithNewData) {
logger.debug(`Updating preconfigured output ${output.id}`);
await outputService.update(soClient, esClient, id, data, {
fromPreconfiguration: true,
secretHashes,
});
// Bump revision of all policies using that output
if (outputData.is_default || outputData.is_default_monitoring) {
await agentPolicyService.bumpAllAgentPolicies(esClient);
} else {
await agentPolicyService.bumpAllAgentPoliciesForOutput(esClient, id);
}
}
}
})
);
await pMap(outputs, (output) => updateOrConfigureOutput(output), {
concurrency: MAX_CONCURRENT_OUTPUTS_OPERATIONS,
});
}
// Values recommended by NodeJS documentation

View file

@ -24,6 +24,8 @@ import { listEnrollmentApiKeys, deleteEnrollmentApiKey } from '../api_keys';
import type { AgentPolicy } from '../../types';
import { AgentPolicyInvalidError } from '../../errors';
import { MAX_CONCURRENT_AGENT_POLICIES_OPERATIONS_20 } from '../../constants';
export async function resetPreconfiguredAgentPolicies(
soClient: SavedObjectsClientContract,
esClient: ElasticsearchClient,
@ -83,7 +85,7 @@ async function _deleteGhostPackagePolicies(
}
},
{
concurrency: 20,
concurrency: MAX_CONCURRENT_AGENT_POLICIES_OPERATIONS_20,
}
);
}
@ -116,7 +118,7 @@ async function _deletePreconfigurationDeleteRecord(
}),
{
concurrency: 20,
concurrency: MAX_CONCURRENT_AGENT_POLICIES_OPERATIONS_20,
}
);
}
@ -167,7 +169,7 @@ async function _deleteExistingData(
if (agents.length > 0) {
logger.info(`Force unenrolling ${agents.length} agents`);
await pMap(agents, (agent) => forceUnenrollAgent(esClient, soClient, agent.id), {
concurrency: 20,
concurrency: MAX_CONCURRENT_AGENT_POLICIES_OPERATIONS_20,
});
}
@ -183,7 +185,7 @@ async function _deleteExistingData(
enrollmentApiKeys,
(enrollmentKey) => deleteEnrollmentApiKey(esClient, enrollmentKey.id, true),
{
concurrency: 20,
concurrency: MAX_CONCURRENT_AGENT_POLICIES_OPERATIONS_20,
}
);
}
@ -195,7 +197,7 @@ async function _deleteExistingData(
force: true,
}),
{
concurrency: 20,
concurrency: MAX_CONCURRENT_AGENT_POLICIES_OPERATIONS_20,
}
);
}

View file

@ -21,6 +21,8 @@ import { AUTO_UPDATE_PACKAGES, FLEET_SETUP_LOCK_TYPE } from '../../common/consta
import type { PreconfigurationError } from '../../common/constants';
import type { DefaultPackagesInstallationError, FleetSetupLock } from '../../common/types';
import { MAX_CONCURRENT_EPM_PACKAGES_INSTALLATIONS } from '../constants';
import { appContextService } from './app_context';
import { ensurePreconfiguredPackagesAndPolicies } from './preconfiguration';
import {
@ -359,7 +361,7 @@ export async function ensureFleetGlobalEsAssets(
);
});
},
{ concurrency: 10 }
{ concurrency: MAX_CONCURRENT_EPM_PACKAGES_INSTALLATIONS }
);
}
}

View file

@ -8,6 +8,8 @@
import type { ElasticsearchClient, Logger } from '@kbn/core/server';
import pMap from 'p-map';
import { MAX_CONCURRENT_CLEAN_OLD_FILE_INDICES } from '../../constants';
const INDICES_TO_CLEAN = [
'.fleet-files-*',
'.fleet-file-data-*',
@ -49,7 +51,7 @@ export async function cleanUpOldFileIndices(esClient: ElasticsearchClient, logge
});
}
},
{ concurrency: 2 }
{ concurrency: MAX_CONCURRENT_CLEAN_OLD_FILE_INDICES }
);
await esClient.indices
.deleteIndexTemplate({

View file

@ -10,7 +10,7 @@ import pMap from 'p-map';
import { agentPolicyService } from '../agent_policy';
import { ensureDefaultEnrollmentAPIKeyForAgentPolicy } from '../api_keys';
import { SO_SEARCH_LIMIT } from '../../constants';
import { SO_SEARCH_LIMIT, MAX_CONCURRENT_AGENT_POLICIES_OPERATIONS_20 } from '../../constants';
import { appContextService } from '../app_context';
import { scheduleDeployAgentPoliciesTask } from '../agent_policies/deploy_agent_policies_task';
import { scheduleBumpAgentPoliciesTask } from '../agent_policies/bump_agent_policies_task';
@ -51,7 +51,7 @@ export async function ensureAgentPoliciesFleetServerKeysAndPolicies({
}
},
{
concurrency: 20,
concurrency: MAX_CONCURRENT_AGENT_POLICIES_OPERATIONS_20,
}
);

View file

@ -9,7 +9,11 @@ import type { ElasticsearchClient, SavedObjectsClientContract } from '@kbn/core/
import pMap from 'p-map';
import type { Logger } from '@kbn/logging';
import { PACKAGES_SAVED_OBJECT_TYPE, SO_SEARCH_LIMIT } from '../../constants';
import {
MAX_CONCURRENT_EPM_PACKAGES_INSTALLATIONS,
PACKAGES_SAVED_OBJECT_TYPE,
SO_SEARCH_LIMIT,
} from '../../constants';
import { FLEET_INSTALL_FORMAT_VERSION } from '../../constants/fleet_es_assets';
import type { Installation } from '../../types';
@ -59,6 +63,6 @@ export async function upgradePackageInstallVersion({
}
});
},
{ concurrency: 10 }
{ concurrency: MAX_CONCURRENT_EPM_PACKAGES_INSTALLATIONS }
);
}