[LockManager] Expose as package (#219220)

Expose LockManager as package to make it easier to consume from other
plugins

cc @nchaulet

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
Co-authored-by: Viduni Wickramarachchi <viduni.ushanka@gmail.com>
This commit is contained in:
Søren Louv-Jansen 2025-04-29 18:42:45 +02:00 committed by GitHub
parent 3d9eda9933
commit 8b8d569986
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
19 changed files with 205 additions and 30 deletions

View file

@ -0,0 +1,54 @@
# Kibana Lock Manager
A simple, distributed lock manager built on top of Elasticsearch.
Ensures that only one process at a time can hold a named lock, with automatic lease renewal and token fencing for safe release.
# API Documentation
## `withLock<T>(lockId, callback, options)`
Acquires a lock and executes the provided callback. If the lock is already held by another process, the method will throw a `LockAcquisitionError` and the callback will not be executed. When the callback returns the lock is released.
### Parameters
- **`lockId`** (`string`): Unique identifier for the lock
- **`callback`** (`() => Promise<T>`): Asynchronous function to execute once the lock is acquired. This function will be executed only if the lock acquisition succeeds.
- **`options`** (`object`, optional): Additional configuration options.
- **`metadata`** (`Record<string, any>`, optional): Custom metadata to store with the lock.
## Example
```ts
import { LockManagerService, LockAcquisitionError } from '@kbn/lock-manager';
async function reIndexWithLock() {
// Attempt to acquire "my_lock"; if successful, runs the callback.
const lmService = new LockManagerService(coreSetup, logger);
return lmService.withLock('my_lock', async () => {
// …perform your exclusive operation here…
});
}
reIndexWithLock().catch((err) => {
if (err instanceof LockAcquisitionError) {
logger.debug('Re-index already in progress, skipping.');
return;
}
logger.error(`Failed to re-index: ${err.message}`);
});
```
## How It Works
**Atomic Acquire**
Performs one atomic Elasticsearch update that creates a new lock or renews an existing one - so if multiple processes race for the same lock, only one succeeds.
**TTL-Based Lease**
Each lock has a short, fixed lifespan (default 30s) and will automatically expire if not renewed. While the callback is executing, the lock will automatically extend the TTL to keep the lock active. This safeguards against deadlocks because if a Kibana node crashes after having obtained a lock it will automatically be released after 30 seconds.
Note: If Kibana node crashes, another process could acquire the same lock and start that task again when the lock automatically expires. To prevent your operation from running multiple times, include an application-level check (for example, querying Elasticsearch or your own status flag) to verify the operation isnt already in progress before proceeding.
**Token Fencing**
Each lock operation carries a unique token. Only the process with the matching token can extend or release the lock, preventing stale holders from interfering.

View file

@ -0,0 +1,11 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
export { LockAcquisitionError } from './src/lock_manager_client';
export { LockManagerService } from './src/lock_manager_service';

View file

@ -0,0 +1,14 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
module.exports = {
preset: '@kbn/test/jest_node',
rootDir: '../..',
roots: ['<rootDir>/packages/kbn-lock-manager'],
};

View file

@ -0,0 +1,6 @@
{
"type": "shared-server",
"id": "@kbn/lock-manager",
"owner": ["@elastic/obs-ai-assistant"],
"devOnly": false
}

View file

@ -0,0 +1,6 @@
{
"name": "@kbn/lock-manager",
"private": true,
"version": "1.0.0",
"license": "Elastic License 2.0 OR AGPL-3.0-only OR SSPL-1.0"
}

View file

@ -0,0 +1,327 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
// eslint-disable-next-line max-classes-per-file
import { errors } from '@elastic/elasticsearch';
import { Logger } from '@kbn/logging';
import { v4 as uuid } from 'uuid';
import prettyMilliseconds from 'pretty-ms';
import { once } from 'lodash';
import { duration } from 'moment';
import { ElasticsearchClient } from '@kbn/core/server';
import { LOCKS_CONCRETE_INDEX_NAME, setupLockManagerIndex } from './setup_lock_manager_index';
export type LockId = string;
export interface LockDocument {
createdAt: string;
expiresAt: string;
metadata: Record<string, any>;
token: string;
}
export interface AcquireOptions {
/**
* Metadata to be stored with the lock. This can be any key-value pair.
* This is not mapped and therefore not searchable.
*/
metadata?: Record<string, any>;
/**
* Time to live (TTL) for the lock in milliseconds. Default is 30 seconds.
* When a lock expires it can be acquired by another process
*/
ttl?: number;
}
// The index assets should only be set up once
// For testing purposes, we need to be able to set it up every time
let runSetupIndexAssetOnce = once(setupLockManagerIndex);
export function runSetupIndexAssetEveryTime() {
runSetupIndexAssetOnce = setupLockManagerIndex;
}
export class LockManager {
private token = uuid();
constructor(
private lockId: LockId,
private esClient: ElasticsearchClient,
private logger: Logger
) {}
/**
* Attempts to acquire a lock by creating a document with the given lockId.
* If the lock exists and is expired, it will be released and acquisition retried.
*/
public async acquire({
metadata = {},
ttl = duration(30, 'seconds').asMilliseconds(),
}: AcquireOptions = {}): Promise<boolean> {
let response: Awaited<ReturnType<ElasticsearchClient['update']>>;
await runSetupIndexAssetOnce(this.esClient, this.logger);
this.token = uuid();
try {
response = await this.esClient.update<LockDocument>(
{
index: LOCKS_CONCRETE_INDEX_NAME,
id: this.lockId,
scripted_upsert: true,
script: {
lang: 'painless',
source: `
// Get the current time on the ES server.
long now = System.currentTimeMillis();
// If creating the document, or if the lock is expired,
// or if the current document is owned by the same token, then update it.
if (ctx.op == 'create' ||
Instant.parse(ctx._source.expiresAt).toEpochMilli() < now ||
ctx._source.token == params.token) {
def instantNow = Instant.ofEpochMilli(now);
ctx._source.createdAt = instantNow.toString();
ctx._source.expiresAt = instantNow.plusMillis(params.ttl).toString();
} else {
ctx.op = 'noop';
}
`,
params: {
ttl,
token: this.token,
},
},
// @ts-expect-error
upsert: {
metadata,
token: this.token,
},
},
{
retryOnTimeout: true,
maxRetries: 3,
}
);
} catch (e) {
if (isVersionConflictException(e)) {
this.logger.debug(`Lock "${this.lockId}" already held (version conflict)`);
return false;
}
this.logger.error(`Failed to acquire lock "${this.lockId}": ${e.message}`);
return false;
}
switch (response.result) {
case 'created': {
this.logger.debug(
`Lock "${this.lockId}" with token = ${
this.token
} acquired with ttl = ${prettyMilliseconds(ttl)}`
);
return true;
}
case 'updated': {
this.logger.debug(
`Lock "${this.lockId}" was expired and re-acquired with ttl = ${prettyMilliseconds(
ttl
)} and token = ${this.token}`
);
return true;
}
case 'noop': {
this.logger.debug(
`Lock "${this.lockId}" with token = ${this.token} could not be acquired. It is already held`
);
return false;
}
}
this.logger.warn(`Unexpected response: ${response.result}`);
return false;
}
/**
* Releases the lock by deleting the document with the given lockId and token
*/
public async release(): Promise<boolean> {
let response: Awaited<ReturnType<ElasticsearchClient['update']>>;
try {
response = await this.esClient.update<LockDocument>(
{
index: LOCKS_CONCRETE_INDEX_NAME,
id: this.lockId,
scripted_upsert: false,
script: {
lang: 'painless',
source: `
if (ctx._source.token == params.token) {
ctx.op = 'delete';
} else {
ctx.op = 'noop';
}
`,
params: { token: this.token },
},
},
{
retryOnTimeout: true,
maxRetries: 3,
}
);
} catch (error: any) {
if (isDocumentMissingException(error)) {
this.logger.debug(`Lock "${this.lockId}" already released.`);
return false;
}
this.logger.error(`Failed to release lock "${this.lockId}": ${error.message}`);
throw error;
}
switch (response.result) {
case 'deleted':
this.logger.debug(`Lock "${this.lockId}" released with token ${this.token}.`);
return true;
case 'noop':
this.logger.debug(
`Lock "${this.lockId}" with token = ${this.token} could not be released. Token does not match.`
);
return false;
}
this.logger.warn(`Unexpected response: ${response.result}`);
return false;
}
/**
* Retrieves the lock document for a given lockId.
* If the lock is expired, it will not be returned
*/
public async get(): Promise<LockDocument | undefined> {
const result = await this.esClient.get<LockDocument>(
{ index: LOCKS_CONCRETE_INDEX_NAME, id: this.lockId },
{ ignore: [404] }
);
if (!result._source) {
return undefined;
}
const isExpired = new Date(result._source?.expiresAt).getTime() < Date.now();
if (isExpired) {
return undefined;
}
return result._source;
}
public async extendTtl(ttl: number): Promise<boolean> {
try {
await this.esClient.update<LockDocument>({
index: LOCKS_CONCRETE_INDEX_NAME,
id: this.lockId,
script: {
lang: 'painless',
source: `
if (ctx._source.token == params.token) {
long now = System.currentTimeMillis();
ctx._source.expiresAt = Instant.ofEpochMilli(now + params.ttl).toString();
} else {
ctx.op = 'noop';
}`,
params: {
ttl,
token: this.token,
},
},
});
this.logger.debug(`Lock "${this.lockId}" extended ttl with ${prettyMilliseconds(ttl)}.`);
return true;
} catch (error) {
if (isVersionConflictException(error) || isDocumentMissingException(error)) {
this.logger.debug(`Lock "${this.lockId}" was released concurrently. Not extending TTL.`);
return false;
}
this.logger.error(`Failed to extend lock "${this.lockId}": ${error.message}`);
this.logger.debug(error);
return false;
}
}
}
export async function withLock<T>(
{
esClient,
logger,
lockId,
metadata,
ttl = duration(30, 'seconds').asMilliseconds(),
}: {
esClient: ElasticsearchClient;
logger: Logger;
lockId: LockId;
} & AcquireOptions,
callback: () => Promise<T>
): Promise<T> {
const lockManager = new LockManager(lockId, esClient, logger);
const acquired = await lockManager.acquire({ metadata, ttl });
if (!acquired) {
logger.debug(`Lock "${lockId}" not acquired. Exiting.`);
throw new LockAcquisitionError(`Lock "${lockId}" not acquired`);
}
// extend the ttl periodically
const extendInterval = Math.floor(ttl / 4);
logger.debug(
`Lock "${lockId}" acquired. Extending TTL every ${prettyMilliseconds(extendInterval)}`
);
let extendTTlPromise = Promise.resolve(true);
const intervalId = setInterval(() => {
// wait for the previous extendTtl request to finish before sending the next one. This is to avoid flooding ES with extendTtl requests in cases where ES is slow to respond.
extendTTlPromise = extendTTlPromise
.then(() => lockManager.extendTtl(ttl))
.catch((err) => {
logger.error(`Failed to extend lock "${lockId}":`, err);
return false;
});
}, extendInterval);
try {
return await callback();
} finally {
try {
clearInterval(intervalId);
await extendTTlPromise;
await lockManager.release();
} catch (error) {
logger.error(`Failed to release lock "${lockId}" in withLock: ${error.message}`);
logger.debug(error);
}
}
}
function isVersionConflictException(e: Error): boolean {
return (
e instanceof errors.ResponseError && e.body?.error?.type === 'version_conflict_engine_exception'
);
}
function isDocumentMissingException(e: Error): boolean {
return e instanceof errors.ResponseError && e.body?.error?.type === 'document_missing_exception';
}
export class LockAcquisitionError extends Error {
constructor(message: string) {
super(message);
this.name = 'LockAcquisitionError';
}
}

View file

@ -0,0 +1,42 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { CoreSetup, Logger } from '@kbn/core/server';
import { LockId, withLock } from './lock_manager_client';
export class LockManagerService {
constructor(private readonly coreSetup: CoreSetup<any>, private readonly logger: Logger) {}
/**
* Acquires a lock with the given ID and executes the callback function.
* If the lock is already held by another process, the callback will not be executed.
*
* Example usage:
*
* const { withLock } = new LockManagerService(coreSetup, logger);
* await withLock('my_lock', () => {
* // perform operation
* });
*/
async withLock<T>(
lockId: LockId,
callback: () => Promise<T>,
{
metadata,
}: {
metadata?: Record<string, any>;
} = {}
) {
const [coreStart] = await this.coreSetup.getStartServices();
const esClient = coreStart.elasticsearch.client.asInternalUser;
const logger = this.logger.get('LockManager');
return withLock<T>({ esClient, logger, lockId, metadata }, callback);
}
}

View file

@ -0,0 +1,112 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { errors } from '@elastic/elasticsearch';
import { ElasticsearchClient, Logger } from '@kbn/core/server';
import { IndicesGetMappingResponse } from '@elastic/elasticsearch/lib/api/types';
const LOCKS_INDEX_ALIAS = '.kibana_locks';
const INDEX_PATTERN = `${LOCKS_INDEX_ALIAS}*`;
export const LOCKS_CONCRETE_INDEX_NAME = `${LOCKS_INDEX_ALIAS}-000001`;
export const LOCKS_COMPONENT_TEMPLATE_NAME = `${LOCKS_INDEX_ALIAS}-component`;
export const LOCKS_INDEX_TEMPLATE_NAME = `${LOCKS_INDEX_ALIAS}-index-template`;
export async function removeLockIndexWithIncorrectMappings(
esClient: ElasticsearchClient,
logger: Logger
) {
let res: IndicesGetMappingResponse;
try {
res = await esClient.indices.getMapping({ index: LOCKS_CONCRETE_INDEX_NAME });
} catch (error) {
const isNotFoundError = error instanceof errors.ResponseError && error.statusCode === 404;
if (!isNotFoundError) {
logger.error(
`Failed to get mapping for lock index "${LOCKS_CONCRETE_INDEX_NAME}": ${error.message}`
);
}
return;
}
const { mappings } = res[LOCKS_CONCRETE_INDEX_NAME];
const hasIncorrectMappings =
mappings.properties?.token?.type !== 'keyword' ||
mappings.properties?.expiresAt?.type !== 'date';
if (hasIncorrectMappings) {
logger.warn(`Lock index "${LOCKS_CONCRETE_INDEX_NAME}" has incorrect mappings.`);
try {
await esClient.indices.delete({ index: LOCKS_CONCRETE_INDEX_NAME });
logger.info(`Lock index "${LOCKS_CONCRETE_INDEX_NAME}" removed successfully.`);
} catch (error) {
logger.error(`Failed to remove lock index "${LOCKS_CONCRETE_INDEX_NAME}": ${error.message}`);
}
}
}
export async function ensureTemplatesAndIndexCreated(
esClient: ElasticsearchClient,
logger: Logger
): Promise<void> {
await esClient.cluster.putComponentTemplate({
name: LOCKS_COMPONENT_TEMPLATE_NAME,
template: {
mappings: {
dynamic: false,
properties: {
token: { type: 'keyword' },
metadata: { enabled: false },
createdAt: { type: 'date' },
expiresAt: { type: 'date' },
},
},
},
});
logger.info(
`Component template ${LOCKS_COMPONENT_TEMPLATE_NAME} created or updated successfully.`
);
await esClient.indices.putIndexTemplate({
name: LOCKS_INDEX_TEMPLATE_NAME,
index_patterns: [INDEX_PATTERN],
composed_of: [LOCKS_COMPONENT_TEMPLATE_NAME],
priority: 500,
template: {
settings: {
number_of_shards: 1,
auto_expand_replicas: '0-1',
hidden: true,
},
},
});
logger.info(`Index template ${LOCKS_INDEX_TEMPLATE_NAME} created or updated successfully.`);
try {
await esClient.indices.create({ index: LOCKS_CONCRETE_INDEX_NAME });
logger.info(`Index ${LOCKS_CONCRETE_INDEX_NAME} created successfully.`);
} catch (error) {
const isIndexAlreadyExistsError =
error instanceof errors.ResponseError &&
error.body.error.type === 'resource_already_exists_exception';
if (isIndexAlreadyExistsError) {
logger.debug(`Index ${LOCKS_CONCRETE_INDEX_NAME} already exists. Skipping creation.`);
return;
}
logger.error(`Unable to create index ${LOCKS_CONCRETE_INDEX_NAME}: ${error.message}`);
throw error;
}
}
export async function setupLockManagerIndex(esClient: ElasticsearchClient, logger: Logger) {
await removeLockIndexWithIncorrectMappings(esClient, logger); // TODO: should be removed in the future (after 9.1). See https://github.com/elastic/kibana/issues/218944
await ensureTemplatesAndIndexCreated(esClient, logger);
}

View file

@ -0,0 +1,20 @@
{
"extends": "../../tsconfig.base.json",
"compilerOptions": {
"outDir": "target/types",
"types": [
"jest",
"node"
]
},
"include": [
"**/*.ts"
],
"exclude": [
"target/**/*"
],
"kbn_references": [
"@kbn/logging",
"@kbn/core",
]
}