Use bulk helper for bulk importing knowledge base entries (#223526)

## Summary

Closes https://github.com/elastic/kibana/issues/223501

* Remove pRetry from `route.ts`. We don't want to retry the whole
bulkImport operation, just the failed documents
* Use [bulk helper
](https://www.elastic.co/docs/reference/elasticsearch/clients/javascript/client-helpers#bulk-helper)
to ingest entries
* concurrency is set to 5 (default), retries is set to 5 (default is
client max retries). On failure (after 5 retries) throws an error.

### Checklist

Check the PR satisfies following conditions. 

Reviewers should verify this PR satisfies this list as well.

- [x] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios
- [x] The PR description includes the appropriate Release Notes section,
and the correct `release_note:*` label is applied per the
[guidelines](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)

* I tested it manually with 5000 entries and it worked.
This commit is contained in:
Eleonora 2025-06-24 15:16:27 +01:00 committed by GitHub
parent 12590c9a8a
commit d4b0293d61
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 26 additions and 20 deletions

View file

@ -12,7 +12,6 @@ import {
MlTrainedModelStats,
} from '@elastic/elasticsearch/lib/api/types';
import { InferenceAPIConfigResponse } from '@kbn/ml-trained-models-utils';
import pRetry from 'p-retry';
import { createObservabilityAIAssistantServerRoute } from '../create_observability_ai_assistant_server_route';
import {
Instruction,
@ -296,7 +295,7 @@ const importKnowledgeBaseEntries = createObservabilityAIAssistantServerRoute({
...entry,
}));
await pRetry(() => client.addKnowledgeBaseBulkEntries({ entries }), { retries: 10 });
await client.addKnowledgeBaseBulkEntries({ entries });
resources.logger.info(`Imported ${entries.length} knowledge base entries`);
},

View file

@ -462,31 +462,38 @@ export class KnowledgeBaseService {
}
try {
const bulkBody = entries.flatMap((entry) => [
{ index: { _index: resourceNames.writeIndexAlias.kb, _id: entry.id } },
{
'@timestamp': new Date().toISOString(),
...entry,
...(entry.text ? { semantic_text: entry.text } : {}),
user,
namespace,
const result = await this.dependencies.esClient.asInternalUser.helpers.bulk({
onDocument(doc) {
return [
{ index: { _index: resourceNames.writeIndexAlias.kb, _id: doc.id } },
{
'@timestamp': new Date().toISOString(),
...doc,
...(doc.text ? { semantic_text: doc.text } : {}),
user,
namespace,
},
];
},
]);
const bulkResult = await this.dependencies.esClient.asInternalUser.bulk({
onDrop: (doc) => {
this.dependencies.logger.error(
`Failed ingesting document: ${doc.error?.reason || 'unknown reason'}`
);
},
datasource: entries,
refresh: 'wait_for',
body: bulkBody,
concurrency: 3,
flushBytes: 100 * 1024,
flushInterval: 1000,
retries: 5,
});
if (bulkResult.errors) {
const errorMessages = bulkResult.items
.filter((item: any) => item.index?.error)
.map((item: any) => item.index?.error?.reason);
throw new Error(`Indexing failed: ${errorMessages.join(', ')}`);
if (result.failed > 0) {
throw Error(`Failed ingesting ${result.failed} documents.`);
}
this.dependencies.logger.debug(
`Successfully added ${entries.length} entries to the knowledge base`
`Successfully added ${result.successful} entries to the knowledge base`
);
} catch (error) {
this.dependencies.logger.error(`Failed to add entries to the knowledge base: ${error}`);