[LockManager] Add isLockAcquisitionError helper (#220067)

- Adds a small helper method for handling errors of type
`LockAcquisitionError`.
- Flips retry logic for `populateMissingSemanticTextFieldWithLock ` so
we retry all errors except `LockAcquisitionError`
This commit is contained in:
Søren Louv-Jansen 2025-05-05 23:49:18 +02:00 committed by GitHub
parent 8e2800ca27
commit c33c95e507
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 17 additions and 12 deletions

View file

@ -7,5 +7,5 @@
* License v3.0 only", or the "Server Side Public License, v 1". * License v3.0 only", or the "Server Side Public License, v 1".
*/ */
export { LockAcquisitionError } from './src/lock_manager_client'; export { LockAcquisitionError, isLockAcquisitionError } from './src/lock_manager_client';
export { LockManagerService } from './src/lock_manager_service'; export { LockManagerService } from './src/lock_manager_service';

View file

@ -269,6 +269,10 @@ export async function getLock({
return lockManager.get(); return lockManager.get();
} }
export function isLockAcquisitionError(error: unknown): error is LockAcquisitionError {
return error instanceof LockAcquisitionError;
}
export async function withLock<T>( export async function withLock<T>(
{ {
esClient, esClient,

View file

@ -31,8 +31,7 @@ import { v4 } from 'uuid';
import type { AssistantScope } from '@kbn/ai-assistant-common'; import type { AssistantScope } from '@kbn/ai-assistant-common';
import type { InferenceClient } from '@kbn/inference-plugin/server'; import type { InferenceClient } from '@kbn/inference-plugin/server';
import { ChatCompleteResponse, FunctionCallingMode, ToolChoiceType } from '@kbn/inference-common'; import { ChatCompleteResponse, FunctionCallingMode, ToolChoiceType } from '@kbn/inference-common';
import { isLockAcquisitionError } from '@kbn/lock-manager';
import { LockAcquisitionError } from '@kbn/lock-manager';
import { resourceNames } from '..'; import { resourceNames } from '..';
import { import {
ChatCompletionChunkEvent, ChatCompletionChunkEvent,
@ -730,8 +729,7 @@ export class ObservabilityAIAssistantClient {
}); });
}) })
.catch((e) => { .catch((e) => {
const isLockAcquisitionError = e instanceof LockAcquisitionError; if (isLockAcquisitionError(e)) {
if (isLockAcquisitionError) {
logger.info(e.message); logger.info(e.message);
} else { } else {
logger.error( logger.error(

View file

@ -10,7 +10,7 @@ import type { CoreSetup, ElasticsearchClient, IUiSettingsClient } from '@kbn/cor
import type { Logger } from '@kbn/logging'; import type { Logger } from '@kbn/logging';
import { orderBy } from 'lodash'; import { orderBy } from 'lodash';
import { encode } from 'gpt-tokenizer'; import { encode } from 'gpt-tokenizer';
import { LockAcquisitionError } from '@kbn/lock-manager'; import { isLockAcquisitionError } from '@kbn/lock-manager';
import { resourceNames } from '..'; import { resourceNames } from '..';
import { import {
Instruction, Instruction,
@ -449,7 +449,7 @@ export class KnowledgeBaseService {
esClient: this.dependencies.esClient, esClient: this.dependencies.esClient,
inferenceId, inferenceId,
}).catch((e) => { }).catch((e) => {
if (error instanceof LockAcquisitionError) { if (isLockAcquisitionError(e)) {
this.dependencies.logger.info(`Re-indexing operation is already in progress`); this.dependencies.logger.info(`Re-indexing operation is already in progress`);
return; return;
} }

View file

@ -9,7 +9,7 @@ import { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type { CoreSetup, Logger } from '@kbn/core/server'; import type { CoreSetup, Logger } from '@kbn/core/server';
import pRetry from 'p-retry'; import pRetry from 'p-retry';
import { errors } from '@elastic/elasticsearch'; import { errors } from '@elastic/elasticsearch';
import { LockAcquisitionError, LockManagerService } from '@kbn/lock-manager'; import { LockManagerService, isLockAcquisitionError } from '@kbn/lock-manager';
import { resourceNames } from '..'; import { resourceNames } from '..';
import { ObservabilityAIAssistantPluginStartDependencies } from '../../types'; import { ObservabilityAIAssistantPluginStartDependencies } from '../../types';
import { ObservabilityAIAssistantConfig } from '../../config'; import { ObservabilityAIAssistantConfig } from '../../config';
@ -66,8 +66,10 @@ export async function runStartupMigrations({
retries: 5, retries: 5,
minTimeout: 10_000, minTimeout: 10_000,
onFailedAttempt: async (error) => { onFailedAttempt: async (error) => {
const isLockAcquisitionError = error instanceof LockAcquisitionError; // if the error is a LockAcquisitionError the operation is already in progress and we should not retry
if (!isLockAcquisitionError) { // for other errors we should retry
// throwing the error will cause pRetry to abort all retries
if (isLockAcquisitionError(error)) {
throw error; throw error;
} }
}, },
@ -75,10 +77,11 @@ export async function runStartupMigrations({
); );
}) })
.catch((error) => { .catch((error) => {
const isLockAcquisitionError = error instanceof LockAcquisitionError; // we should propogate the error if it is not a LockAcquisitionError
if (!isLockAcquisitionError) { if (!isLockAcquisitionError(error)) {
throw error; throw error;
} }
logger.info('Startup migrations are already in progress. Aborting startup migrations');
}); });
} }