🌊 Make client check for hierarchy conflicts before creating streams (#208914)

## Summary

If you enable streams (which creates `logs`) and then try to create
`logs.child.grandchild` but `logs.child` already exists as either an
index or an unwired (Classic) stream, then we end up in a weird state
where `logs.child.grandchild` gets created as a wired child but then the
request fails as it tries to turn the unwired stream into a wired
stream.

This PR adds a step that asserts that there are no such conflicts in the
hierarchy before proceeding.
It also adds a check to ensure Streams are enabled before allowing the
creation of any streams, as well as blocking the creation of a root
stream that isn't `logs`.
Finally, there is some minor improvements to error handling for when a
data stream isn't found and error messages.
This commit is contained in:
Milton Hultgren 2025-02-05 15:01:47 +01:00 committed by GitHub
parent 1c4d0e99b4
commit 3c4694e1dd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 119 additions and 8 deletions

View file

@ -554,6 +554,7 @@ export class StorageIndexAdapter<TStorageSettings extends IndexStorageSettings,
request: {} as unknown as DiagnosticResult['meta']['request'],
},
warnings: [],
body: 'resource_not_found_exception',
statusCode: 404,
});
}

View file

@ -5,7 +5,7 @@
* 2.0.
*/
import { errors } from '@elastic/elasticsearch';
import { DiagnosticResult, errors } from '@elastic/elasticsearch';
import {
IndicesDataStream,
QueryDslQueryContainer,
@ -50,7 +50,7 @@ import {
validateStreamLifecycle,
validateStreamTypeChanges,
} from './helpers/validate_stream';
import { rootStreamDefinition } from './root_stream_definition';
import { LOGS_ROOT_STREAM_NAME, rootStreamDefinition } from './root_stream_definition';
import { StreamsStorageClient } from './service';
import {
checkAccess,
@ -64,6 +64,7 @@ import { DefinitionNotFoundError } from './errors/definition_not_found_error';
import { MalformedStreamIdError } from './errors/malformed_stream_id_error';
import { SecurityError } from './errors/security_error';
import { findInheritedLifecycle, findInheritingStreams } from './helpers/lifecycle';
import { NameTakenError } from './errors/name_taken_error';
import { MalformedStreamError } from './errors/malformed_stream_error';
interface AcknowledgeResponse<TResult extends Result> {
@ -79,8 +80,6 @@ export type ForkStreamResponse = AcknowledgeResponse<'created'>;
export type ResyncStreamsResponse = AcknowledgeResponse<'updated'>;
export type UpsertStreamResponse = AcknowledgeResponse<'updated' | 'created'>;
const LOGS_ROOT_STREAM_NAME = 'logs';
function isElasticsearch404(error: unknown): error is errors.ResponseError & { statusCode: 404 } {
return isResponseError(error) && error.statusCode === 404;
}
@ -311,6 +310,10 @@ export class StreamsClient {
result: 'created' | 'updated';
parentDefinition?: WiredStreamDefinition;
}> {
if (isWiredStreamDefinition(definition)) {
await this.assertNoHierarchicalConflicts(definition.name);
}
const existingDefinition = await this.getStream(definition.name).catch((error) => {
if (isDefinitionNotFoundError(error)) {
return undefined;
@ -374,6 +377,47 @@ export class StreamsClient {
};
}
private async assertNoHierarchicalConflicts(definitionName: string) {
const streamNames = [...getAncestors(definitionName), definitionName];
const hasConflict = await Promise.all(
streamNames.map((streamName) => this.isStreamNameTaken(streamName))
);
const conflicts = streamNames.filter((_, index) => hasConflict[index]);
if (conflicts.length !== 0) {
throw new NameTakenError(
`Cannot create stream "${definitionName}" due to hierarchical conflicts caused by existing unwired stream definition, index or data stream: [${conflicts.join(
', '
)}]`
);
}
}
private async isStreamNameTaken(streamName: string): Promise<boolean> {
try {
const definition = await this.getStream(streamName);
return isUnwiredStreamDefinition(definition);
} catch (error) {
if (!isDefinitionNotFoundError(error)) {
throw error;
}
}
try {
await this.dependencies.scopedClusterClient.asCurrentUser.indices.get({
index: streamName,
});
return true;
} catch (error) {
if (isElasticsearch404(error)) {
return false;
}
throw error;
}
}
/**
* Validates whether:
* - there are no conflicting field types,
@ -621,6 +665,22 @@ export class StreamsClient {
return this.dependencies.scopedClusterClient.asCurrentUser.indices
.getDataStream({ name })
.then((response) => {
if (response.data_streams.length === 0) {
throw new errors.ResponseError({
meta: {
aborted: false,
attempts: 1,
connection: null,
context: null,
name: 'resource_not_found_exception',
request: {} as unknown as DiagnosticResult['meta']['request'],
},
warnings: [],
body: 'resource_not_found_exception',
statusCode: 404,
});
}
const dataStream = response.data_streams[0];
return dataStream;
});

View file

@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { StatusError } from './status_error';
export class NameTakenError extends StatusError {
constructor(message: string) {
super(message, 409);
this.name = 'NameTakenError';
}
}

View file

@ -5,10 +5,12 @@
* 2.0.
*/
import { WiredStreamDefinition } from '@kbn/streams-schema';
import { WiredStreamDefinition, getSegments } from '@kbn/streams-schema';
export const LOGS_ROOT_STREAM_NAME = 'logs';
export const rootStreamDefinition: WiredStreamDefinition = {
name: 'logs',
name: LOGS_ROOT_STREAM_NAME,
ingest: {
lifecycle: { dsl: {} },
processing: [],
@ -34,3 +36,8 @@ export const rootStreamDefinition: WiredStreamDefinition = {
},
},
};
export function hasSupportedStreamsRoot(streamName: string) {
const root = getSegments(streamName)[0];
return [LOGS_ROOT_STREAM_NAME].includes(root);
}

View file

@ -7,7 +7,7 @@
import { createServerRouteFactory } from '@kbn/server-route-repository';
import { CreateServerRouteFactory } from '@kbn/server-route-repository-utils/src/typings';
import { badRequest, forbidden, internal, notFound } from '@hapi/boom';
import { badRequest, conflict, forbidden, internal, notFound } from '@hapi/boom';
import { errors } from '@elastic/elasticsearch';
import { StreamsRouteHandlerResources } from './types';
import { StatusError } from '../lib/streams/errors/status_error';
@ -33,6 +33,9 @@ export const createServerRoute: CreateServerRouteFactory<
case 404:
throw notFound(error);
case 409:
throw conflict(error);
case 500:
throw internal(error);
}

View file

@ -10,9 +10,12 @@ import {
isGroupStreamDefinition,
StreamDefinition,
StreamGetResponse,
isWiredStreamDefinition,
streamUpsertRequestSchema,
} from '@kbn/streams-schema';
import { z } from '@kbn/zod';
import { badData, badRequest } from '@hapi/boom';
import { hasSupportedStreamsRoot } from '../../../lib/streams/root_stream_definition';
import { UpsertStreamResponse } from '../../../lib/streams/client';
import { createServerRoute } from '../../create_server_route';
import { readStream } from './read_stream';
@ -148,6 +151,18 @@ export const editStreamRoute = createServerRoute({
}),
handler: async ({ params, request, getScopedClients }): Promise<UpsertStreamResponse> => {
const { streamsClient } = await getScopedClients({ request });
if (!(await streamsClient.isStreamsEnabled())) {
throw badData('Streams are not enabled');
}
if (
isWiredStreamDefinition({ ...params.body.stream, name: params.path.id }) &&
!hasSupportedStreamsRoot(params.path.id)
) {
throw badRequest('Cannot create wired stream due to unsupported root stream');
}
return await streamsClient.upsertStream({
request: params.body,
name: params.path.id,

View file

@ -6,6 +6,8 @@
*/
import { z } from '@kbn/zod';
import { conflict } from '@hapi/boom';
import { NameTakenError } from '../../../lib/streams/errors/name_taken_error';
import { DisableStreamsResponse, EnableStreamsResponse } from '../../../lib/streams/client';
import { createServerRoute } from '../../create_server_route';
@ -27,7 +29,15 @@ export const enableStreamsRoute = createServerRoute({
request,
});
return await streamsClient.enableStreams();
try {
return await streamsClient.enableStreams();
} catch (error) {
if (error instanceof NameTakenError) {
throw conflict(`Cannot enable Streams, failed to create root stream: ${error.message}`);
}
throw error;
}
},
});