[Entity Analytics] [Entity Store] Run init requests sequentially to prevent resource exists error (#198268)

## Summary

This PR fixes an issue where running init for both `user` and `host`
entity engines in parallel would cause a race condition while enabling
the risk engine, resulting in a `Resource already exists` error.

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Tiago Vila Verde 2024-10-30 17:03:07 +01:00 committed by GitHub
parent 6cc8b97b60
commit 6a50066e00
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 38 additions and 17 deletions

View file

@ -66,6 +66,7 @@ const EntityStoreDashboardPanelsComponent = () => {
}; };
setRiskEngineInitializing(true); setRiskEngineInitializing(true);
initRiskEngine(undefined, options); initRiskEngine(undefined, options);
return;
} }
if (enable.entityStore) { if (enable.entityStore) {

View file

@ -43,7 +43,8 @@ export const useEntityStoreEnablement = () => {
const { initEntityStore } = useEntityStoreRoutes(); const { initEntityStore } = useEntityStoreRoutes();
const { refetch: initialize } = useQuery({ const { refetch: initialize } = useQuery({
queryKey: [ENTITY_STORE_ENABLEMENT_INIT], queryKey: [ENTITY_STORE_ENABLEMENT_INIT],
queryFn: () => Promise.all([initEntityStore('user'), initEntityStore('host')]), queryFn: async () =>
initEntityStore('user').then((usr) => initEntityStore('host').then((host) => [usr, host])),
enabled: false, enabled: false,
}); });
@ -79,7 +80,9 @@ export const useInitEntityEngineMutation = (options?: UseMutationOptions<{}>) =>
timestamp: new Date().toISOString(), timestamp: new Date().toISOString(),
action: 'start', action: 'start',
}); });
return Promise.all([initEntityStore('user'), initEntityStore('host')]); return initEntityStore('user').then((usr) =>
initEntityStore('host').then((host) => [usr, host])
);
}, },
{ {
...options, ...options,
@ -107,7 +110,9 @@ export const useStopEntityEngineMutation = (options?: UseMutationOptions<{}>) =>
timestamp: new Date().toISOString(), timestamp: new Date().toISOString(),
action: 'stop', action: 'stop',
}); });
return Promise.all([stopEntityStore('user'), stopEntityStore('host')]); return stopEntityStore('user').then((usr) =>
stopEntityStore('host').then((host) => [usr, host])
);
}, },
{ {
...options, ...options,
@ -129,7 +134,10 @@ export const useDeleteEntityEngineMutation = (options?: UseMutationOptions<{}>)
const invalidateEntityEngineStatusQuery = useInvalidateEntityEngineStatusQuery(); const invalidateEntityEngineStatusQuery = useInvalidateEntityEngineStatusQuery();
const { deleteEntityEngine } = useEntityStoreRoutes(); const { deleteEntityEngine } = useEntityStoreRoutes();
return useMutation<DeleteEntityEngineResponse[]>( return useMutation<DeleteEntityEngineResponse[]>(
() => Promise.all([deleteEntityEngine('user', true), deleteEntityEngine('host', true)]), () =>
deleteEntityEngine('user', true).then((usr) =>
deleteEntityEngine('host', true).then((host) => [usr, host])
),
{ {
...options, ...options,
mutationKey: DELETE_ENTITY_ENGINE_STATUS_KEY, mutationKey: DELETE_ENTITY_ENGINE_STATUS_KEY,

View file

@ -125,7 +125,7 @@ export const createPlatformPipeline = async ({
managed_by: 'entity_store', managed_by: 'entity_store',
managed: true, managed: true,
}, },
description: `Ingest pipeline for entity defiinition ${entityManagerDefinition.id}`, description: `Ingest pipeline for entity definition ${entityManagerDefinition.id}`,
processors: buildIngestPipeline({ processors: buildIngestPipeline({
namespace: unitedDefinition.namespace, namespace: unitedDefinition.namespace,
version: unitedDefinition.version, version: unitedDefinition.version,

View file

@ -144,7 +144,7 @@ export class EntityStoreDataClient {
); );
} }
logger.info( logger.info(
`In namespace ${this.options.namespace}: Initializing entity store for ${entityType}` `[Entity Store] In namespace ${this.options.namespace}: Initializing entity store for ${entityType}`
); );
const descriptor = await this.engineClient.init(entityType, { const descriptor = await this.engineClient.init(entityType, {
@ -152,7 +152,7 @@ export class EntityStoreDataClient {
fieldHistoryLength, fieldHistoryLength,
indexPattern, indexPattern,
}); });
logger.debug(`Initialized engine for ${entityType}`); logger.debug(`[Entity Store] Initialized saved object for ${entityType}`);
// first create the entity definition without starting it // first create the entity definition without starting it
// so that the index template is created which we can add a component template to // so that the index template is created which we can add a component template to
@ -165,7 +165,9 @@ export class EntityStoreDataClient {
config, config,
pipelineDebugMode pipelineDebugMode
).catch((error) => { ).catch((error) => {
logger.error(`There was an error during async setup of the Entity Store: ${error.message}`); logger.error(
`[Entity Store] There was an error during async setup of the Entity Store: ${error.message}`
);
}); });
return descriptor; return descriptor;
@ -260,7 +262,7 @@ export class EntityStoreDataClient {
logger, logger,
taskManager, taskManager,
}); });
logger.info(`Entity store initialized for ${entityType}`); debugLog(`Entity store initialized`);
const setupEndTime = moment().utc().toISOString(); const setupEndTime = moment().utc().toISOString();
const duration = moment(setupEndTime).diff(moment(setupStartTime), 'seconds'); const duration = moment(setupEndTime).diff(moment(setupStartTime), 'seconds');
@ -271,7 +273,7 @@ export class EntityStoreDataClient {
return updated; return updated;
} catch (err) { } catch (err) {
this.options.logger.error( this.options.logger.error(
`Error initializing entity store for ${entityType}: ${err.message}` `[Entity Store] Error initializing entity store for ${entityType}: ${err.message}`
); );
this.options.telemetry?.reportEvent(ENTITY_ENGINE_RESOURCE_INIT_FAILURE_EVENT.eventType, { this.options.telemetry?.reportEvent(ENTITY_ENGINE_RESOURCE_INIT_FAILURE_EVENT.eventType, {
@ -369,7 +371,9 @@ export class EntityStoreDataClient {
frequency: `${config.frequency.asSeconds()}s`, frequency: `${config.frequency.asSeconds()}s`,
}); });
const { entityManagerDefinition } = unitedDefinition; const { entityManagerDefinition } = unitedDefinition;
logger.info(`In namespace ${namespace}: Deleting entity store for ${entityType}`); logger.info(
`[Entity Store] In namespace ${namespace}: Deleting entity store for ${entityType}`
);
try { try {
try { try {
await this.entityClient.deleteEntityDefinition({ await this.entityClient.deleteEntityDefinition({
@ -416,6 +420,7 @@ export class EntityStoreDataClient {
}); });
} }
logger.info(`[Entity Store] In namespace ${namespace}: Deleted store for ${entityType}`);
return { deleted: true }; return { deleted: true };
} catch (e) { } catch (e) {
logger.error(`Error deleting entity store for ${entityType}: ${e.message}`); logger.error(`Error deleting entity store for ${entityType}: ${e.message}`);

View file

@ -36,12 +36,12 @@ import {
const logFactory = const logFactory =
(logger: Logger, taskId: string) => (logger: Logger, taskId: string) =>
(message: string): void => (message: string): void =>
logger.info(`[task ${taskId}]: ${message}`); logger.info(`[Entity Store] [task ${taskId}]: ${message}`);
const debugLogFactory = const debugLogFactory =
(logger: Logger, taskId: string) => (logger: Logger, taskId: string) =>
(message: string): void => (message: string): void =>
logger.debug(`[task ${taskId}]: ${message}`); logger.debug(`[Entity Store] [task ${taskId}]: ${message}`);
const getTaskName = (): string => TYPE; const getTaskName = (): string => TYPE;
@ -65,7 +65,9 @@ export const registerEntityStoreFieldRetentionEnrichTask = ({
taskManager: TaskManagerSetupContract | undefined; taskManager: TaskManagerSetupContract | undefined;
}): void => { }): void => {
if (!taskManager) { if (!taskManager) {
logger.info('Task Manager is unavailable; skipping entity store enrich policy registration.'); logger.info(
'[Entity Store] Task Manager is unavailable; skipping entity store enrich policy registration.'
);
return; return;
} }
@ -134,7 +136,7 @@ export const startEntityStoreFieldRetentionEnrichTask = async ({
params: { version: VERSION }, params: { version: VERSION },
}); });
} catch (e) { } catch (e) {
logger.warn(`[task ${taskId}]: error scheduling task, received ${e.message}`); logger.warn(`[Entity Store] [task ${taskId}]: error scheduling task, received ${e.message}`);
throw e; throw e;
} }
}; };
@ -150,9 +152,14 @@ export const removeEntityStoreFieldRetentionEnrichTask = async ({
}) => { }) => {
try { try {
await taskManager.remove(getTaskId(namespace)); await taskManager.remove(getTaskId(namespace));
logger.info(
`[Entity Store] Removed entity store enrich policy task for namespace ${namespace}`
);
} catch (err) { } catch (err) {
if (!SavedObjectsErrorHelpers.isNotFoundError(err)) { if (!SavedObjectsErrorHelpers.isNotFoundError(err)) {
logger.error(`Failed to remove entity store enrich policy task: ${err.message}`); logger.error(
`[Entity Store] Failed to remove entity store enrich policy task: ${err.message}`
);
throw err; throw err;
} }
} }
@ -233,7 +240,7 @@ export const runTask = async ({
state: updatedState, state: updatedState,
}; };
} catch (e) { } catch (e) {
logger.error(`[task ${taskId}]: error running task, received ${e.message}`); logger.error(`[Entity Store] [task ${taskId}]: error running task, received ${e.message}`);
throw e; throw e;
} }
}; };