mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 17:59:23 -04:00
[Entity Analytics] [Entity Store] Run init requests sequentially to prevent resource exists error (#198268)
## Summary This PR fixes an issue where running init for both `user` and `host` entity engines in parallel would cause a race condition while enabling the risk engine, resulting in a `Resource already exists` error. --------- Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
parent
6cc8b97b60
commit
6a50066e00
5 changed files with 38 additions and 17 deletions
|
@ -66,6 +66,7 @@ const EntityStoreDashboardPanelsComponent = () => {
|
||||||
};
|
};
|
||||||
setRiskEngineInitializing(true);
|
setRiskEngineInitializing(true);
|
||||||
initRiskEngine(undefined, options);
|
initRiskEngine(undefined, options);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (enable.entityStore) {
|
if (enable.entityStore) {
|
||||||
|
|
|
@ -43,7 +43,8 @@ export const useEntityStoreEnablement = () => {
|
||||||
const { initEntityStore } = useEntityStoreRoutes();
|
const { initEntityStore } = useEntityStoreRoutes();
|
||||||
const { refetch: initialize } = useQuery({
|
const { refetch: initialize } = useQuery({
|
||||||
queryKey: [ENTITY_STORE_ENABLEMENT_INIT],
|
queryKey: [ENTITY_STORE_ENABLEMENT_INIT],
|
||||||
queryFn: () => Promise.all([initEntityStore('user'), initEntityStore('host')]),
|
queryFn: async () =>
|
||||||
|
initEntityStore('user').then((usr) => initEntityStore('host').then((host) => [usr, host])),
|
||||||
enabled: false,
|
enabled: false,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -79,7 +80,9 @@ export const useInitEntityEngineMutation = (options?: UseMutationOptions<{}>) =>
|
||||||
timestamp: new Date().toISOString(),
|
timestamp: new Date().toISOString(),
|
||||||
action: 'start',
|
action: 'start',
|
||||||
});
|
});
|
||||||
return Promise.all([initEntityStore('user'), initEntityStore('host')]);
|
return initEntityStore('user').then((usr) =>
|
||||||
|
initEntityStore('host').then((host) => [usr, host])
|
||||||
|
);
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
...options,
|
...options,
|
||||||
|
@ -107,7 +110,9 @@ export const useStopEntityEngineMutation = (options?: UseMutationOptions<{}>) =>
|
||||||
timestamp: new Date().toISOString(),
|
timestamp: new Date().toISOString(),
|
||||||
action: 'stop',
|
action: 'stop',
|
||||||
});
|
});
|
||||||
return Promise.all([stopEntityStore('user'), stopEntityStore('host')]);
|
return stopEntityStore('user').then((usr) =>
|
||||||
|
stopEntityStore('host').then((host) => [usr, host])
|
||||||
|
);
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
...options,
|
...options,
|
||||||
|
@ -129,7 +134,10 @@ export const useDeleteEntityEngineMutation = (options?: UseMutationOptions<{}>)
|
||||||
const invalidateEntityEngineStatusQuery = useInvalidateEntityEngineStatusQuery();
|
const invalidateEntityEngineStatusQuery = useInvalidateEntityEngineStatusQuery();
|
||||||
const { deleteEntityEngine } = useEntityStoreRoutes();
|
const { deleteEntityEngine } = useEntityStoreRoutes();
|
||||||
return useMutation<DeleteEntityEngineResponse[]>(
|
return useMutation<DeleteEntityEngineResponse[]>(
|
||||||
() => Promise.all([deleteEntityEngine('user', true), deleteEntityEngine('host', true)]),
|
() =>
|
||||||
|
deleteEntityEngine('user', true).then((usr) =>
|
||||||
|
deleteEntityEngine('host', true).then((host) => [usr, host])
|
||||||
|
),
|
||||||
{
|
{
|
||||||
...options,
|
...options,
|
||||||
mutationKey: DELETE_ENTITY_ENGINE_STATUS_KEY,
|
mutationKey: DELETE_ENTITY_ENGINE_STATUS_KEY,
|
||||||
|
|
|
@ -125,7 +125,7 @@ export const createPlatformPipeline = async ({
|
||||||
managed_by: 'entity_store',
|
managed_by: 'entity_store',
|
||||||
managed: true,
|
managed: true,
|
||||||
},
|
},
|
||||||
description: `Ingest pipeline for entity defiinition ${entityManagerDefinition.id}`,
|
description: `Ingest pipeline for entity definition ${entityManagerDefinition.id}`,
|
||||||
processors: buildIngestPipeline({
|
processors: buildIngestPipeline({
|
||||||
namespace: unitedDefinition.namespace,
|
namespace: unitedDefinition.namespace,
|
||||||
version: unitedDefinition.version,
|
version: unitedDefinition.version,
|
||||||
|
|
|
@ -144,7 +144,7 @@ export class EntityStoreDataClient {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
logger.info(
|
logger.info(
|
||||||
`In namespace ${this.options.namespace}: Initializing entity store for ${entityType}`
|
`[Entity Store] In namespace ${this.options.namespace}: Initializing entity store for ${entityType}`
|
||||||
);
|
);
|
||||||
|
|
||||||
const descriptor = await this.engineClient.init(entityType, {
|
const descriptor = await this.engineClient.init(entityType, {
|
||||||
|
@ -152,7 +152,7 @@ export class EntityStoreDataClient {
|
||||||
fieldHistoryLength,
|
fieldHistoryLength,
|
||||||
indexPattern,
|
indexPattern,
|
||||||
});
|
});
|
||||||
logger.debug(`Initialized engine for ${entityType}`);
|
logger.debug(`[Entity Store] Initialized saved object for ${entityType}`);
|
||||||
// first create the entity definition without starting it
|
// first create the entity definition without starting it
|
||||||
// so that the index template is created which we can add a component template to
|
// so that the index template is created which we can add a component template to
|
||||||
|
|
||||||
|
@ -165,7 +165,9 @@ export class EntityStoreDataClient {
|
||||||
config,
|
config,
|
||||||
pipelineDebugMode
|
pipelineDebugMode
|
||||||
).catch((error) => {
|
).catch((error) => {
|
||||||
logger.error(`There was an error during async setup of the Entity Store: ${error.message}`);
|
logger.error(
|
||||||
|
`[Entity Store] There was an error during async setup of the Entity Store: ${error.message}`
|
||||||
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
return descriptor;
|
return descriptor;
|
||||||
|
@ -260,7 +262,7 @@ export class EntityStoreDataClient {
|
||||||
logger,
|
logger,
|
||||||
taskManager,
|
taskManager,
|
||||||
});
|
});
|
||||||
logger.info(`Entity store initialized for ${entityType}`);
|
debugLog(`Entity store initialized`);
|
||||||
|
|
||||||
const setupEndTime = moment().utc().toISOString();
|
const setupEndTime = moment().utc().toISOString();
|
||||||
const duration = moment(setupEndTime).diff(moment(setupStartTime), 'seconds');
|
const duration = moment(setupEndTime).diff(moment(setupStartTime), 'seconds');
|
||||||
|
@ -271,7 +273,7 @@ export class EntityStoreDataClient {
|
||||||
return updated;
|
return updated;
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
this.options.logger.error(
|
this.options.logger.error(
|
||||||
`Error initializing entity store for ${entityType}: ${err.message}`
|
`[Entity Store] Error initializing entity store for ${entityType}: ${err.message}`
|
||||||
);
|
);
|
||||||
|
|
||||||
this.options.telemetry?.reportEvent(ENTITY_ENGINE_RESOURCE_INIT_FAILURE_EVENT.eventType, {
|
this.options.telemetry?.reportEvent(ENTITY_ENGINE_RESOURCE_INIT_FAILURE_EVENT.eventType, {
|
||||||
|
@ -369,7 +371,9 @@ export class EntityStoreDataClient {
|
||||||
frequency: `${config.frequency.asSeconds()}s`,
|
frequency: `${config.frequency.asSeconds()}s`,
|
||||||
});
|
});
|
||||||
const { entityManagerDefinition } = unitedDefinition;
|
const { entityManagerDefinition } = unitedDefinition;
|
||||||
logger.info(`In namespace ${namespace}: Deleting entity store for ${entityType}`);
|
logger.info(
|
||||||
|
`[Entity Store] In namespace ${namespace}: Deleting entity store for ${entityType}`
|
||||||
|
);
|
||||||
try {
|
try {
|
||||||
try {
|
try {
|
||||||
await this.entityClient.deleteEntityDefinition({
|
await this.entityClient.deleteEntityDefinition({
|
||||||
|
@ -416,6 +420,7 @@ export class EntityStoreDataClient {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger.info(`[Entity Store] In namespace ${namespace}: Deleted store for ${entityType}`);
|
||||||
return { deleted: true };
|
return { deleted: true };
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
logger.error(`Error deleting entity store for ${entityType}: ${e.message}`);
|
logger.error(`Error deleting entity store for ${entityType}: ${e.message}`);
|
||||||
|
|
|
@ -36,12 +36,12 @@ import {
|
||||||
const logFactory =
|
const logFactory =
|
||||||
(logger: Logger, taskId: string) =>
|
(logger: Logger, taskId: string) =>
|
||||||
(message: string): void =>
|
(message: string): void =>
|
||||||
logger.info(`[task ${taskId}]: ${message}`);
|
logger.info(`[Entity Store] [task ${taskId}]: ${message}`);
|
||||||
|
|
||||||
const debugLogFactory =
|
const debugLogFactory =
|
||||||
(logger: Logger, taskId: string) =>
|
(logger: Logger, taskId: string) =>
|
||||||
(message: string): void =>
|
(message: string): void =>
|
||||||
logger.debug(`[task ${taskId}]: ${message}`);
|
logger.debug(`[Entity Store] [task ${taskId}]: ${message}`);
|
||||||
|
|
||||||
const getTaskName = (): string => TYPE;
|
const getTaskName = (): string => TYPE;
|
||||||
|
|
||||||
|
@ -65,7 +65,9 @@ export const registerEntityStoreFieldRetentionEnrichTask = ({
|
||||||
taskManager: TaskManagerSetupContract | undefined;
|
taskManager: TaskManagerSetupContract | undefined;
|
||||||
}): void => {
|
}): void => {
|
||||||
if (!taskManager) {
|
if (!taskManager) {
|
||||||
logger.info('Task Manager is unavailable; skipping entity store enrich policy registration.');
|
logger.info(
|
||||||
|
'[Entity Store] Task Manager is unavailable; skipping entity store enrich policy registration.'
|
||||||
|
);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -134,7 +136,7 @@ export const startEntityStoreFieldRetentionEnrichTask = async ({
|
||||||
params: { version: VERSION },
|
params: { version: VERSION },
|
||||||
});
|
});
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
logger.warn(`[task ${taskId}]: error scheduling task, received ${e.message}`);
|
logger.warn(`[Entity Store] [task ${taskId}]: error scheduling task, received ${e.message}`);
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -150,9 +152,14 @@ export const removeEntityStoreFieldRetentionEnrichTask = async ({
|
||||||
}) => {
|
}) => {
|
||||||
try {
|
try {
|
||||||
await taskManager.remove(getTaskId(namespace));
|
await taskManager.remove(getTaskId(namespace));
|
||||||
|
logger.info(
|
||||||
|
`[Entity Store] Removed entity store enrich policy task for namespace ${namespace}`
|
||||||
|
);
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
if (!SavedObjectsErrorHelpers.isNotFoundError(err)) {
|
if (!SavedObjectsErrorHelpers.isNotFoundError(err)) {
|
||||||
logger.error(`Failed to remove entity store enrich policy task: ${err.message}`);
|
logger.error(
|
||||||
|
`[Entity Store] Failed to remove entity store enrich policy task: ${err.message}`
|
||||||
|
);
|
||||||
throw err;
|
throw err;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -233,7 +240,7 @@ export const runTask = async ({
|
||||||
state: updatedState,
|
state: updatedState,
|
||||||
};
|
};
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
logger.error(`[task ${taskId}]: error running task, received ${e.message}`);
|
logger.error(`[Entity Store] [task ${taskId}]: error running task, received ${e.message}`);
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue