mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 09:48:58 -04:00
# Backport This will backport the following commits from `main` to `8.x`: - [🌊 Make client check for hierarchy conflicts before creating streams (#208914)](https://github.com/elastic/kibana/pull/208914) <!--- Backport version: 9.4.3 --> ### Questions ? Please refer to the [Backport tool documentation](https://github.com/sqren/backport) <!--BACKPORT [{"author":{"name":"Milton Hultgren","email":"milton.hultgren@elastic.co"},"sourceCommit":{"committedDate":"2025-02-05T14:01:47Z","message":"🌊 Make client check for hierarchy conflicts before creating streams (#208914)\n\n## Summary\r\n\r\nIf you enable streams (which creates `logs`) and then try to create\r\n`logs.child.grandchild` but `logs.child` already exists as either an\r\nindex or an unwired (Classic) stream, then we end up in a weird state\r\nwhere `logs.child.grandchild` gets created as a wired child but then the\r\nrequest fails as it tries to turn the unwired stream into a wired\r\nstream.\r\n\r\nThis PR adds a step that asserts that there are no such conflicts in the\r\nhierarchy before proceeding.\r\nIt also adds a check to ensure Streams are enabled before allowing the\r\ncreation of any streams, as well as blocking the creation of a root\r\nstream that isn't `logs`.\r\nFinally, there is some minor improvements to error handling for when a\r\ndata stream isn't found and error messages.","sha":"3c4694e1ddee4cd956c4b70d7f5c0f521f504212","branchLabelMapping":{"^v9.1.0$":"main","^v8.19.0$":"8.x","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["release_note:skip","backport:version","Feature:Streams","v9.1.0","v8.19.0"],"title":"🌊 Make client check for hierarchy conflicts before creating streams","number":208914,"url":"https://github.com/elastic/kibana/pull/208914","mergeCommit":{"message":"🌊 Make client check for hierarchy conflicts before creating streams (#208914)\n\n## Summary\r\n\r\nIf you enable streams (which creates `logs`) and then try to create\r\n`logs.child.grandchild` but `logs.child` already exists as either an\r\nindex or an unwired (Classic) stream, then we end up in a weird state\r\nwhere `logs.child.grandchild` gets created as a wired child but then the\r\nrequest fails as it tries to turn the unwired stream into a wired\r\nstream.\r\n\r\nThis PR adds a step that asserts that there are no such conflicts in the\r\nhierarchy before proceeding.\r\nIt also adds a check to ensure Streams are enabled before allowing the\r\ncreation of any streams, as well as blocking the creation of a root\r\nstream that isn't `logs`.\r\nFinally, there is some minor improvements to error handling for when a\r\ndata stream isn't found and error messages.","sha":"3c4694e1ddee4cd956c4b70d7f5c0f521f504212"}},"sourceBranch":"main","suggestedTargetBranches":["8.x"],"targetPullRequestStates":[{"branch":"main","label":"v9.1.0","branchLabelMappingKey":"^v9.1.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/208914","number":208914,"mergeCommit":{"message":"🌊 Make client check for hierarchy conflicts before creating streams (#208914)\n\n## Summary\r\n\r\nIf you enable streams (which creates `logs`) and then try to create\r\n`logs.child.grandchild` but `logs.child` already exists as either an\r\nindex or an unwired (Classic) stream, then we end up in a weird state\r\nwhere `logs.child.grandchild` gets created as a wired child but then the\r\nrequest fails as it tries to turn the unwired stream into a wired\r\nstream.\r\n\r\nThis PR adds a step that asserts that there are no such conflicts in the\r\nhierarchy before proceeding.\r\nIt also adds a check to ensure Streams are enabled before allowing the\r\ncreation of any streams, as well as blocking the creation of a root\r\nstream that isn't `logs`.\r\nFinally, there is some minor improvements to error handling for when a\r\ndata stream isn't found and error messages.","sha":"3c4694e1ddee4cd956c4b70d7f5c0f521f504212"}},{"branch":"8.x","label":"v8.19.0","branchLabelMappingKey":"^v8.19.0$","isSourceBranch":false,"state":"NOT_CREATED"}]}] BACKPORT--> Co-authored-by: Milton Hultgren <milton.hultgren@elastic.co>
This commit is contained in:
parent
dd8084645e
commit
308f2228c6
7 changed files with 119 additions and 8 deletions
|
@ -554,6 +554,7 @@ export class StorageIndexAdapter<TStorageSettings extends IndexStorageSettings,
|
|||
request: {} as unknown as DiagnosticResult['meta']['request'],
|
||||
},
|
||||
warnings: [],
|
||||
body: 'resource_not_found_exception',
|
||||
statusCode: 404,
|
||||
});
|
||||
}
|
||||
|
|
|
@ -5,7 +5,7 @@
|
|||
* 2.0.
|
||||
*/
|
||||
|
||||
import { errors } from '@elastic/elasticsearch';
|
||||
import { DiagnosticResult, errors } from '@elastic/elasticsearch';
|
||||
import {
|
||||
IndicesDataStream,
|
||||
QueryDslQueryContainer,
|
||||
|
@ -50,7 +50,7 @@ import {
|
|||
validateStreamLifecycle,
|
||||
validateStreamTypeChanges,
|
||||
} from './helpers/validate_stream';
|
||||
import { rootStreamDefinition } from './root_stream_definition';
|
||||
import { LOGS_ROOT_STREAM_NAME, rootStreamDefinition } from './root_stream_definition';
|
||||
import { StreamsStorageClient } from './service';
|
||||
import {
|
||||
checkAccess,
|
||||
|
@ -64,6 +64,7 @@ import { DefinitionNotFoundError } from './errors/definition_not_found_error';
|
|||
import { MalformedStreamIdError } from './errors/malformed_stream_id_error';
|
||||
import { SecurityError } from './errors/security_error';
|
||||
import { findInheritedLifecycle, findInheritingStreams } from './helpers/lifecycle';
|
||||
import { NameTakenError } from './errors/name_taken_error';
|
||||
import { MalformedStreamError } from './errors/malformed_stream_error';
|
||||
|
||||
interface AcknowledgeResponse<TResult extends Result> {
|
||||
|
@ -79,8 +80,6 @@ export type ForkStreamResponse = AcknowledgeResponse<'created'>;
|
|||
export type ResyncStreamsResponse = AcknowledgeResponse<'updated'>;
|
||||
export type UpsertStreamResponse = AcknowledgeResponse<'updated' | 'created'>;
|
||||
|
||||
const LOGS_ROOT_STREAM_NAME = 'logs';
|
||||
|
||||
function isElasticsearch404(error: unknown): error is errors.ResponseError & { statusCode: 404 } {
|
||||
return isResponseError(error) && error.statusCode === 404;
|
||||
}
|
||||
|
@ -311,6 +310,10 @@ export class StreamsClient {
|
|||
result: 'created' | 'updated';
|
||||
parentDefinition?: WiredStreamDefinition;
|
||||
}> {
|
||||
if (isWiredStreamDefinition(definition)) {
|
||||
await this.assertNoHierarchicalConflicts(definition.name);
|
||||
}
|
||||
|
||||
const existingDefinition = await this.getStream(definition.name).catch((error) => {
|
||||
if (isDefinitionNotFoundError(error)) {
|
||||
return undefined;
|
||||
|
@ -374,6 +377,47 @@ export class StreamsClient {
|
|||
};
|
||||
}
|
||||
|
||||
private async assertNoHierarchicalConflicts(definitionName: string) {
|
||||
const streamNames = [...getAncestors(definitionName), definitionName];
|
||||
const hasConflict = await Promise.all(
|
||||
streamNames.map((streamName) => this.isStreamNameTaken(streamName))
|
||||
);
|
||||
const conflicts = streamNames.filter((_, index) => hasConflict[index]);
|
||||
|
||||
if (conflicts.length !== 0) {
|
||||
throw new NameTakenError(
|
||||
`Cannot create stream "${definitionName}" due to hierarchical conflicts caused by existing unwired stream definition, index or data stream: [${conflicts.join(
|
||||
', '
|
||||
)}]`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private async isStreamNameTaken(streamName: string): Promise<boolean> {
|
||||
try {
|
||||
const definition = await this.getStream(streamName);
|
||||
return isUnwiredStreamDefinition(definition);
|
||||
} catch (error) {
|
||||
if (!isDefinitionNotFoundError(error)) {
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
await this.dependencies.scopedClusterClient.asCurrentUser.indices.get({
|
||||
index: streamName,
|
||||
});
|
||||
|
||||
return true;
|
||||
} catch (error) {
|
||||
if (isElasticsearch404(error)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates whether:
|
||||
* - there are no conflicting field types,
|
||||
|
@ -621,6 +665,22 @@ export class StreamsClient {
|
|||
return this.dependencies.scopedClusterClient.asCurrentUser.indices
|
||||
.getDataStream({ name })
|
||||
.then((response) => {
|
||||
if (response.data_streams.length === 0) {
|
||||
throw new errors.ResponseError({
|
||||
meta: {
|
||||
aborted: false,
|
||||
attempts: 1,
|
||||
connection: null,
|
||||
context: null,
|
||||
name: 'resource_not_found_exception',
|
||||
request: {} as unknown as DiagnosticResult['meta']['request'],
|
||||
},
|
||||
warnings: [],
|
||||
body: 'resource_not_found_exception',
|
||||
statusCode: 404,
|
||||
});
|
||||
}
|
||||
|
||||
const dataStream = response.data_streams[0];
|
||||
return dataStream;
|
||||
});
|
||||
|
|
|
@ -0,0 +1,15 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import { StatusError } from './status_error';
|
||||
|
||||
export class NameTakenError extends StatusError {
|
||||
constructor(message: string) {
|
||||
super(message, 409);
|
||||
this.name = 'NameTakenError';
|
||||
}
|
||||
}
|
|
@ -5,10 +5,12 @@
|
|||
* 2.0.
|
||||
*/
|
||||
|
||||
import { WiredStreamDefinition } from '@kbn/streams-schema';
|
||||
import { WiredStreamDefinition, getSegments } from '@kbn/streams-schema';
|
||||
|
||||
export const LOGS_ROOT_STREAM_NAME = 'logs';
|
||||
|
||||
export const rootStreamDefinition: WiredStreamDefinition = {
|
||||
name: 'logs',
|
||||
name: LOGS_ROOT_STREAM_NAME,
|
||||
ingest: {
|
||||
lifecycle: { dsl: {} },
|
||||
processing: [],
|
||||
|
@ -34,3 +36,8 @@ export const rootStreamDefinition: WiredStreamDefinition = {
|
|||
},
|
||||
},
|
||||
};
|
||||
|
||||
export function hasSupportedStreamsRoot(streamName: string) {
|
||||
const root = getSegments(streamName)[0];
|
||||
return [LOGS_ROOT_STREAM_NAME].includes(root);
|
||||
}
|
||||
|
|
|
@ -7,7 +7,7 @@
|
|||
|
||||
import { createServerRouteFactory } from '@kbn/server-route-repository';
|
||||
import { CreateServerRouteFactory } from '@kbn/server-route-repository-utils/src/typings';
|
||||
import { badRequest, forbidden, internal, notFound } from '@hapi/boom';
|
||||
import { badRequest, conflict, forbidden, internal, notFound } from '@hapi/boom';
|
||||
import { errors } from '@elastic/elasticsearch';
|
||||
import { StreamsRouteHandlerResources } from './types';
|
||||
import { StatusError } from '../lib/streams/errors/status_error';
|
||||
|
@ -33,6 +33,9 @@ export const createServerRoute: CreateServerRouteFactory<
|
|||
case 404:
|
||||
throw notFound(error);
|
||||
|
||||
case 409:
|
||||
throw conflict(error);
|
||||
|
||||
case 500:
|
||||
throw internal(error);
|
||||
}
|
||||
|
|
|
@ -10,9 +10,12 @@ import {
|
|||
isGroupStreamDefinition,
|
||||
StreamDefinition,
|
||||
StreamGetResponse,
|
||||
isWiredStreamDefinition,
|
||||
streamUpsertRequestSchema,
|
||||
} from '@kbn/streams-schema';
|
||||
import { z } from '@kbn/zod';
|
||||
import { badData, badRequest } from '@hapi/boom';
|
||||
import { hasSupportedStreamsRoot } from '../../../lib/streams/root_stream_definition';
|
||||
import { UpsertStreamResponse } from '../../../lib/streams/client';
|
||||
import { createServerRoute } from '../../create_server_route';
|
||||
import { readStream } from './read_stream';
|
||||
|
@ -148,6 +151,18 @@ export const editStreamRoute = createServerRoute({
|
|||
}),
|
||||
handler: async ({ params, request, getScopedClients }): Promise<UpsertStreamResponse> => {
|
||||
const { streamsClient } = await getScopedClients({ request });
|
||||
|
||||
if (!(await streamsClient.isStreamsEnabled())) {
|
||||
throw badData('Streams are not enabled');
|
||||
}
|
||||
|
||||
if (
|
||||
isWiredStreamDefinition({ ...params.body.stream, name: params.path.id }) &&
|
||||
!hasSupportedStreamsRoot(params.path.id)
|
||||
) {
|
||||
throw badRequest('Cannot create wired stream due to unsupported root stream');
|
||||
}
|
||||
|
||||
return await streamsClient.upsertStream({
|
||||
request: params.body,
|
||||
name: params.path.id,
|
||||
|
|
|
@ -6,6 +6,8 @@
|
|||
*/
|
||||
|
||||
import { z } from '@kbn/zod';
|
||||
import { conflict } from '@hapi/boom';
|
||||
import { NameTakenError } from '../../../lib/streams/errors/name_taken_error';
|
||||
import { DisableStreamsResponse, EnableStreamsResponse } from '../../../lib/streams/client';
|
||||
import { createServerRoute } from '../../create_server_route';
|
||||
|
||||
|
@ -27,7 +29,15 @@ export const enableStreamsRoute = createServerRoute({
|
|||
request,
|
||||
});
|
||||
|
||||
return await streamsClient.enableStreams();
|
||||
try {
|
||||
return await streamsClient.enableStreams();
|
||||
} catch (error) {
|
||||
if (error instanceof NameTakenError) {
|
||||
throw conflict(`Cannot enable Streams, failed to create root stream: ${error.message}`);
|
||||
}
|
||||
|
||||
throw error;
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue