🌊 Streams: Prevent concurrent access (#222961)

This PR guards changes to the streams state that go through
`State.attemptChanges` via the newly introduced lock manager.

If two requests are happening at the same time, one of them now fails
with a 409.

## Concerns

* Lock expiry is 30s for now - is this too little? Should be good enough
for now, maybe we need to reconsider once we introduce the bulk api
* This is only guarding changes that go through the `State` class - some
things like queries and dashboards do not, so they can still be subject
to race conditions. We could sprinkle more locks over the code base, but
I would like to solve this by moving them into `State` as well, that
seems like the cleaner approach, even though a bit more effort
* Biggest question - on this PR the concurrent request fails directly
with a 409. Is this OK or should it wait and retry a couple times? I'm
in favor of starting like this and seeing if this is actually a problem.

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
Co-authored-by: Kevin Lacabane <kevin.lacabane@elastic.co>
This commit is contained in:
Joe Reuter 2025-06-16 17:52:26 +02:00 committed by GitHub
parent b08d2284a1
commit a8b2ac6c48
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 124 additions and 15 deletions

View file

@ -11,9 +11,10 @@ import {
QueryDslQueryContainer, QueryDslQueryContainer,
Result, Result,
} from '@elastic/elasticsearch/lib/api/types'; } from '@elastic/elasticsearch/lib/api/types';
import type { IScopedClusterClient, KibanaRequest, Logger } from '@kbn/core/server'; import type { IScopedClusterClient, Logger, KibanaRequest } from '@kbn/core/server';
import { isNotFoundError } from '@kbn/es-errors'; import { isNotFoundError } from '@kbn/es-errors';
import { Condition, Streams, getAncestors, getParentId } from '@kbn/streams-schema'; import { Condition, Streams, getAncestors, getParentId } from '@kbn/streams-schema';
import { LockManagerService } from '@kbn/lock-manager';
import { AssetClient } from './assets/asset_client'; import { AssetClient } from './assets/asset_client';
import { ASSET_ID, ASSET_TYPE } from './assets/fields'; import { ASSET_ID, ASSET_TYPE } from './assets/fields';
import { QueryClient } from './assets/query/query_client'; import { QueryClient } from './assets/query/query_client';
@ -58,6 +59,7 @@ function wrapEsCall<T>(p: Promise<T>): Promise<T> {
export class StreamsClient { export class StreamsClient {
constructor( constructor(
private readonly dependencies: { private readonly dependencies: {
lockManager: LockManagerService;
scopedClusterClient: IScopedClusterClient; scopedClusterClient: IScopedClusterClient;
assetClient: AssetClient; assetClient: AssetClient;
queryClient: QueryClient; queryClient: QueryClient;

View file

@ -8,6 +8,7 @@
import type { CoreSetup, KibanaRequest, Logger } from '@kbn/core/server'; import type { CoreSetup, KibanaRequest, Logger } from '@kbn/core/server';
import { IStorageClient, StorageIndexAdapter, StorageSettings, types } from '@kbn/storage-adapter'; import { IStorageClient, StorageIndexAdapter, StorageSettings, types } from '@kbn/storage-adapter';
import { Streams } from '@kbn/streams-schema'; import { Streams } from '@kbn/streams-schema';
import { LockManagerService } from '@kbn/lock-manager';
import type { StreamsPluginStartDependencies } from '../../types'; import type { StreamsPluginStartDependencies } from '../../types';
import { AssetClient } from './assets/asset_client'; import { AssetClient } from './assets/asset_client';
import { QueryClient } from './assets/query/query_client'; import { QueryClient } from './assets/query/query_client';
@ -66,6 +67,7 @@ export class StreamsService {
queryClient, queryClient,
logger, logger,
scopedClusterClient, scopedClusterClient,
lockManager: new LockManagerService(this.coreSetup, logger),
storageClient: storageAdapter.getClient(), storageClient: storageAdapter.getClient(),
request, request,
isServerless, isServerless,

View file

@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { StatusError } from '../../errors/status_error';
export class ConcurrentAccessError extends StatusError {
constructor(message: string) {
super(message, 409);
this.name = 'ConcurrentAccessError';
}
}

View file

@ -21,6 +21,7 @@ import { StreamChange } from './types';
import { ElasticsearchAction } from './execution_plan/types'; import { ElasticsearchAction } from './execution_plan/types';
import { ExecutionPlan } from './execution_plan/execution_plan'; import { ExecutionPlan } from './execution_plan/execution_plan';
import { Streams } from '@kbn/streams-schema'; import { Streams } from '@kbn/streams-schema';
import { LockManagerService } from '@kbn/lock-manager';
describe('State', () => { describe('State', () => {
const searchMock = jest.fn(); const searchMock = jest.fn();
@ -29,6 +30,9 @@ describe('State', () => {
}; };
const stateDependenciesMock = { const stateDependenciesMock = {
storageClient: storageClientMock, storageClient: storageClientMock,
lockManager: {
withLock: (_, cb) => cb(),
} as LockManagerService,
isDev: true, isDev: true,
} as any; } as any;

View file

@ -6,6 +6,7 @@
*/ */
import { difference, intersection, isEqual } from 'lodash'; import { difference, intersection, isEqual } from 'lodash';
import { isLockAcquisitionError } from '@kbn/lock-manager';
import { StatusError } from '../errors/status_error'; import { StatusError } from '../errors/status_error';
import { FailedToApplyRequestedChangesError } from './errors/failed_to_apply_requested_changes_error'; import { FailedToApplyRequestedChangesError } from './errors/failed_to_apply_requested_changes_error';
import { FailedToDetermineElasticsearchActionsError } from './errors/failed_to_determine_elasticsearch_actions_error'; import { FailedToDetermineElasticsearchActionsError } from './errors/failed_to_determine_elasticsearch_actions_error';
@ -20,6 +21,7 @@ import type {
} from './stream_active_record/stream_active_record'; } from './stream_active_record/stream_active_record';
import { streamFromDefinition } from './stream_active_record/stream_from_definition'; import { streamFromDefinition } from './stream_active_record/stream_from_definition';
import type { StateDependencies, StreamChange } from './types'; import type { StateDependencies, StreamChange } from './types';
import { ConcurrentAccessError } from './errors/concurrent_access_error';
interface Changes { interface Changes {
created: string[]; created: string[];
@ -81,19 +83,29 @@ export class State {
elasticsearchActions, elasticsearchActions,
}; };
} else { } else {
const lmService = dependencies.lockManager;
return lmService
.withLock('streams/apply_changes', async () => {
try { try {
await desiredState.commitChanges(startingState); await desiredState.commitChanges(startingState);
return { status: 'success', changes: desiredState.changes(startingState) }; return { status: 'success' as const, changes: desiredState.changes(startingState) };
} catch (error) { } catch (error) {
await desiredState.attemptRollback(startingState, error); await desiredState.attemptRollback(startingState, error);
return { return {
status: 'failed_with_rollback', status: 'failed_with_rollback' as const,
error: new StatusError( error: new StatusError(
`Failed to apply changes but successfully rolled back to previous state: ${error.message}`, `Failed to apply changes but successfully rolled back to previous state: ${error.message}`,
error.statusCode ?? 500 error.statusCode ?? 500
), ),
}; };
} }
})
.catch((error) => {
if (isLockAcquisitionError(error)) {
throw new ConcurrentAccessError('Could not acquire lock for applying changes');
}
throw error;
});
} }
} }

View file

@ -7,6 +7,7 @@
import type { IScopedClusterClient, Logger } from '@kbn/core/server'; import type { IScopedClusterClient, Logger } from '@kbn/core/server';
import { Streams } from '@kbn/streams-schema'; import { Streams } from '@kbn/streams-schema';
import { LockManagerService } from '@kbn/lock-manager';
import type { AssetClient } from '../assets/asset_client'; import type { AssetClient } from '../assets/asset_client';
import type { StreamsClient } from '../client'; import type { StreamsClient } from '../client';
import type { StreamsStorageClient } from '../service'; import type { StreamsStorageClient } from '../service';
@ -25,6 +26,7 @@ export type StreamChange = StreamUpsertChange | StreamDeleteChange;
export interface StateDependencies { export interface StateDependencies {
logger: Logger; logger: Logger;
lockManager: LockManagerService;
streamsClient: StreamsClient; streamsClient: StreamsClient;
storageClient: StreamsStorageClient; storageClient: StreamsStorageClient;
scopedClusterClient: IScopedClusterClient; scopedClusterClient: IScopedClusterClient;

View file

@ -50,6 +50,7 @@
"@kbn/i18n", "@kbn/i18n",
"@kbn/zod-helpers", "@kbn/zod-helpers",
"@kbn/core-http-server-utils", "@kbn/core-http-server-utils",
"@kbn/inference-common" "@kbn/inference-common",
"@kbn/lock-manager"
] ]
} }

View file

@ -0,0 +1,70 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import expect from '@kbn/expect';
import { DeploymentAgnosticFtrProviderContext } from '../../../ftr_provider_context';
import { disableStreams, enableStreams, forkStream } from './helpers/requests';
import {
StreamsSupertestRepositoryClient,
createStreamsRepositoryAdminClient,
} from './helpers/repository_client';
export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
const roleScopedSupertest = getService('roleScopedSupertest');
let apiClient: StreamsSupertestRepositoryClient;
describe('conflict handling', function () {
before(async () => {
apiClient = await createStreamsRepositoryAdminClient(roleScopedSupertest);
await enableStreams(apiClient);
});
after(async () => {
await disableStreams(apiClient);
});
it('should not allow multiple requests manipulating streams state at once', async () => {
const stream1 = {
stream: {
name: 'logs.nginx',
},
if: {
field: 'resource.attributes.host.name',
operator: 'eq' as const,
value: 'routeme',
},
};
const stream2 = {
stream: {
name: 'logs.apache',
},
if: {
field: 'resource.attributes.host.name',
operator: 'eq' as const,
value: 'routeme2',
},
};
const responses = await Promise.allSettled([
forkStream(apiClient, 'logs', stream1),
forkStream(apiClient, 'logs', stream2),
]);
// Assert than one of the requests failed with a conflict error and the other succeeded
// It needs to check either way (success or failure) because the order of execution is not guaranteed
expect(responses).to.have.length(2);
const successResponse = responses.find(
(response) => response.status === 'fulfilled' && response.value.acknowledged
);
const conflictResponse = responses.find(
(response) =>
response.status === 'rejected' &&
String(response.reason).toLowerCase().includes('conflict')
);
expect(successResponse).to.not.be(undefined);
expect(conflictResponse).to.not.be(undefined);
});
});
}

View file

@ -26,5 +26,6 @@ export default function ({ loadTestFile }: DeploymentAgnosticFtrProviderContext)
loadTestFile(require.resolve('./content')); loadTestFile(require.resolve('./content'));
loadTestFile(require.resolve('./migration_on_read')); loadTestFile(require.resolve('./migration_on_read'));
loadTestFile(require.resolve('./meta_data')); loadTestFile(require.resolve('./meta_data'));
loadTestFile(require.resolve('./conflicts'));
}); });
} }