[Streams] Make root stream selectively immutable (#205609)

## Summary

This closes https://github.com/elastic/streams-program/issues/54.

The root stream is selectively immutable (processing and fields changes
are not allowed).

## UI

For the UI I've entirely disabled the actions column for the root stream
in the schema editor. All of the information (bar the preview table for
changes) available in the flyout for a field is already available in the
table, so this seems easiest for now to avoid multiple logic forks
wrapping buttons etc.

E.g. flyout vs table

![Screenshot 2025-01-02 at 13 41
55](https://github.com/user-attachments/assets/867fd67c-4acc-4457-ad5f-0eb5e9d9ce3f)
This commit is contained in:
Kerry Gallagher 2025-01-08 12:34:52 +00:00 committed by GitHub
parent e4586dac80
commit 4ba5879fa9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 357 additions and 122 deletions

View file

@ -5,7 +5,7 @@
* 2.0.
*/
import { ZodSchema } from '@kbn/zod';
import { ZodSchema, custom } from '@kbn/zod';
import {
AndCondition,
conditionSchema,
@ -71,6 +71,16 @@ export function isWiredStream(subject: StreamDefinition): subject is WiredStream
return isSchema(wiredStreamDefinitonSchema, subject);
}
const rootStreamSchema = custom<'RootStreamSchema'>((val) => {
return val?.name?.split('.').length === 1;
});
export function isRootStream(subject: any) {
return (
(isWiredStream(subject) || isWiredReadStream(subject)) && isSchema(rootStreamSchema, subject)
);
}
export function isWiredStreamConfig(subject: any): subject is WiredStreamConfigDefinition {
return isSchema(wiredStreamConfigDefinitonSchema, subject);
}

View file

@ -13,3 +13,4 @@ export * from './security_exception';
export * from './index_template_not_found';
export * from './fork_condition_missing';
export * from './component_template_not_found';
export * from './root_stream_immutability_exception';

View file

@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export class RootStreamImmutabilityException extends Error {
constructor(message: string) {
super(message);
this.name = 'RootStreamImmutabilityException';
}
}

View file

@ -10,6 +10,7 @@ import { IScopedClusterClient } from '@kbn/core-elasticsearch-server';
import { Logger } from '@kbn/logging';
import { badRequest, internal, notFound } from '@hapi/boom';
import {
isRootStream,
isWiredStream,
isWiredStreamConfig,
streamConfigDefinitionSchema,
@ -17,10 +18,12 @@ import {
WiredStreamConfigDefinition,
WiredStreamDefinition,
} from '@kbn/streams-schema';
import { isEqual } from 'lodash';
import {
DefinitionNotFound,
ForkConditionMissing,
IndexTemplateNotFound,
RootStreamImmutabilityException,
SecurityException,
} from '../../lib/streams/errors';
import { createServerRoute } from '../create_server_route';
@ -71,7 +74,25 @@ export const editStreamRoute = createServerRoute({
return { acknowledged: true };
}
await validateStreamChildren(scopedClusterClient, params.path.id, params.body.ingest.routing);
const currentStreamDefinition = (await readStream({
scopedClusterClient,
id: params.path.id,
})) as WiredStreamDefinition;
if (isRootStream(streamDefinition)) {
await validateRootStreamChanges(
scopedClusterClient,
currentStreamDefinition,
streamDefinition
);
}
await validateStreamChildren(
scopedClusterClient,
currentStreamDefinition,
params.body.ingest.routing
);
if (isWiredStreamConfig(params.body)) {
await validateAncestorFields(
scopedClusterClient,
@ -148,7 +169,8 @@ export const editStreamRoute = createServerRoute({
if (
e instanceof SecurityException ||
e instanceof ForkConditionMissing ||
e instanceof MalformedStreamId
e instanceof MalformedStreamId ||
e instanceof RootStreamImmutabilityException
) {
throw badRequest(e);
}
@ -189,15 +211,11 @@ async function updateParentStream(
async function validateStreamChildren(
scopedClusterClient: IScopedClusterClient,
id: string,
currentStreamDefinition: WiredStreamDefinition,
children: WiredStreamConfigDefinition['ingest']['routing']
) {
try {
const oldDefinition = await readStream({
scopedClusterClient,
id,
});
const oldChildren = oldDefinition.stream.ingest.routing.map((child) => child.name);
const oldChildren = currentStreamDefinition.stream.ingest.routing.map((child) => child.name);
const newChildren = new Set(children.map((child) => child.name));
children.forEach((child) => {
validateCondition(child.condition);
@ -214,3 +232,31 @@ async function validateStreamChildren(
}
}
}
/*
* Changes to mappings (fields) and processing rules are not allowed on the root stream.
* Changes to routing rules are allowed.
*/
async function validateRootStreamChanges(
scopedClusterClient: IScopedClusterClient,
currentStreamDefinition: WiredStreamDefinition,
nextStreamDefinition: WiredStreamDefinition
) {
const hasFieldChanges = !isEqual(
currentStreamDefinition.stream.ingest.wired.fields,
nextStreamDefinition.stream.ingest.wired.fields
);
if (hasFieldChanges) {
throw new RootStreamImmutabilityException('Root stream fields cannot be changed');
}
const hasProcessingChanges = !isEqual(
currentStreamDefinition.stream.ingest.processing,
nextStreamDefinition.stream.ingest.processing
);
if (hasProcessingChanges) {
throw new RootStreamImmutabilityException('Root stream processing rules cannot be changed');
}
}

View file

@ -22,7 +22,7 @@ import type {
} from '@elastic/eui';
import { i18n } from '@kbn/i18n';
import useToggle from 'react-use/lib/useToggle';
import { isWiredReadStream, ReadStreamDefinition } from '@kbn/streams-schema';
import { isRootStream, isWiredReadStream, ReadStreamDefinition } from '@kbn/streams-schema';
import { FieldType } from './field_type';
import { FieldStatus } from './field_status';
import { FieldEntry, SchemaEditorEditingState } from './hooks/use_editing_state';
@ -155,111 +155,113 @@ const FieldsTable = ({ definition, fields, editingState, unpromotingState }: Fie
const [visibleColumns, setVisibleColumns] = useState(Object.keys(COLUMNS));
const trailingColumns = useMemo(() => {
return [
{
id: 'actions',
width: 40,
headerCellRender: () => null,
rowCellRender: ({ rowIndex }) => {
const field = fields[rowIndex];
return !isRootStream(definition)
? ([
{
id: 'actions',
width: 40,
headerCellRender: () => null,
rowCellRender: ({ rowIndex }) => {
const field = fields[rowIndex];
let actions: ActionsCellActionsDescriptor[] = [];
let actions: ActionsCellActionsDescriptor[] = [];
switch (field.status) {
case 'mapped':
actions = [
{
name: i18n.translate('xpack.streams.actions.viewFieldLabel', {
defaultMessage: 'View field',
}),
disabled: editingState.isSaving,
onClick: (fieldEntry: FieldEntry) => {
editingState.selectField(fieldEntry, false);
},
},
{
name: i18n.translate('xpack.streams.actions.editFieldLabel', {
defaultMessage: 'Edit field',
}),
disabled: editingState.isSaving,
onClick: (fieldEntry: FieldEntry) => {
editingState.selectField(fieldEntry, true);
},
},
{
name: i18n.translate('xpack.streams.actions.unpromoteFieldLabel', {
defaultMessage: 'Unmap field',
}),
disabled: unpromotingState.isUnpromotingField,
onClick: (fieldEntry: FieldEntry) => {
unpromotingState.setSelectedField(fieldEntry.name);
},
},
];
break;
case 'unmapped':
actions = [
{
name: i18n.translate('xpack.streams.actions.viewFieldLabel', {
defaultMessage: 'View field',
}),
disabled: editingState.isSaving,
onClick: (fieldEntry: FieldEntry) => {
editingState.selectField(fieldEntry, false);
},
},
{
name: i18n.translate('xpack.streams.actions.mapFieldLabel', {
defaultMessage: 'Map field',
}),
disabled: editingState.isSaving,
onClick: (fieldEntry: FieldEntry) => {
editingState.selectField(fieldEntry, true);
},
},
];
break;
case 'inherited':
actions = [
{
name: i18n.translate('xpack.streams.actions.viewFieldLabel', {
defaultMessage: 'View field',
}),
disabled: editingState.isSaving,
onClick: (fieldEntry: FieldEntry) => {
editingState.selectField(fieldEntry, false);
},
},
];
break;
}
return (
<ActionsCell
panels={[
{
id: 0,
title: i18n.translate(
'xpack.streams.streamDetailSchemaEditorFieldsTableActionsTitle',
switch (field.status) {
case 'mapped':
actions = [
{
defaultMessage: 'Actions',
}
),
items: actions.map((action) => ({
name: action.name,
icon: action.icon,
onClick: (event) => {
action.onClick(field);
name: i18n.translate('xpack.streams.actions.viewFieldLabel', {
defaultMessage: 'View field',
}),
disabled: editingState.isSaving,
onClick: (fieldEntry: FieldEntry) => {
editingState.selectField(fieldEntry, false);
},
},
})),
},
]}
/>
);
},
},
] as EuiDataGridProps['trailingControlColumns'];
}, [editingState, fields, unpromotingState]);
{
name: i18n.translate('xpack.streams.actions.editFieldLabel', {
defaultMessage: 'Edit field',
}),
disabled: editingState.isSaving,
onClick: (fieldEntry: FieldEntry) => {
editingState.selectField(fieldEntry, true);
},
},
{
name: i18n.translate('xpack.streams.actions.unpromoteFieldLabel', {
defaultMessage: 'Unmap field',
}),
disabled: unpromotingState.isUnpromotingField,
onClick: (fieldEntry: FieldEntry) => {
unpromotingState.setSelectedField(fieldEntry.name);
},
},
];
break;
case 'unmapped':
actions = [
{
name: i18n.translate('xpack.streams.actions.viewFieldLabel', {
defaultMessage: 'View field',
}),
disabled: editingState.isSaving,
onClick: (fieldEntry: FieldEntry) => {
editingState.selectField(fieldEntry, false);
},
},
{
name: i18n.translate('xpack.streams.actions.mapFieldLabel', {
defaultMessage: 'Map field',
}),
disabled: editingState.isSaving,
onClick: (fieldEntry: FieldEntry) => {
editingState.selectField(fieldEntry, true);
},
},
];
break;
case 'inherited':
actions = [
{
name: i18n.translate('xpack.streams.actions.viewFieldLabel', {
defaultMessage: 'View field',
}),
disabled: editingState.isSaving,
onClick: (fieldEntry: FieldEntry) => {
editingState.selectField(fieldEntry, false);
},
},
];
break;
}
return (
<ActionsCell
panels={[
{
id: 0,
title: i18n.translate(
'xpack.streams.streamDetailSchemaEditorFieldsTableActionsTitle',
{
defaultMessage: 'Actions',
}
),
items: actions.map((action) => ({
name: action.name,
icon: action.icon,
onClick: (event) => {
action.onClick(field);
},
})),
},
]}
/>
);
},
},
] as EuiDataGridProps['trailingControlColumns'])
: undefined;
}, [definition, editingState, fields, unpromotingState]);
return (
<EuiDataGrid

View file

@ -8,7 +8,14 @@
import expect from '@kbn/expect';
import { SearchTotalHits } from '@elastic/elasticsearch/lib/api/types';
import { WiredStreamConfigDefinition } from '@kbn/streams-schema';
import { enableStreams, fetchDocument, indexDocument, putStream } from './helpers/requests';
import {
deleteStream,
enableStreams,
fetchDocument,
forkStream,
indexDocument,
putStream,
} from './helpers/requests';
import { FtrProviderContext } from '../../ftr_provider_context';
import { waitForDocumentInIndex } from '../../../alerting_api_integration/observability/helpers/alerting_wait_for_helpers';
import { cleanUpRootStream } from './helpers/cleanup';
@ -22,9 +29,22 @@ export default function ({ getService }: FtrProviderContext) {
describe('Enrichment', () => {
before(async () => {
await enableStreams(supertest);
const body = {
stream: {
name: 'logs.nginx',
},
condition: {
field: 'host.name',
operator: 'eq',
value: 'routeme',
},
};
// We use a forked stream as processing changes cannot be made to the root stream
await forkStream(supertest, 'logs', body);
});
after(async () => {
await deleteStream(supertest, 'logs.nginx');
await cleanUpRootStream(esClient);
await esClient.indices.deleteDataStream({
name: ['logs*'],
@ -81,7 +101,7 @@ export default function ({ getService }: FtrProviderContext) {
},
},
};
const response = await putStream(supertest, 'logs', body);
const response = await putStream(supertest, 'logs.nginx', body);
expect(response).to.have.property('acknowledged', true);
});
@ -89,15 +109,28 @@ export default function ({ getService }: FtrProviderContext) {
const doc = {
'@timestamp': '2024-01-01T00:00:10.000Z',
message: '2023-01-01T00:00:10.000Z error test',
['host.name']: 'routeme',
};
const response = await indexDocument(esClient, 'logs', doc);
expect(response.result).to.eql('created');
await waitForDocumentInIndex({ esClient, indexName: 'logs', retryService, logger });
const reroutedDocResponse = await waitForDocumentInIndex({
esClient,
indexName: 'logs.nginx',
retryService,
logger,
});
const result = await fetchDocument(esClient, 'logs', response._id);
const result = await fetchDocument(
esClient,
'logs.nginx',
reroutedDocResponse.hits?.hits[0]?._id!
);
expect(result._source).to.eql({
'@timestamp': '2024-01-01T00:00:10.000Z',
message: '2023-01-01T00:00:10.000Z error test',
host: {
name: 'routeme',
},
inner_timestamp: '2023-01-01T00:00:10.000Z',
message2: 'test',
log: {
@ -110,22 +143,30 @@ export default function ({ getService }: FtrProviderContext) {
const doc = {
'@timestamp': '2024-01-01T00:00:11.000Z',
message: '2023-01-01T00:00:10.000Z info mylogger this is the message',
['host.name']: 'routeme',
};
const response = await indexDocument(esClient, 'logs', doc);
expect(response.result).to.eql('created');
await waitForDocumentInIndex({
const reroutedDocResponse = await waitForDocumentInIndex({
esClient,
indexName: 'logs',
indexName: 'logs.nginx',
retryService,
logger,
docCountTarget: 2,
});
const result = await fetchDocument(esClient, 'logs', response._id);
const result = await fetchDocument(
esClient,
'logs.nginx',
reroutedDocResponse.hits?.hits[0]?._id!
);
expect(result._source).to.eql({
'@timestamp': '2024-01-01T00:00:11.000Z',
message: '2023-01-01T00:00:10.000Z info mylogger this is the message',
inner_timestamp: '2023-01-01T00:00:10.000Z',
host: {
name: 'routeme',
},
log: {
level: 'info',
logger: 'mylogger',
@ -137,7 +178,7 @@ export default function ({ getService }: FtrProviderContext) {
it('Doc is searchable', async () => {
const response = await esClient.search({
index: 'logs',
index: 'logs.nginx',
body: {
query: {
match: {
@ -151,7 +192,7 @@ export default function ({ getService }: FtrProviderContext) {
it('Non-indexed field is not searchable', async () => {
const response = await esClient.search({
index: 'logs',
index: 'logs.nginx',
body: {
query: {
match: {

View file

@ -37,9 +37,14 @@ export async function forkStream(supertest: Agent, root: string, body: JsonObjec
return response.body;
}
export async function putStream(supertest: Agent, name: string, body: StreamConfigDefinition) {
export async function putStream(
supertest: Agent,
name: string,
body: StreamConfigDefinition,
expectStatusCode?: number
) {
const req = supertest.put(`/api/streams/${encodeURIComponent(name)}`).set('kbn-xsrf', 'xxx');
const response = await req.send(body).expect(200);
const response = await req.send(body).expect(expectStatusCode ?? 200);
return response.body;
}

View file

@ -15,5 +15,6 @@ export default function ({ loadTestFile }: FtrProviderContext) {
loadTestFile(require.resolve('./flush_config'));
loadTestFile(require.resolve('./assets/dashboard'));
loadTestFile(require.resolve('./schema'));
loadTestFile(require.resolve('./root_stream'));
});
}

View file

@ -0,0 +1,116 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import expect from '@kbn/expect';
import { WiredStreamConfigDefinition, WiredStreamDefinition } from '@kbn/streams-schema';
import { FtrProviderContext } from '../../ftr_provider_context';
import { cleanUpRootStream } from './helpers/cleanup';
import { enableStreams, putStream } from './helpers/requests';
const rootStreamDefinition: WiredStreamDefinition = {
name: 'logs',
stream: {
ingest: {
processing: [],
routing: [],
wired: {
fields: {
'@timestamp': {
type: 'date',
},
message: {
type: 'match_only_text',
},
'host.name': {
type: 'keyword',
},
'log.level': {
type: 'keyword',
},
},
},
},
},
};
export default function ({ getService }: FtrProviderContext) {
const supertest = getService('supertest');
const esClient = getService('es');
describe('Root stream', () => {
before(async () => {
await enableStreams(supertest);
});
after(async () => {
await cleanUpRootStream(esClient);
});
it('Should not allow processing changes', async () => {
const body: WiredStreamConfigDefinition = {
ingest: {
...rootStreamDefinition.stream.ingest,
processing: [
{
config: {
grok: {
field: 'message',
patterns: [
'%{TIMESTAMP_ISO8601:inner_timestamp} %{LOGLEVEL:log.level} %{GREEDYDATA:message2}',
],
},
},
},
],
},
};
const response = await putStream(supertest, 'logs', body, 400);
expect(response).to.have.property(
'message',
'Root stream processing rules cannot be changed'
);
});
it('Should not allow fields changes', async () => {
const body: WiredStreamConfigDefinition = {
ingest: {
...rootStreamDefinition.stream.ingest,
wired: {
fields: {
...rootStreamDefinition.stream.ingest.wired.fields,
'log.level': {
type: 'boolean',
},
},
},
},
};
const response = await putStream(supertest, 'logs', body, 400);
expect(response).to.have.property('message', 'Root stream fields cannot be changed');
});
it('Should allow routing changes', async () => {
const body: WiredStreamConfigDefinition = {
ingest: {
...rootStreamDefinition.stream.ingest,
routing: [
{
name: 'logs.gcpcloud',
condition: {
field: 'cloud.provider',
operator: 'eq',
value: 'gcp',
},
},
],
},
};
const response = await putStream(supertest, 'logs', body);
expect(response).to.have.property('acknowledged', true);
});
});
}