[8.18] [Obs AI Assistant] Add KB re-indexing when encountering semantic_text bug (#210386) (#211497)

# Backport

This will backport the following commits from `main` to `8.18`:
- [[Obs AI Assistant] Add KB re-indexing when encountering
`semantic_text` bug
(#210386)](https://github.com/elastic/kibana/pull/210386)

<!--- Backport version: 9.6.5 -->

### Questions ?
Please refer to the [Backport tool
documentation](https://github.com/sorenlouv/backport)

<!--BACKPORT [{"author":{"name":"Søren
Louv-Jansen","email":"soren.louv@elastic.co"},"sourceCommit":{"committedDate":"2025-02-17T17:20:50Z","message":"[Obs
AI Assistant] Add KB re-indexing when encountering `semantic_text` bug
(#210386)\n\nCloses
https://github.com/elastic/kibana/issues/210204\n\nThis will
automatically re-index the knowledge base if upon adding a KB\nentry
there is this error:\n\n> The [sparse_vector] field type is not
supported on indices created on\nversions 8.0 to 8.10\n\nThat error
means that semantic_text is not supported in the given index,\nand it
should therefore be re-indexed.\n\n**How to test this
PR:**\n\n**8.10**\n- `git checkout -B 8.10 origin/8.10`\n- Start
Kibana:\n - `nvm use && yarn kbn bootstrap && yarn start`\n- Start ES\n-
`nvm use && yarn es snapshot --license trial
--E\npath.data=\"/Users/sorenlouv/elastic/kbn_es_data/upgrade_testing\"`\n\n**8.19**\n-
`git checkout -B 8.19 origin/8.x`\n- Start Kibana:\n - `nvm use && yarn
kbn bootstrap && yarn start`\n- Start ES\n- `nvm use && yarn es snapshot
--license trial
--E\npath.data=\"/Users/sorenlouv/elastic/kbn_es_data/upgrade_testing\"`\n-
Install Knowledge base\n- Try adding an item to KB (it should fail
️)\n\n**9.1.0**\n- `gh pr checkout 210386`\n- Start Kibana:\n - `nvm
use && yarn kbn bootstrap && yarn start`\n- Start ES\n- `nvm use && yarn
es snapshot --license trial
--E\npath.data=\"/Users/sorenlouv/elastic/kbn_es_data/upgrade_testing\"`\n-
Try adding an item to KB (it should succeed ️)\n\n**TODO:**\n\n- Add an
upgrade test that covers this flow\n\n---------\n\nCo-authored-by:
Viduni Wickramarachchi
<viduni.ushanka@gmail.com>","sha":"df67a09afab22521dfa9ff3ec3a4f624a039c462","branchLabelMapping":{"^v9.1.0$":"main","^v8.19.0$":"8.x","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["release_note:fix","backport:prev-major","Team:Obs
AI Assistant","v9.1.0"],"title":"[Obs AI Assistant] Add KB re-indexing
when encountering `semantic_text`
bug","number":210386,"url":"https://github.com/elastic/kibana/pull/210386","mergeCommit":{"message":"[Obs
AI Assistant] Add KB re-indexing when encountering `semantic_text` bug
(#210386)\n\nCloses
https://github.com/elastic/kibana/issues/210204\n\nThis will
automatically re-index the knowledge base if upon adding a KB\nentry
there is this error:\n\n> The [sparse_vector] field type is not
supported on indices created on\nversions 8.0 to 8.10\n\nThat error
means that semantic_text is not supported in the given index,\nand it
should therefore be re-indexed.\n\n**How to test this
PR:**\n\n**8.10**\n- `git checkout -B 8.10 origin/8.10`\n- Start
Kibana:\n - `nvm use && yarn kbn bootstrap && yarn start`\n- Start ES\n-
`nvm use && yarn es snapshot --license trial
--E\npath.data=\"/Users/sorenlouv/elastic/kbn_es_data/upgrade_testing\"`\n\n**8.19**\n-
`git checkout -B 8.19 origin/8.x`\n- Start Kibana:\n - `nvm use && yarn
kbn bootstrap && yarn start`\n- Start ES\n- `nvm use && yarn es snapshot
--license trial
--E\npath.data=\"/Users/sorenlouv/elastic/kbn_es_data/upgrade_testing\"`\n-
Install Knowledge base\n- Try adding an item to KB (it should fail
️)\n\n**9.1.0**\n- `gh pr checkout 210386`\n- Start Kibana:\n - `nvm
use && yarn kbn bootstrap && yarn start`\n- Start ES\n- `nvm use && yarn
es snapshot --license trial
--E\npath.data=\"/Users/sorenlouv/elastic/kbn_es_data/upgrade_testing\"`\n-
Try adding an item to KB (it should succeed ️)\n\n**TODO:**\n\n- Add an
upgrade test that covers this flow\n\n---------\n\nCo-authored-by:
Viduni Wickramarachchi
<viduni.ushanka@gmail.com>","sha":"df67a09afab22521dfa9ff3ec3a4f624a039c462"}},"sourceBranch":"main","suggestedTargetBranches":[],"targetPullRequestStates":[{"branch":"main","label":"v9.1.0","branchLabelMappingKey":"^v9.1.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/210386","number":210386,"mergeCommit":{"message":"[Obs
AI Assistant] Add KB re-indexing when encountering `semantic_text` bug
(#210386)\n\nCloses
https://github.com/elastic/kibana/issues/210204\n\nThis will
automatically re-index the knowledge base if upon adding a KB\nentry
there is this error:\n\n> The [sparse_vector] field type is not
supported on indices created on\nversions 8.0 to 8.10\n\nThat error
means that semantic_text is not supported in the given index,\nand it
should therefore be re-indexed.\n\n**How to test this
PR:**\n\n**8.10**\n- `git checkout -B 8.10 origin/8.10`\n- Start
Kibana:\n - `nvm use && yarn kbn bootstrap && yarn start`\n- Start ES\n-
`nvm use && yarn es snapshot --license trial
--E\npath.data=\"/Users/sorenlouv/elastic/kbn_es_data/upgrade_testing\"`\n\n**8.19**\n-
`git checkout -B 8.19 origin/8.x`\n- Start Kibana:\n - `nvm use && yarn
kbn bootstrap && yarn start`\n- Start ES\n- `nvm use && yarn es snapshot
--license trial
--E\npath.data=\"/Users/sorenlouv/elastic/kbn_es_data/upgrade_testing\"`\n-
Install Knowledge base\n- Try adding an item to KB (it should fail
️)\n\n**9.1.0**\n- `gh pr checkout 210386`\n- Start Kibana:\n - `nvm
use && yarn kbn bootstrap && yarn start`\n- Start ES\n- `nvm use && yarn
es snapshot --license trial
--E\npath.data=\"/Users/sorenlouv/elastic/kbn_es_data/upgrade_testing\"`\n-
Try adding an item to KB (it should succeed ️)\n\n**TODO:**\n\n- Add an
upgrade test that covers this flow\n\n---------\n\nCo-authored-by:
Viduni Wickramarachchi
<viduni.ushanka@gmail.com>","sha":"df67a09afab22521dfa9ff3ec3a4f624a039c462"}}]}]
BACKPORT-->

Co-authored-by: Søren Louv-Jansen <soren.louv@elastic.co>
This commit is contained in:
Kibana Machine 2025-02-18 07:20:12 +11:00 committed by GitHub
parent b408e8c585
commit bac6d90d65
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
19 changed files with 667 additions and 228 deletions

View file

@ -11,6 +11,7 @@ export const config = schema.object({
enabled: schema.boolean({ defaultValue: true }),
scope: schema.maybe(schema.oneOf([schema.literal('observability'), schema.literal('search')])),
enableKnowledgeBase: schema.boolean({ defaultValue: true }),
disableKbSemanticTextMigration: schema.boolean({ defaultValue: false }),
});
export type ObservabilityAIAssistantConfig = TypeOf<typeof config>;

View file

@ -30,7 +30,8 @@ import { registerFunctions } from './functions';
import { recallRankingEvent } from './analytics/recall_ranking';
import { initLangtrace } from './service/client/instrumentation/init_langtrace';
import { aiAssistantCapabilities } from '../common/capabilities';
import { registerMigrateKnowledgeBaseEntriesTask } from './service/task_manager_definitions/register_migrate_knowledge_base_entries_task';
import { registerAndScheduleKbSemanticTextMigrationTask } from './service/task_manager_definitions/register_kb_semantic_text_migration_task';
import { updateExistingIndexAssets } from './service/create_or_update_index_assets';
export class ObservabilityAIAssistantPlugin
implements
@ -123,14 +124,22 @@ export class ObservabilityAIAssistantPlugin
config: this.config,
}));
registerMigrateKnowledgeBaseEntriesTask({
// Update existing index assets (mappings, templates, etc). This will not create assets if they do not exist.
updateExistingIndexAssets({ logger: this.logger.get('index_assets'), core }).catch((e) =>
this.logger.error(`Index assets could not be updated: ${e.message}`)
);
// register task to migrate knowledge base entries to include semantic_text field
registerAndScheduleKbSemanticTextMigrationTask({
core,
taskManager: plugins.taskManager,
logger: this.logger,
logger: this.logger.get('kb_semantic_text_migration_task'),
config: this.config,
}).catch((e) => {
this.logger.error(`Knowledge base migration was not successfully: ${e.message}`);
});
}).catch((e) =>
this.logger.error(
`Knowledge base semantic_text migration task could not be registered: ${e.message}`
)
);
service.register(registerFunctions);

View file

@ -10,9 +10,11 @@ import { connectorRoutes } from './connectors/route';
import { conversationRoutes } from './conversations/route';
import { functionRoutes } from './functions/route';
import { knowledgeBaseRoutes } from './knowledge_base/route';
import { topLevelRoutes } from './top_level/route';
export function getGlobalObservabilityAIAssistantServerRouteRepository() {
return {
...topLevelRoutes,
...chatRoutes,
...conversationRoutes,
...connectorRoutes,

View file

@ -101,7 +101,7 @@ const resetKnowledgeBase = createObservabilityAIAssistantServerRoute({
});
const semanticTextMigrationKnowledgeBase = createObservabilityAIAssistantServerRoute({
endpoint: 'POST /internal/observability_ai_assistant/kb/semantic_text_migration',
endpoint: 'POST /internal/observability_ai_assistant/kb/migrations/kb_semantic_text',
security: {
authz: {
requiredPrivileges: ['ai_assistant'],
@ -114,7 +114,7 @@ const semanticTextMigrationKnowledgeBase = createObservabilityAIAssistantServerR
throw notImplemented();
}
return client.migrateKnowledgeBaseToSemanticText();
return client.reIndexKnowledgeBaseAndPopulateSemanticTextField();
},
});

View file

@ -0,0 +1,28 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { createOrUpdateIndexAssets } from '../../service/create_or_update_index_assets';
import { createObservabilityAIAssistantServerRoute } from '../create_observability_ai_assistant_server_route';
const createOrUpdateIndexAssetsRoute = createObservabilityAIAssistantServerRoute({
endpoint: 'POST /internal/observability_ai_assistant/index_assets',
security: {
authz: {
requiredPrivileges: ['ai_assistant'],
},
},
handler: async (resources): Promise<void> => {
return createOrUpdateIndexAssets({
logger: resources.logger,
core: resources.plugins.core.setup,
});
},
});
export const topLevelRoutes = {
...createOrUpdateIndexAssetsRoute,
};

View file

@ -75,9 +75,9 @@ import { extractTokenCount } from './operators/extract_token_count';
import { getGeneratedTitle } from './operators/get_generated_title';
import { instrumentAndCountTokens } from './operators/instrument_and_count_tokens';
import {
runSemanticTextKnowledgeBaseMigration,
scheduleSemanticTextMigration,
} from '../task_manager_definitions/register_migrate_knowledge_base_entries_task';
reIndexKnowledgeBaseAndPopulateSemanticTextField,
scheduleKbSemanticTextMigrationTask,
} from '../task_manager_definitions/register_kb_semantic_text_migration_task';
import { ObservabilityAIAssistantPluginStartDependencies } from '../../types';
import { ObservabilityAIAssistantConfig } from '../../config';
import { getElserModelId } from '../knowledge_base_service/get_elser_model_id';
@ -688,12 +688,11 @@ export class ObservabilityAIAssistantClient {
core
.getStartServices()
.then(([_, pluginsStart]) => {
logger.debug('Schedule semantic text migration task');
return scheduleSemanticTextMigration(pluginsStart);
})
.then(([_, pluginsStart]) =>
scheduleKbSemanticTextMigrationTask({ taskManager: pluginsStart.taskManager, logger })
)
.catch((error) => {
logger.error(`Failed to run semantic text migration task: ${error}`);
logger.error(`Failed to schedule semantic text migration task: ${error}`);
});
return res;
@ -704,8 +703,8 @@ export class ObservabilityAIAssistantClient {
return this.dependencies.knowledgeBaseService.reset(esClient);
};
migrateKnowledgeBaseToSemanticText = () => {
return runSemanticTextKnowledgeBaseMigration({
reIndexKnowledgeBaseAndPopulateSemanticTextField = () => {
return reIndexKnowledgeBaseAndPopulateSemanticTextField({
esClient: this.dependencies.esClient,
logger: this.dependencies.logger,
config: this.dependencies.config,

View file

@ -6,13 +6,39 @@
*/
import { createConcreteWriteIndex, getDataStreamAdapter } from '@kbn/alerting-plugin/server';
import type { CoreSetup, Logger } from '@kbn/core/server';
import type { CoreSetup, ElasticsearchClient, Logger } from '@kbn/core/server';
import type { ObservabilityAIAssistantPluginStartDependencies } from '../types';
import { conversationComponentTemplate } from './conversation_component_template';
import { kbComponentTemplate } from './kb_component_template';
import { resourceNames } from '.';
export async function setupConversationAndKbIndexAssets({
export async function updateExistingIndexAssets({
logger,
core,
}: {
logger: Logger;
core: CoreSetup<ObservabilityAIAssistantPluginStartDependencies>;
}) {
const [coreStart] = await core.getStartServices();
const { asInternalUser } = coreStart.elasticsearch.client;
const hasKbIndex = await asInternalUser.indices.exists({
index: resourceNames.aliases.kb,
});
const hasConversationIndex = await asInternalUser.indices.exists({
index: resourceNames.aliases.conversations,
});
if (!hasKbIndex && !hasConversationIndex) {
logger.debug('Index assets do not exist. Aborting updating index assets');
return;
}
await createOrUpdateIndexAssets({ logger, core });
}
export async function createOrUpdateIndexAssets({
logger,
core,
}: {
@ -56,7 +82,7 @@ export async function setupConversationAndKbIndexAssets({
alias: conversationAliasName,
pattern: `${conversationAliasName}*`,
basePattern: `${conversationAliasName}*`,
name: `${conversationAliasName}-000001`,
name: resourceNames.concreteIndexName.conversations,
template: resourceNames.indexTemplate.conversations,
},
dataStreamAdapter: getDataStreamAdapter({ useDataStreamForAlerts: false }),
@ -86,20 +112,7 @@ export async function setupConversationAndKbIndexAssets({
});
// Knowledge base: write index
const kbAliasName = resourceNames.aliases.kb;
await createConcreteWriteIndex({
esClient: asInternalUser,
logger,
totalFieldsLimit: 10000,
indexPatterns: {
alias: kbAliasName,
pattern: `${kbAliasName}*`,
basePattern: `${kbAliasName}*`,
name: `${kbAliasName}-000001`,
template: resourceNames.indexTemplate.kb,
},
dataStreamAdapter: getDataStreamAdapter({ useDataStreamForAlerts: false }),
});
await createKbConcreteIndex({ logger, esClient: coreStart.elasticsearch.client });
logger.info('Successfully set up index assets');
} catch (error) {
@ -107,3 +120,28 @@ export async function setupConversationAndKbIndexAssets({
logger.debug(error);
}
}
export async function createKbConcreteIndex({
logger,
esClient,
}: {
logger: Logger;
esClient: {
asInternalUser: ElasticsearchClient;
};
}) {
const kbAliasName = resourceNames.aliases.kb;
return createConcreteWriteIndex({
esClient: esClient.asInternalUser,
logger,
totalFieldsLimit: 10000,
indexPatterns: {
alias: kbAliasName,
pattern: `${kbAliasName}*`,
basePattern: `${kbAliasName}*`,
name: resourceNames.concreteIndexName.kb,
template: resourceNames.indexTemplate.kb,
},
dataStreamAdapter: getDataStreamAdapter({ useDataStreamForAlerts: false }),
});
}

View file

@ -17,7 +17,7 @@ import { ObservabilityAIAssistantClient } from './client';
import { KnowledgeBaseService } from './knowledge_base_service';
import type { RegistrationCallback, RespondFunctionResources } from './types';
import { ObservabilityAIAssistantConfig } from '../config';
import { setupConversationAndKbIndexAssets } from './setup_conversation_and_kb_index_assets';
import { createOrUpdateIndexAssets } from './create_or_update_index_assets';
function getResourceName(resource: string) {
return `.kibana-observability-ai-assistant-${resource}`;
@ -40,11 +40,15 @@ export const resourceNames = {
conversations: getResourceName('index-template-conversations'),
kb: getResourceName('index-template-kb'),
},
concreteIndexName: {
conversations: getResourceName('conversations-000001'),
kb: getResourceName('kb-000001'),
},
};
const createIndexAssetsOnce = once(
(logger: Logger, core: CoreSetup<ObservabilityAIAssistantPluginStartDependencies>) =>
pRetry(() => setupConversationAndKbIndexAssets({ logger, core }))
pRetry(() => createOrUpdateIndexAssets({ logger, core }))
);
export class ObservabilityAIAssistantService {

View file

@ -28,6 +28,11 @@ import {
import { recallFromSearchConnectors } from './recall_from_search_connectors';
import { ObservabilityAIAssistantPluginStartDependencies } from '../../types';
import { ObservabilityAIAssistantConfig } from '../../config';
import {
isKnowledgeBaseIndexWriteBlocked,
isSemanticTextUnsupportedError,
} from './reindex_knowledge_base';
import { scheduleKbSemanticTextMigrationTask } from '../task_manager_definitions/register_kb_semantic_text_migration_task';
interface Dependencies {
core: CoreSetup<ObservabilityAIAssistantPluginStartDependencies>;
@ -406,7 +411,9 @@ export class KnowledgeBaseService {
}
try {
await this.dependencies.esClient.asInternalUser.index({
await this.dependencies.esClient.asInternalUser.index<
Omit<KnowledgeBaseEntry, 'id'> & { namespace: string }
>({
index: resourceNames.aliases.kb,
id,
document: {
@ -418,10 +425,40 @@ export class KnowledgeBaseService {
},
refresh: 'wait_for',
});
this.dependencies.logger.debug(`Entry added to knowledge base`);
} catch (error) {
this.dependencies.logger.debug(`Failed to add entry to knowledge base ${error}`);
if (isInferenceEndpointMissingOrUnavailable(error)) {
throwKnowledgeBaseNotReady(error.body);
}
if (isSemanticTextUnsupportedError(error)) {
this.dependencies.core
.getStartServices()
.then(([_, pluginsStart]) => {
return scheduleKbSemanticTextMigrationTask({
taskManager: pluginsStart.taskManager,
logger: this.dependencies.logger,
runSoon: true,
});
})
.catch((e) => {
this.dependencies.logger.error(
`Failed to schedule knowledge base semantic text migration task: ${e}`
);
});
throw serverUnavailable(
'The knowledge base is currently being re-indexed. Please try again later'
);
}
if (isKnowledgeBaseIndexWriteBlocked(error)) {
throw new Error(
`Writes to the knowledge base are currently blocked due to an Elasticsearch write index block. This is most likely due to an ongoing re-indexing operation. Please try again later. Error: ${error.message}`
);
}
throw error;
}
};

View file

@ -0,0 +1,113 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { errors as EsErrors } from '@elastic/elasticsearch';
import { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import { Logger } from '@kbn/logging';
import { resourceNames } from '..';
import { createKbConcreteIndex } from '../create_or_update_index_assets';
export async function reIndexKnowledgeBase({
logger,
esClient,
}: {
logger: Logger;
esClient: {
asInternalUser: ElasticsearchClient;
};
}): Promise<void> {
logger.debug('Initiating knowledge base re-indexing...');
try {
const originalIndex = resourceNames.concreteIndexName.kb;
const tempIndex = `${resourceNames.aliases.kb}-000002`;
const indexSettingsResponse = await esClient.asInternalUser.indices.getSettings({
index: originalIndex,
});
const indexSettings = indexSettingsResponse[originalIndex].settings;
const createdVersion = parseInt(indexSettings?.index?.version?.created ?? '', 10);
// Check if the index was created before version 8.11
const versionThreshold = 8110000; // Version 8.11.0
if (createdVersion >= versionThreshold) {
logger.warn(
`Knowledge base index "${originalIndex}" was created in version ${createdVersion}, and does not require re-indexing. Semantic text field is already supported. Aborting`
);
return;
}
logger.info(
`Knowledge base index was created in ${createdVersion} and must be re-indexed in order to support semantic_text field. Re-indexing now...`
);
// Create temporary index
logger.debug(`Creating temporary index "${tempIndex}"...`);
await esClient.asInternalUser.indices.delete({ index: tempIndex }, { ignore: [404] });
await esClient.asInternalUser.indices.create({ index: tempIndex });
// Perform reindex to temporary index
logger.debug(`Re-indexing knowledge base to temporary index "${tempIndex}"...`);
await esClient.asInternalUser.reindex({
body: {
source: { index: originalIndex },
dest: { index: tempIndex },
},
refresh: true,
wait_for_completion: true,
});
// Delete and re-create original index
logger.debug(`Deleting original index "${originalIndex}" and re-creating it...`);
await esClient.asInternalUser.indices.delete({ index: originalIndex });
await createKbConcreteIndex({ logger, esClient });
// Perform reindex back to original index
logger.debug(`Re-indexing knowledge base back to original index "${originalIndex}"...`);
await esClient.asInternalUser.reindex({
body: {
source: { index: tempIndex },
dest: { index: originalIndex },
},
refresh: true,
wait_for_completion: true,
});
// Delete temporary index
logger.debug(`Deleting temporary index "${tempIndex}"...`);
await esClient.asInternalUser.indices.delete({ index: tempIndex });
logger.info(
'Re-indexing knowledge base completed successfully. Semantic text field is now supported.'
);
} catch (error) {
throw new Error(`Failed to reindex knowledge base: ${error.message}`);
}
}
export function isKnowledgeBaseIndexWriteBlocked(error: any) {
return (
error instanceof EsErrors.ResponseError &&
error.message.includes(
`cluster_block_exception: index [${resourceNames.concreteIndexName.kb}] blocked`
)
);
}
export function isSemanticTextUnsupportedError(error: Error) {
const semanticTextUnsupportedError =
'The [sparse_vector] field type is not supported on indices created on versions 8.0 to 8.10';
const isSemanticTextUnspported =
error instanceof EsErrors.ResponseError &&
(error.message.includes(semanticTextUnsupportedError) ||
// @ts-expect-error
error.meta?.body?.error?.caused_by?.reason.includes(semanticTextUnsupportedError));
return isSemanticTextUnspported;
}

View file

@ -0,0 +1,228 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import pLimit from 'p-limit';
import { TaskManagerSetupContract } from '@kbn/task-manager-plugin/server';
import type { CoreSetup, CoreStart, Logger } from '@kbn/core/server';
import pRetry from 'p-retry';
import { KnowledgeBaseEntry } from '../../../common';
import { resourceNames } from '..';
import { getElserModelStatus } from '../inference_endpoint';
import { ObservabilityAIAssistantPluginStartDependencies } from '../../types';
import { ObservabilityAIAssistantConfig } from '../../config';
import { reIndexKnowledgeBase } from '../knowledge_base_service/reindex_knowledge_base';
const TASK_ID = 'obs-ai-assistant:knowledge-base-migration-task-id';
const TASK_TYPE = 'obs-ai-assistant:knowledge-base-migration';
// This task will re-index all knowledge base entries without `semantic_text` field
// to ensure the field is populated with the correct embeddings.
// After the migration we will no longer need to use the `ml.tokens` field.
export async function registerAndScheduleKbSemanticTextMigrationTask({
taskManager,
logger,
core,
config,
}: {
taskManager: TaskManagerSetupContract;
logger: Logger;
core: CoreSetup<ObservabilityAIAssistantPluginStartDependencies>;
config: ObservabilityAIAssistantConfig;
}) {
const [coreStart, pluginsStart] = await core.getStartServices();
// register task
registerKbSemanticTextMigrationTask({ taskManager, logger, coreStart, config });
// schedule task
await scheduleKbSemanticTextMigrationTask({ taskManager: pluginsStart.taskManager, logger });
}
function registerKbSemanticTextMigrationTask({
taskManager,
logger,
coreStart,
config,
}: {
taskManager: TaskManagerSetupContract;
logger: Logger;
coreStart: CoreStart;
config: ObservabilityAIAssistantConfig;
}) {
try {
logger.debug(`Register task "${TASK_TYPE}"`);
taskManager.registerTaskDefinitions({
[TASK_TYPE]: {
title: 'Add support for semantic_text in Knowledge Base',
description: `This task will reindex the knowledge base and populate the semantic_text fields for all entries without it.`,
timeout: '1h',
maxAttempts: 5,
createTaskRunner() {
return {
async run() {
logger.debug(`Run task: "${TASK_TYPE}"`);
const esClient = coreStart.elasticsearch.client;
const hasKbIndex = await esClient.asInternalUser.indices.exists({
index: resourceNames.aliases.kb,
});
if (!hasKbIndex) {
logger.debug('Knowledge base index does not exist. Skipping migration.');
return;
}
if (config.disableKbSemanticTextMigration) {
logger.info(
'Semantic text migration is disabled via config "xpack.observabilityAIAssistant.disableKbSemanticTextMigration=true". Skipping migration.'
);
return;
}
await reIndexKnowledgeBaseAndPopulateSemanticTextField({ esClient, logger, config });
},
};
},
},
});
} catch (error) {
logger.error(`Failed to register task "${TASK_TYPE}". Error: ${error}`);
}
}
export async function scheduleKbSemanticTextMigrationTask({
taskManager,
logger,
runSoon = false,
}: {
taskManager: ObservabilityAIAssistantPluginStartDependencies['taskManager'];
logger: Logger;
runSoon?: boolean;
}) {
logger.debug('Schedule migration task');
await taskManager.ensureScheduled({
id: TASK_ID,
taskType: TASK_TYPE,
scope: ['aiAssistant'],
params: {},
state: {},
});
if (runSoon) {
logger.debug('Run migration task soon');
await taskManager.runSoon(TASK_ID);
}
}
export async function reIndexKnowledgeBaseAndPopulateSemanticTextField({
esClient,
logger,
config,
}: {
esClient: { asInternalUser: ElasticsearchClient };
logger: Logger;
config: ObservabilityAIAssistantConfig;
}) {
logger.debug('Starting migration...');
try {
await reIndexKnowledgeBase({ logger, esClient });
await populateSemanticTextFieldRecursively({ esClient, logger, config });
} catch (e) {
logger.error(`Migration failed: ${e.message}`);
}
logger.debug('Migration succeeded');
}
async function populateSemanticTextFieldRecursively({
esClient,
logger,
config,
}: {
esClient: { asInternalUser: ElasticsearchClient };
logger: Logger;
config: ObservabilityAIAssistantConfig;
}) {
logger.debug('Populating semantic_text field for entries without it');
const response = await esClient.asInternalUser.search<KnowledgeBaseEntry>({
size: 100,
track_total_hits: true,
index: [resourceNames.aliases.kb],
query: {
bool: {
must_not: {
exists: {
field: 'semantic_text',
},
},
},
},
_source: {
excludes: ['ml.tokens'],
},
});
if (response.hits.hits.length === 0) {
logger.debug('No remaining entries to migrate');
return;
}
logger.debug(`Found ${response.hits.hits.length} entries to migrate`);
await waitForModel({ esClient, logger, config });
// Limit the number of concurrent requests to avoid overloading the cluster
const limiter = pLimit(10);
const promises = response.hits.hits.map((hit) => {
return limiter(() => {
if (!hit._source || !hit._id) {
return;
}
return esClient.asInternalUser.update({
refresh: 'wait_for',
index: resourceNames.aliases.kb,
id: hit._id,
body: {
doc: {
...hit._source,
semantic_text: hit._source.text,
},
},
});
});
});
await Promise.all(promises);
logger.debug(`Populated ${promises.length} entries`);
await populateSemanticTextFieldRecursively({ esClient, logger, config });
}
async function waitForModel({
esClient,
logger,
config,
}: {
esClient: { asInternalUser: ElasticsearchClient };
logger: Logger;
config: ObservabilityAIAssistantConfig;
}) {
return pRetry(
async () => {
const { ready } = await getElserModelStatus({ esClient, logger, config });
if (!ready) {
logger.debug('Elser model is not yet ready. Retrying...');
throw new Error('Elser model is not yet ready');
}
},
{ retries: 30, factor: 2, maxTimeout: 30_000 }
);
}

View file

@ -1,186 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import pLimit from 'p-limit';
import { TaskManagerSetupContract } from '@kbn/task-manager-plugin/server';
import type { CoreSetup, Logger } from '@kbn/core/server';
import pRetry from 'p-retry';
import { KnowledgeBaseEntry } from '../../../common';
import { resourceNames } from '..';
import { getElserModelStatus } from '../inference_endpoint';
import { ObservabilityAIAssistantPluginStartDependencies } from '../../types';
import { ObservabilityAIAssistantConfig } from '../../config';
import { setupConversationAndKbIndexAssets } from '../setup_conversation_and_kb_index_assets';
const TASK_ID = 'obs-ai-assistant:knowledge-base-migration-task-id';
const TASK_TYPE = 'obs-ai-assistant:knowledge-base-migration';
// This task will re-index all knowledge base entries without `semantic_text` field
// to ensure the field is populated with the correct embeddings.
// After the migration we will no longer need to use the `ml.tokens` field.
export async function registerMigrateKnowledgeBaseEntriesTask({
taskManager,
logger,
core,
config,
}: {
taskManager: TaskManagerSetupContract;
logger: Logger;
core: CoreSetup<ObservabilityAIAssistantPluginStartDependencies>;
config: ObservabilityAIAssistantConfig;
}) {
const [coreStart, pluginsStart] = await core.getStartServices();
try {
logger.debug(`Register task "${TASK_TYPE}"`);
taskManager.registerTaskDefinitions({
[TASK_TYPE]: {
title: 'Migrate AI Assistant Knowledge Base',
description: `Migrates AI Assistant knowledge base entries`,
timeout: '1h',
maxAttempts: 5,
createTaskRunner() {
return {
async run() {
logger.debug(`Run task: "${TASK_TYPE}"`);
const esClient = coreStart.elasticsearch.client;
const hasKbIndex = await esClient.asInternalUser.indices.exists({
index: resourceNames.aliases.kb,
});
if (!hasKbIndex) {
logger.debug(
'Knowledge base index does not exist. Skipping semantic text migration.'
);
return;
}
// update fields and mappings
await setupConversationAndKbIndexAssets({ logger, core });
// run migration
await runSemanticTextKnowledgeBaseMigration({ esClient, logger, config });
},
};
},
},
});
} catch (error) {
logger.error(`Failed to register task "${TASK_TYPE}". Error: ${error}`);
}
try {
logger.debug(`Scheduled task: "${TASK_TYPE}"`);
await scheduleSemanticTextMigration(pluginsStart);
} catch (error) {
logger.error(`Failed to schedule task "${TASK_TYPE}". Error: ${error}`);
}
}
export function scheduleSemanticTextMigration(
pluginsStart: ObservabilityAIAssistantPluginStartDependencies
) {
return pluginsStart.taskManager.ensureScheduled({
id: TASK_ID,
taskType: TASK_TYPE,
scope: ['aiAssistant'],
params: {},
state: {},
});
}
export async function runSemanticTextKnowledgeBaseMigration({
esClient,
logger,
config,
}: {
esClient: { asInternalUser: ElasticsearchClient };
logger: Logger;
config: ObservabilityAIAssistantConfig;
}) {
logger.debug('Knowledge base migration: Running migration');
try {
const response = await esClient.asInternalUser.search<KnowledgeBaseEntry>({
size: 100,
track_total_hits: true,
index: [resourceNames.aliases.kb],
query: {
bool: {
must_not: {
exists: {
field: 'semantic_text',
},
},
},
},
_source: {
excludes: ['ml.tokens'],
},
});
if (response.hits.hits.length === 0) {
logger.debug('Knowledge base migration: No remaining entries to migrate');
return;
}
logger.debug(`Knowledge base migration: Found ${response.hits.hits.length} entries to migrate`);
await waitForModel({ esClient, logger, config });
// Limit the number of concurrent requests to avoid overloading the cluster
const limiter = pLimit(10);
const promises = response.hits.hits.map((hit) => {
return limiter(() => {
if (!hit._source || !hit._id) {
return;
}
return esClient.asInternalUser.update({
refresh: 'wait_for',
index: resourceNames.aliases.kb,
id: hit._id,
body: {
doc: {
...hit._source,
semantic_text: hit._source.text,
},
},
});
});
});
await Promise.all(promises);
logger.debug(`Knowledge base migration: Migrated ${promises.length} entries`);
await runSemanticTextKnowledgeBaseMigration({ esClient, logger, config });
} catch (e) {
logger.error(`Knowledge base migration failed: ${e.message}`);
}
}
async function waitForModel({
esClient,
logger,
config,
}: {
esClient: { asInternalUser: ElasticsearchClient };
logger: Logger;
config: ObservabilityAIAssistantConfig;
}) {
return pRetry(
async () => {
const { ready } = await getElserModelStatus({ esClient, logger, config });
if (!ready) {
logger.debug('Elser model is not yet ready. Retrying...');
throw new Error('Elser model is not yet ready');
}
},
{ retries: 30, factor: 2, maxTimeout: 30_000 }
);
}

View file

@ -38,7 +38,7 @@ export async function getApiIntegrationConfig({ readConfigFile }: FtrConfigProvi
serverArgs: [
...xPackFunctionalTestsConfig.get('esTestCluster.serverArgs'),
'node.attr.name=apiIntegrationTestNode',
'path.repo=/tmp/repo,/tmp/repo_1,/tmp/repo_2,/tmp/cloud-snapshots/',
`path.repo=/tmp/repo,/tmp/repo_1,/tmp/repo_2,/tmp/cloud-snapshots/`,
],
},
};

View file

@ -0,0 +1,2 @@
# unzipped snapshot folder
knowledge_base/snapshot_kb_8.10/

View file

@ -20,7 +20,10 @@ export default function aiAssistantApiIntegrationTests({
loadTestFile(require.resolve('./complete/functions/summarize.spec.ts'));
loadTestFile(require.resolve('./public_complete/public_complete.spec.ts'));
loadTestFile(require.resolve('./knowledge_base/knowledge_base_setup.spec.ts'));
loadTestFile(require.resolve('./knowledge_base/knowledge_base_migration.spec.ts'));
loadTestFile(
require.resolve('./knowledge_base/knowledge_base_add_semantic_text_field_migration.spec.ts')
);
loadTestFile(require.resolve('./knowledge_base/knowledge_base_reindex.spec.ts'));
loadTestFile(require.resolve('./knowledge_base/knowledge_base_status.spec.ts'));
loadTestFile(require.resolve('./knowledge_base/knowledge_base.spec.ts'));
loadTestFile(require.resolve('./knowledge_base/knowledge_base_user_instructions.spec.ts'));

View file

@ -98,7 +98,7 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon
describe('after migrating', () => {
before(async () => {
const { status } = await observabilityAIAssistantAPIClient.editor({
endpoint: 'POST /internal/observability_ai_assistant/kb/semantic_text_migration',
endpoint: 'POST /internal/observability_ai_assistant/kb/migrations/kb_semantic_text',
});
expect(status).to.be(200);
});
@ -137,7 +137,7 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon
it('returns entries correctly via API', async () => {
const { status } = await observabilityAIAssistantAPIClient.editor({
endpoint: 'POST /internal/observability_ai_assistant/kb/semantic_text_migration',
endpoint: 'POST /internal/observability_ai_assistant/kb/migrations/kb_semantic_text',
});
expect(status).to.be(200);

View file

@ -0,0 +1,154 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import expect from '@kbn/expect';
import { resourceNames } from '@kbn/observability-ai-assistant-plugin/server/service';
import AdmZip from 'adm-zip';
import path from 'path';
import { AI_ASSISTANT_SNAPSHOT_REPO_PATH } from '../../../../default_configs/stateful.config.base';
import type { DeploymentAgnosticFtrProviderContext } from '../../../../ftr_provider_context';
import {
deleteKnowledgeBaseModel,
importTinyElserModel,
deleteInferenceEndpoint,
setupKnowledgeBase,
waitForKnowledgeBaseReady,
} from './helpers';
export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderContext) {
const observabilityAIAssistantAPIClient = getService('observabilityAIAssistantApi');
const es = getService('es');
const ml = getService('ml');
const retry = getService('retry');
const log = getService('log');
describe('when the knowledge base index was created before 8.11', function () {
// Intentionally skipped in all serverless environnments (local and MKI)
// because the migration scenario being tested is not relevant to MKI and Serverless.
this.tags(['skipServerless']);
before(async () => {
const zipFilePath = `${AI_ASSISTANT_SNAPSHOT_REPO_PATH}.zip`;
log.debug(`Unzipping ${zipFilePath} to ${AI_ASSISTANT_SNAPSHOT_REPO_PATH}`);
new AdmZip(zipFilePath).extractAllTo(path.dirname(AI_ASSISTANT_SNAPSHOT_REPO_PATH), true);
await importTinyElserModel(ml);
await setupKnowledgeBase(observabilityAIAssistantAPIClient);
await waitForKnowledgeBaseReady({ observabilityAIAssistantAPIClient, log, retry });
});
beforeEach(async () => {
await deleteKbIndex();
await restoreKbSnapshot();
await createOrUpdateIndexAssets();
});
after(async () => {
await deleteKbIndex();
await createOrUpdateIndexAssets();
await deleteKnowledgeBaseModel(ml);
await deleteInferenceEndpoint({ es });
});
it('has an index created version earlier than 8.11', async () => {
await retry.try(async () => {
expect(await getKbIndexCreatedVersion()).to.be.lessThan(8110000);
});
});
function createKnowledgeBaseEntry() {
const knowledgeBaseEntry = {
id: 'my-doc-id-1',
title: 'My title',
text: 'My content',
};
return observabilityAIAssistantAPIClient.editor({
endpoint: 'POST /internal/observability_ai_assistant/kb/entries/save',
params: { body: knowledgeBaseEntry },
});
}
it('cannot add new entries to KB', async () => {
const { status, body } = await createKnowledgeBaseEntry();
// @ts-expect-error
expect(body.message).to.eql(
'The knowledge base is currently being re-indexed. Please try again later'
);
expect(status).to.be(503);
});
it('can add new entries after re-indexing', async () => {
await runKbSemanticTextMigration();
await retry.try(async () => {
const { status } = await createKnowledgeBaseEntry();
expect(status).to.be(200);
});
});
});
async function getKbIndexCreatedVersion() {
const indexSettings = await es.indices.getSettings({
index: resourceNames.concreteIndexName.kb,
});
const { settings } = Object.values(indexSettings)[0];
return parseInt(settings?.index?.version?.created ?? '', 10);
}
async function deleteKbIndex() {
log.debug('Deleting KB index');
await es.indices.delete(
{ index: resourceNames.concreteIndexName.kb, ignore_unavailable: true },
{ ignore: [404] }
);
}
async function restoreKbSnapshot() {
log.debug(
`Restoring snapshot of ${resourceNames.concreteIndexName.kb} from ${AI_ASSISTANT_SNAPSHOT_REPO_PATH}`
);
const snapshotRepoName = 'snapshot-repo-8-10';
const snapshotName = 'my_snapshot';
await es.snapshot.createRepository({
name: snapshotRepoName,
repository: {
type: 'fs',
settings: { location: AI_ASSISTANT_SNAPSHOT_REPO_PATH },
},
});
await es.snapshot.restore({
repository: snapshotRepoName,
snapshot: snapshotName,
wait_for_completion: true,
body: {
indices: resourceNames.concreteIndexName.kb,
},
});
await es.snapshot.deleteRepository({ name: snapshotRepoName });
}
async function createOrUpdateIndexAssets() {
const { status } = await observabilityAIAssistantAPIClient.editor({
endpoint: 'POST /internal/observability_ai_assistant/index_assets',
});
expect(status).to.be(200);
}
async function runKbSemanticTextMigration() {
const { status } = await observabilityAIAssistantAPIClient.editor({
endpoint: 'POST /internal/observability_ai_assistant/kb/migrations/kb_semantic_text',
});
expect(status).to.be(200);
}
}

View file

@ -34,6 +34,11 @@ interface CreateTestConfigOptions<T extends DeploymentAgnosticCommonServices> {
suiteTags?: { include?: string[]; exclude?: string[] };
}
export const AI_ASSISTANT_SNAPSHOT_REPO_PATH = path.resolve(
REPO_ROOT,
'x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/snapshot_kb_8.10'
);
export function createStatefulTestConfig<T extends DeploymentAgnosticCommonServices>(
options: CreateTestConfigOptions<T>
) {
@ -121,6 +126,7 @@ export function createStatefulTestConfig<T extends DeploymentAgnosticCommonServi
`xpack.security.authc.realms.saml.${MOCK_IDP_REALM_NAME}.attributes.groups=${MOCK_IDP_ATTRIBUTE_ROLES}`,
`xpack.security.authc.realms.saml.${MOCK_IDP_REALM_NAME}.attributes.name=${MOCK_IDP_ATTRIBUTE_NAME}`,
`xpack.security.authc.realms.saml.${MOCK_IDP_REALM_NAME}.attributes.mail=${MOCK_IDP_ATTRIBUTE_EMAIL}`,
`path.repo=${AI_ASSISTANT_SNAPSHOT_REPO_PATH}`,
],
files: [
// Passing the roles that are equivalent to the ones we have in serverless
@ -152,6 +158,7 @@ export function createStatefulTestConfig<T extends DeploymentAgnosticCommonServi
'--xpack.uptime.service.username=localKibanaIntegrationTestsUser',
'--xpack.uptime.service.devUrl=mockDevUrl',
'--xpack.uptime.service.manifestUrl=mockDevUrl',
'--xpack.observabilityAIAssistant.disableKbSemanticTextMigration=true',
],
},
};