🌊 Streams: Move routing into wired object in the API (#213121)

Since we decided we don't want to provide routing for classic streams,
it doesn't make sense to be on the level of the ingest stream in the
API. This PR moves routing next to fields to make clear that it's only
supported for wired streams.
This commit is contained in:
Joe Reuter 2025-03-06 18:52:09 +01:00 committed by GitHub
parent 20ec004a95
commit a81e692556
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
19 changed files with 143 additions and 120 deletions

View file

@ -22,15 +22,5 @@ export const ingestStreamConfig = {
},
},
],
routing: [
{
name: 'logs.errors',
condition: {
field: 'log.level',
operator: 'eq',
value: 'error',
},
},
],
},
};

View file

@ -23,22 +23,22 @@ export const wiredStreamConfig = {
},
},
],
routing: [
{
name: 'logs.errors',
condition: {
field: 'log.level',
operator: 'eq',
value: 'error',
},
},
],
wired: {
fields: {
new_field: {
type: 'long',
},
},
routing: [
{
name: 'logs.errors',
condition: {
field: 'log.level',
operator: 'eq',
value: 'error',
},
},
],
},
},
};

View file

@ -16,12 +16,12 @@ import { IngestStreamLifecycle, ingestStreamLifecycleSchema } from './lifecycle'
interface IngestBase {
lifecycle: IngestStreamLifecycle;
processing: ProcessorDefinition[];
routing: RoutingDefinition[];
}
interface WiredIngest extends IngestBase {
wired: {
fields: FieldDefinition;
routing: RoutingDefinition[];
};
}
@ -50,7 +50,6 @@ type IngestStreamDefinition = WiredStreamDefinition | UnwiredStreamDefinition;
const ingestBaseSchema: z.Schema<IngestBase> = z.object({
lifecycle: ingestStreamLifecycleSchema,
processing: z.array(processorDefinitionSchema),
routing: z.array(routingDefinitionSchema),
});
const unwiredIngestSchema: z.Schema<UnwiredIngest> = z.intersection(
@ -65,6 +64,7 @@ const wiredIngestSchema: z.Schema<WiredIngest> = z.intersection(
z.object({
wired: z.object({
fields: fieldDefinitionSchema,
routing: z.array(routingDefinitionSchema),
}),
})
);

View file

@ -245,7 +245,7 @@ export class StreamsClient {
});
if (parentDefinition) {
const isRoutingToChild = parentDefinition.ingest.routing.find(
const isRoutingToChild = parentDefinition.ingest.wired.routing.find(
(item) => item.destination === name
);
@ -255,7 +255,7 @@ export class StreamsClient {
// The user can set the condition later on the parent
await this.updateStreamRouting({
definition: parentDefinition,
routing: parentDefinition.ingest.routing.concat({
routing: parentDefinition.ingest.wired.routing.concat({
destination: name,
if: { never: {} },
}),
@ -275,14 +275,14 @@ export class StreamsClient {
ingest: {
lifecycle: { inherit: {} },
processing: [],
routing: [
{
destination: stream.name,
if: { never: {} },
},
],
wired: {
fields: {},
routing: [
{
destination: stream.name,
if: { never: {} },
},
],
},
},
},
@ -471,7 +471,7 @@ export class StreamsClient {
validateStreamChildrenChanges(existingDefinition, definition);
}
for (const item of definition.ingest.routing) {
for (const item of definition.ingest.wired.routing) {
if (descendantsById[item.destination]) {
continue;
}
@ -486,9 +486,9 @@ export class StreamsClient {
ingest: {
lifecycle: { inherit: {} },
processing: [],
routing: [],
wired: {
fields: {},
routing: [],
},
},
},
@ -543,14 +543,21 @@ export class StreamsClient {
if: Condition;
}): Promise<ForkStreamResponse> {
const parentDefinition = asIngestStreamDefinition(await this.getStream(parent));
if (!isWiredStreamDefinition(parentDefinition)) {
throw new MalformedStreamIdError('Cannot fork a stream that is not managed');
}
const childDefinition: WiredStreamDefinition = {
name,
ingest: { lifecycle: { inherit: {} }, processing: [], routing: [], wired: { fields: {} } },
ingest: { lifecycle: { inherit: {} }, processing: [], wired: { fields: {}, routing: [] } },
};
// check whether root stream has a child of the given name already
if (parentDefinition.ingest.routing.some((item) => item.destination === childDefinition.name)) {
if (
parentDefinition.ingest.wired.routing.some(
(item) => item.destination === childDefinition.name
)
) {
throw new MalformedStreamIdError(
`The stream with ID (${name}) already exists as a child of the parent stream`
);
@ -569,7 +576,7 @@ export class StreamsClient {
await this.updateStreamRouting({
definition: updatedParentDefinition!,
routing: parentDefinition.ingest.routing.concat({
routing: parentDefinition.ingest.wired.routing.concat({
destination: name,
if: condition,
}),
@ -696,7 +703,6 @@ export class StreamsClient {
name: dataStream.name,
ingest: {
lifecycle: { inherit: {} },
routing: [],
processing: [],
unwired: {},
},
@ -757,7 +763,6 @@ export class StreamsClient {
ingest: {
lifecycle: { inherit: {} },
processing: [],
routing: [],
unwired: {},
},
}));
@ -820,7 +825,7 @@ export class StreamsClient {
await this.updateStreamRouting({
definition: parentDefinition,
routing: parentDefinition.ingest.routing.filter(
routing: parentDefinition.ingest.wired.routing.filter(
(item) => item.destination !== definition.name
),
});
@ -828,7 +833,7 @@ export class StreamsClient {
// delete the children first, as this will update
// the parent as well
for (const item of definition.ingest.routing) {
for (const item of definition.ingest.wired.routing) {
await this.deleteStream(item.destination);
}
@ -856,10 +861,10 @@ export class StreamsClient {
routing,
}: {
definition: WiredStreamDefinition;
routing: WiredStreamDefinition['ingest']['routing'];
routing: WiredStreamDefinition['ingest']['wired']['routing'];
}) {
const update = cloneDeep(definition);
update.ingest.routing = routing;
update.ingest.wired.routing = routing;
await this.updateStoredStream(update);

View file

@ -241,9 +241,6 @@ export async function syncUnwiredStreamDefinitionObjects({
dataStream: IndicesDataStream;
definition: UnwiredStreamDefinition;
}) {
if (definition.ingest.routing.length) {
throw new Error('Unmanaged streams cannot have managed children, coming soon');
}
const unmanagedAssets = await getUnmanagedElasticsearchAssets({
dataStream,
scopedClusterClient,

View file

@ -91,11 +91,11 @@ export function validateStreamChildrenChanges(
currentStreamDefinition: WiredStreamDefinition,
nextStreamDefinition: WiredStreamDefinition
) {
const existingChildren = currentStreamDefinition.ingest.routing.map(
const existingChildren = currentStreamDefinition.ingest.wired.routing.map(
(routingDefinition) => routingDefinition.destination
);
const nextChildren = nextStreamDefinition.ingest.routing.map(
const nextChildren = nextStreamDefinition.ingest.wired.routing.map(
(routingDefinition) => routingDefinition.destination
);

View file

@ -5,19 +5,19 @@
* 2.0.
*/
import { IngestStreamDefinition } from '@kbn/streams-schema';
import { WiredStreamDefinition } from '@kbn/streams-schema';
import { ASSET_VERSION } from '../../../../common/constants';
import { conditionToPainless } from '../helpers/condition_to_painless';
import { getReroutePipelineName } from './name';
interface GenerateReroutePipelineParams {
definition: IngestStreamDefinition;
definition: WiredStreamDefinition;
}
export function generateReroutePipeline({ definition }: GenerateReroutePipelineParams) {
return {
id: getReroutePipelineName(definition.name),
processors: definition.ingest.routing.map((child) => {
processors: definition.ingest.wired.routing.map((child) => {
return {
reroute: {
destination: child.destination,

View file

@ -14,8 +14,8 @@ export const rootStreamDefinition: WiredStreamDefinition = {
ingest: {
lifecycle: { dsl: {} },
processing: [],
routing: [],
wired: {
routing: [],
fields: {
'@timestamp': {
type: 'date',

View file

@ -203,9 +203,9 @@ export const FieldSummary = (props: FieldSummaryProps) => {
<EuiHorizontalRule margin="xs" />
</EuiFlexGroup>
{isEditing && stream.ingest.routing.length > 0 ? (
{isEditing && stream.ingest.wired.routing.length > 0 ? (
<EuiFlexItem grow={false}>
<ChildrenAffectedCallout childStreams={stream.ingest.routing} />
<ChildrenAffectedCallout childStreams={stream.ingest.wired.routing} />
</EuiFlexItem>
) : null}
</>

View file

@ -120,6 +120,7 @@ export const useSchemaFields = ({
ingest: {
...definition.stream.ingest,
wired: {
...definition.stream.ingest.wired,
fields: {
...definition.stream.ingest.wired.fields,
[field.name]: nextFieldDefinitionConfig,
@ -171,6 +172,7 @@ export const useSchemaFields = ({
ingest: {
...definition.stream.ingest,
wired: {
...definition.stream.ingest.wired,
fields: omit(definition.stream.ingest.wired.fields, fieldName),
},
},

View file

@ -142,7 +142,9 @@ export const useDefinition = (
ingest: {
...definition.stream.ingest,
processing: nextProcessorDefinitions,
...(isWiredStreamGetResponse(definition) && { wired: { fields } }),
...(isWiredStreamGetResponse(definition) && {
wired: { ...definition.stream.ingest.wired, fields },
}),
},
} as IngestUpsertRequest,
},

View file

@ -89,7 +89,10 @@ export function ControlBar({
const request = {
ingest: {
...stream.ingest,
routing,
wired: {
...stream.ingest.wired,
routing,
},
},
} as IngestUpsertRequest;

View file

@ -39,15 +39,15 @@ export function useRoutingState({
// Child streams: either represents the child streams as they are, or the new order from drag and drop.
const [childStreams, setChildStreams] = React.useState<
WiredStreamGetResponse['stream']['ingest']['routing']
>(definition?.stream.ingest.routing ?? []);
WiredStreamGetResponse['stream']['ingest']['wired']['routing']
>(definition?.stream.ingest.wired.routing ?? []);
useEffect(() => {
setChildStreams(definition?.stream.ingest.routing ?? []);
setChildStreams(definition?.stream.ingest.wired.routing ?? []);
}, [definition]);
// Note: just uses reference equality to check if the order has changed as onChildStreamReorder will create a new array.
const hasChildStreamsOrderChanged = childStreams !== definition?.stream.ingest.routing;
const hasChildStreamsOrderChanged = childStreams !== definition?.stream.ingest.wired.routing;
// Child stream currently being dragged
const [draggingChildStream, setDraggingChildStream] = React.useState<string | undefined>();
@ -73,8 +73,8 @@ export function useRoutingState({
const cancelChanges = useCallback(() => {
setChildUnderEdit(undefined);
setChildStreams(definition?.stream.ingest.routing ?? []);
}, [definition?.stream.ingest.routing]);
setChildStreams(definition?.stream.ingest.wired.routing ?? []);
}, [definition?.stream.ingest.wired.routing]);
const debouncedChildUnderEdit = useDebounced(childUnderEdit, 300);

View file

@ -50,7 +50,6 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
ingest: {
lifecycle: { inherit: {} },
processing: [],
routing: [],
unwired: {},
},
});
@ -67,7 +66,6 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
stream: {
ingest: {
lifecycle: { inherit: {} },
routing: [],
processing: [
{
grok: {
@ -122,7 +120,6 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
},
},
],
routing: [],
unwired: {},
},
});
@ -188,7 +185,6 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
ingest: {
lifecycle: { inherit: {} },
processing: [],
routing: [],
unwired: {},
},
},
@ -284,7 +280,6 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
stream: {
ingest: {
lifecycle: { inherit: {} },
routing: [],
processing: [
{
grok: {

View file

@ -77,8 +77,8 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
},
},
],
routing: [],
wired: {
routing: [],
fields: {
'@timestamp': {
type: 'date',

View file

@ -8,6 +8,7 @@
import expect from '@kbn/expect';
import {
isGroupStreamDefinitionBase,
isUnwiredStreamDefinition,
StreamGetResponse,
WiredStreamGetResponse,
} from '@kbn/streams-schema';
@ -40,8 +41,8 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
it('checks whether deeply nested stream is created correctly', async () => {
function getChildNames(stream: StreamGetResponse['stream']): string[] {
if (isGroupStreamDefinitionBase(stream)) return [];
return stream.ingest.routing.map((r) => r.destination);
if (isGroupStreamDefinitionBase(stream) || isUnwiredStreamDefinition(stream)) return [];
return stream.ingest.wired.routing.map((r) => r.destination);
}
const logs = await apiClient.fetch('GET /api/streams/{name}', {
params: {

View file

@ -36,33 +36,33 @@ const streams: StreamPutItem[] = [
type: 'keyword',
},
},
routing: [
{
destination: 'logs.test',
if: {
and: [
{
field: 'numberfield',
operator: 'gt',
value: 15,
},
],
},
},
{
destination: 'logs.test2',
if: {
and: [
{
field: 'field2',
operator: 'eq',
value: 'abc',
},
],
},
},
],
},
routing: [
{
destination: 'logs.test',
if: {
and: [
{
field: 'numberfield',
operator: 'gt',
value: 15,
},
],
},
},
{
destination: 'logs.test2',
if: {
and: [
{
field: 'field2',
operator: 'eq',
value: 'abc',
},
],
},
},
],
},
},
},
@ -71,9 +71,9 @@ const streams: StreamPutItem[] = [
stream: {
ingest: {
lifecycle: { inherit: {} },
routing: [],
processing: [],
wired: {
routing: [],
fields: {
numberfield: {
type: 'long',
@ -103,8 +103,8 @@ const streams: StreamPutItem[] = [
type: 'keyword',
},
},
routing: [],
},
routing: [],
},
},
},
@ -120,8 +120,8 @@ const streams: StreamPutItem[] = [
type: 'keyword',
},
},
routing: [],
},
routing: [],
},
},
},

View file

@ -86,9 +86,8 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
stream: {
ingest: {
lifecycle: { inherit: {} },
routing: [],
processing: [],
wired: { fields: {} },
wired: { fields: {}, routing: [] },
},
},
dashboards: [],
@ -159,7 +158,10 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
stream: {
ingest: {
...wiredPutBody.stream.ingest,
routing: [{ destination: 'logs.overrides.lifecycle', if: { never: {} } }],
wired: {
fields: {},
routing: [{ destination: 'logs.overrides.lifecycle', if: { never: {} } }],
},
lifecycle: { dsl: { data_retention: '1d' } },
},
},
@ -203,7 +205,10 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
stream: {
ingest: {
...wiredPutBody.stream.ingest,
routing: [{ destination: 'logs.10d.20d.inherits', if: { never: {} } }],
wired: {
fields: {},
routing: [{ destination: 'logs.10d.20d.inherits', if: { never: {} } }],
},
},
},
});
@ -266,7 +271,10 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
stream: {
ingest: {
...wiredPutBody.stream.ingest,
routing: [{ destination: 'logs.ilm.stream', if: { never: {} } }],
wired: {
fields: {},
routing: [{ destination: 'logs.ilm.stream', if: { never: {} } }],
},
lifecycle: { ilm: { policy: 'my-policy' } },
},
},
@ -285,7 +293,6 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
stream: {
ingest: {
...wiredPutBody.stream.ingest,
routing: [],
lifecycle: { ilm: { policy: 'my-policy' } },
},
},
@ -299,7 +306,10 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
stream: {
ingest: {
...wiredPutBody.stream.ingest,
routing: [],
wired: {
fields: {},
routing: [],
},
lifecycle: { dsl: { data_retention: '7d' } },
},
},
@ -315,7 +325,10 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
stream: {
ingest: {
...wiredPutBody.stream.ingest,
routing: [],
wired: {
fields: {},
routing: [],
},
lifecycle: { dsl: { data_retention: '7d' } },
},
},
@ -329,7 +342,10 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
stream: {
ingest: {
...wiredPutBody.stream.ingest,
routing: [],
wired: {
routing: [],
fields: {},
},
lifecycle: { ilm: { policy: 'my-policy' } },
},
},
@ -345,7 +361,6 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
stream: {
ingest: {
lifecycle: { inherit: {} },
routing: [],
processing: [],
unwired: {},
},
@ -447,7 +462,10 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
stream: {
ingest: {
...wiredPutBody.stream.ingest,
routing: [],
wired: {
fields: {},
routing: [],
},
lifecycle: { dsl: { data_retention: '1d' } },
},
},
@ -463,7 +481,10 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
stream: {
ingest: {
...wiredPutBody.stream.ingest,
routing: [],
wired: {
fields: {},
routing: [],
},
lifecycle: { ilm: { policy: 'this-stream-policy-does-not-exist' } },
},
},
@ -490,7 +511,10 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
stream: {
ingest: {
...wiredPutBody.stream.ingest,
routing: [],
wired: {
fields: {},
routing: [],
},
lifecycle: { ilm: { policy: policyName } },
},
},

View file

@ -19,8 +19,8 @@ const rootStreamDefinition: WiredStreamDefinition = {
ingest: {
lifecycle: { dsl: {} },
processing: [],
routing: [],
wired: {
routing: [],
fields: {
'@timestamp': {
type: 'date',
@ -91,6 +91,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
ingest: {
...rootStreamDefinition.ingest,
wired: {
...rootStreamDefinition.ingest.wired,
fields: {
...rootStreamDefinition.ingest.wired.fields,
'log.level': {
@ -111,16 +112,19 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
stream: {
ingest: {
...rootStreamDefinition.ingest,
routing: [
{
destination: 'logs.gcpcloud',
if: {
field: 'cloud.provider',
operator: 'eq',
value: 'gcp',
wired: {
...rootStreamDefinition.ingest.wired,
routing: [
{
destination: 'logs.gcpcloud',
if: {
field: 'cloud.provider',
operator: 'eq',
value: 'gcp',
},
},
},
],
],
},
},
},
};