🌊 Refactor APIs to follow Elasticsearch conventions (#204671)

## Summary

This PR refactors the API by creating a new package called
`@kbn/streams-schema` where you can find all the Zod types along with
some type guards. I've also updated all the APIs and calls to use the
new schemas.

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
Co-authored-by: Joe Reuter <johannes.reuter@elastic.co>
This commit is contained in:
Chris Cowan 2024-12-23 10:27:16 -07:00 committed by GitHub
parent 34bc507dc3
commit 5ae53d4f0b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
90 changed files with 1488 additions and 615 deletions

1
.github/CODEOWNERS vendored
View file

@ -763,6 +763,7 @@ x-pack/packages/kbn-ai-assistant @elastic/search-kibana
x-pack/packages/kbn-alerting-comparators @elastic/response-ops
x-pack/packages/kbn-alerting-state-types @elastic/response-ops
x-pack/packages/kbn-random-sampling @elastic/kibana-visualizations
x-pack/packages/kbn-streams-schema @elastic/streams-program-team
x-pack/packages/kbn-synthetics-private-location @elastic/obs-ux-management-team
x-pack/packages/maps/vector_tile_utils @elastic/kibana-presentation
x-pack/packages/observability/observability_utils/observability_utils_browser @elastic/observability-ui

View file

@ -945,6 +945,7 @@
"@kbn/std": "link:packages/kbn-std",
"@kbn/streams-app-plugin": "link:x-pack/solutions/observability/plugins/streams_app",
"@kbn/streams-plugin": "link:x-pack/solutions/observability/plugins/streams",
"@kbn/streams-schema": "link:x-pack/packages/kbn-streams-schema",
"@kbn/synthetics-plugin": "link:x-pack/solutions/observability/plugins/synthetics",
"@kbn/synthetics-private-location": "link:x-pack/packages/kbn-synthetics-private-location",
"@kbn/task-manager-fixture-plugin": "link:x-pack/test/alerting_api_integration/common/plugins/task_manager_fixture",

View file

@ -1878,6 +1878,8 @@
"@kbn/streams-app-plugin/*": ["x-pack/solutions/observability/plugins/streams_app/*"],
"@kbn/streams-plugin": ["x-pack/solutions/observability/plugins/streams"],
"@kbn/streams-plugin/*": ["x-pack/solutions/observability/plugins/streams/*"],
"@kbn/streams-schema": ["x-pack/packages/kbn-streams-schema"],
"@kbn/streams-schema/*": ["x-pack/packages/kbn-streams-schema/*"],
"@kbn/synthetics-e2e": ["x-pack/solutions/observability/plugins/synthetics/e2e"],
"@kbn/synthetics-e2e/*": ["x-pack/solutions/observability/plugins/synthetics/e2e/*"],
"@kbn/synthetics-plugin": ["x-pack/solutions/observability/plugins/synthetics"],

View file

@ -0,0 +1,3 @@
# @kbn/streams-schema
This shared package contains the Zod schema definition for the Streams project.

View file

@ -0,0 +1,10 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export * from './src/apis';
export * from './src/models';
export * from './src/helpers';

View file

@ -0,0 +1,12 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
/**
 * Jest configuration for the `@kbn/streams-schema` package.
 * `rootDir` points three levels up (the repo root) so the shared
 * `@kbn/test` preset resolves; `roots` restricts test discovery to
 * this package's directory.
 */
module.exports = {
  preset: '@kbn/test',
  rootDir: '../../..',
  roots: ['<rootDir>/x-pack/packages/kbn-streams-schema'],
};

View file

@ -0,0 +1,8 @@
{
"type": "shared-common",
"id": "@kbn/streams-schema",
"owner": "@elastic/streams-program-team",
"group": "observability",
"visibility": "shared"
}

View file

@ -0,0 +1,7 @@
{
"name": "@kbn/streams-schema",
"description": "Streams Zod schema definition and common models shared between public and server.",
"private": true,
"version": "1.0.0",
"license": "Elastic License 2.0"
}

View file

@ -0,0 +1,105 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP
exports[`ReadStreamResponse should successfully parse 1`] = `
Object {
"streams": Array [
Object {
"elasticsearch_assets": Array [],
"inherited_fields": Object {
"@timestamp": Object {
"from": "logs",
"type": "date",
},
"message": Object {
"from": "logs",
"type": "match_only_text",
},
},
"name": "logs.nginx",
"stream": Object {
"ingest": Object {
"processing": Array [
Object {
"condition": Object {
"field": "log.level",
"operator": "eq",
"value": "error",
},
"config": Object {
"grok": Object {
"field": "message",
"patterns": Array [
"%{TIMESTAMP_ISO8601:event.timestamp} %{GREEDY:rest}",
],
},
},
},
],
"routing": Array [
Object {
"condition": Object {
"field": "log.level",
"operator": "eq",
"value": "error",
},
"name": "logs.errors",
},
],
"wired": Object {
"fields": Object {
"new_field": Object {
"type": "long",
},
},
},
},
},
},
Object {
"elasticsearch_assets": Array [],
"inherited_fields": Object {
"@timestamp": Object {
"from": "logs",
"type": "date",
},
"message": Object {
"from": "logs",
"type": "match_only_text",
},
},
"name": "logs.nginx",
"stream": Object {
"ingest": Object {
"processing": Array [
Object {
"condition": Object {
"field": "log.level",
"operator": "eq",
"value": "error",
},
"config": Object {
"grok": Object {
"field": "message",
"patterns": Array [
"%{TIMESTAMP_ISO8601:event.timestamp} %{GREEDY:rest}",
],
},
},
},
],
"routing": Array [
Object {
"condition": Object {
"field": "log.level",
"operator": "eq",
"value": "error",
},
"name": "logs.errors",
},
],
},
},
},
],
}
`;

View file

@ -0,0 +1,9 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export * from './read_streams_response';
export * from './list_streams_response';

View file

@ -0,0 +1,14 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { streamDefintionSchema } from '../models';
/**
 * Response body for the "list streams" API: a flat array of stream
 * definitions (wired or ingest).
 */
export const listStreamsResponseSchema = z.object({
  streams: z.array(streamDefintionSchema),
});

export type ListStreamsResponse = z.infer<typeof listStreamsResponseSchema>;

View file

@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { readStreamResponse } from '../fixtures/read_streams_response';
import { readStreamResponseSchema } from './read_streams_response';
// Snapshot test: `parse()` throws on invalid input, so a passing snapshot
// both proves the fixture conforms to the schema and records the parsed
// (defaults-applied) shape for review.
describe('ReadStreamResponse', () => {
  it('should successfully parse', () => {
    expect(readStreamResponseSchema.parse(readStreamResponse)).toMatchSnapshot();
  });
});

View file

@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { readStreamDefinitonSchema } from '../models';
/**
 * Response body for the "read stream" API. Each entry is a stream
 * definition enriched with the read-only `inherited_fields` map.
 */
export const readStreamResponseSchema = z.object({
  streams: z.array(readStreamDefinitonSchema),
});

export type ReadStreamResponse = z.infer<typeof readStreamResponseSchema>;

View file

@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { ingestStream } from './ingest_stream';
export const ingestReadStream = {
...ingestStream,
inherited_fields: {
'@timestamp': {
type: 'date',
from: 'logs',
},
message: {
type: 'match_only_text',
from: 'logs',
},
},
};

View file

@ -0,0 +1,14 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { ingestStreamConfig } from './ingest_stream_config';
export const ingestStream = {
name: 'logs.nginx',
elasticsearch_assets: [],
stream: ingestStreamConfig,
};

View file

@ -0,0 +1,36 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
/**
 * Test fixture: ingest stream config with one conditional grok processor
 * and one routing rule. Note there is no `wired` section — its absence is
 * what distinguishes this from `wiredStreamConfig`.
 */
export const ingestStreamConfig = {
  ingest: {
    processing: [
      {
        // Grok-parse `message`, but only for error-level documents.
        config: {
          grok: {
            field: 'message',
            patterns: ['%{TIMESTAMP_ISO8601:event.timestamp} %{GREEDY:rest}'],
          },
        },
        condition: {
          field: 'log.level',
          operator: 'eq',
          value: 'error',
        },
      },
    ],
    routing: [
      {
        // Route error-level documents to the child stream `logs.errors`.
        name: 'logs.errors',
        condition: {
          field: 'log.level',
          operator: 'eq',
          value: 'error',
        },
      },
    ],
  },
};

View file

@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { ingestReadStream } from './ingest_read_stream';
import { wiredReadStream } from './wired_read_stream';
/**
 * Test fixture: a combined read-API response containing one wired and
 * one ingest read stream, exercising both branches of the union schema.
 */
export const readStreamResponse = {
  streams: [wiredReadStream, ingestReadStream],
};

View file

@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { wiredStream } from './wired_stream';
export const wiredReadStream = {
...wiredStream,
inherited_fields: {
'@timestamp': {
type: 'date',
from: 'logs',
},
message: {
type: 'match_only_text',
from: 'logs',
},
},
};

View file

@ -0,0 +1,14 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { wiredStreamConfig } from './wired_stream_config';
export const wiredStream = {
name: 'logs.nginx',
elasticsearch_assets: [],
stream: wiredStreamConfig,
};

View file

@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
/**
 * Test fixture: wired stream config. Identical to `ingestStreamConfig`
 * except for the extra `wired.fields` block, which declares the field
 * mappings this stream adds on top of inherited ones.
 */
export const wiredStreamConfig = {
  ingest: {
    processing: [
      {
        // Grok-parse `message`, but only for error-level documents.
        config: {
          grok: {
            field: 'message',
            patterns: ['%{TIMESTAMP_ISO8601:event.timestamp} %{GREEDY:rest}'],
          },
        },
        condition: {
          field: 'log.level',
          operator: 'eq',
          value: 'error',
        },
      },
    ],
    routing: [
      {
        // Route error-level documents to the child stream `logs.errors`.
        name: 'logs.errors',
        condition: {
          field: 'log.level',
          operator: 'eq',
          value: 'error',
        },
      },
    ],
    wired: {
      fields: {
        new_field: {
          type: 'long',
        },
      },
    },
  },
};

View file

@ -5,4 +5,4 @@
* 2.0.
*/
export type { StreamDefinition, ReadStreamDefinition } from './types';
export * from './type_guards';

View file

@ -0,0 +1,108 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { ZodSchema } from '@kbn/zod';
import {
AndCondition,
conditionSchema,
dissectProcessingDefinitionSchema,
DissectProcssingDefinition,
FilterCondition,
filterConditionSchema,
GrokProcessingDefinition,
grokProcessingDefinitionSchema,
IngestReadStreamDefinition,
ingestReadStreamDefinitonSchema,
IngestStreamDefinition,
ingestStreamDefinitonSchema,
OrCondition,
ReadStreamDefinition,
readStreamDefinitonSchema,
StreamDefinition,
streamDefintionSchema,
WiredReadStreamDefinition,
wiredReadStreamDefinitonSchema,
WiredStreamDefinition,
wiredStreamDefinitonSchema,
} from '../models';
import {
IngestStreamConfigDefinition,
ingestStreamConfigDefinitonSchema,
StreamConfigDefinition,
streamConfigDefinitionSchema,
WiredStreamConfigDefinition,
wiredStreamConfigDefinitonSchema,
} from '../models/stream_config';
/**
 * Returns true when `subject` conforms to `zodSchema`.
 *
 * Uses `safeParse` so validation failures are reported via the result
 * object rather than thrown: unlike a bare try/catch around `parse()`,
 * this cannot accidentally swallow unrelated runtime errors, and it
 * matches how the pre-existing condition guards performed this check.
 */
export function isSchema<T>(zodSchema: ZodSchema, subject: T) {
  return zodSchema.safeParse(subject).success;
}
/** Type guard: `subject` is a read stream definition (wired or ingest). */
export function isReadStream(subject: any): subject is ReadStreamDefinition {
  return isSchema(readStreamDefinitonSchema, subject);
}
/** Type guard: `subject` is a wired read stream definition. */
export function isWiredReadStream(subject: any): subject is WiredReadStreamDefinition {
  return isSchema(wiredReadStreamDefinitonSchema, subject);
}
/** Type guard: `subject` is an ingest (classic) read stream definition. */
export function isIngestReadStream(subject: any): subject is IngestReadStreamDefinition {
  return isSchema(ingestReadStreamDefinitonSchema, subject);
}
/** Type guard: `subject` is a stored stream definition (wired or ingest). */
export function isStream(subject: any): subject is StreamDefinition {
  return isSchema(streamDefintionSchema, subject);
}
/**
 * Type guard: narrows a known stream definition to the ingest variant.
 * Input is already typed as one of the two variants, hence no `any`.
 */
export function isIngestStream(
  subject: IngestStreamDefinition | WiredStreamDefinition
): subject is IngestStreamDefinition {
  return isSchema(ingestStreamDefinitonSchema, subject);
}
/**
 * Type guard: narrows a known stream definition to the wired variant.
 * Input is already typed as one of the two variants, hence no `any`.
 */
export function isWiredStream(
  subject: IngestStreamDefinition | WiredStreamDefinition
): subject is WiredStreamDefinition {
  return isSchema(wiredStreamDefinitonSchema, subject);
}
/** Type guard: `subject` is a wired stream config (has an `ingest.wired` section). */
export function isWiredStreamConfig(subject: any): subject is WiredStreamConfigDefinition {
  return isSchema(wiredStreamConfigDefinitonSchema, subject);
}
/** Type guard: `subject` is an ingest (classic) stream config. */
export function isIngestStreamConfig(subject: any): subject is IngestStreamConfigDefinition {
  return isSchema(ingestStreamConfigDefinitonSchema, subject);
}
/** Type guard: `subject` is a stream config of either variant. */
export function isStreamConfig(subject: any): subject is StreamConfigDefinition {
  return isSchema(streamConfigDefinitionSchema, subject);
}
/** Type guard: `subject` is a grok processing config (has a `grok` key). */
export function isGrokProcessor(subject: any): subject is GrokProcessingDefinition {
  return isSchema(grokProcessingDefinitionSchema, subject);
}
/** Type guard: `subject` is a dissect processing config (has a `dissect` key). */
export function isDissectProcessor(subject: any): subject is DissectProcssingDefinition {
  return isSchema(dissectProcessingDefinitionSchema, subject);
}
/** Type guard: `subject` is a leaf filter condition (field/operator[/value]). */
export function isFilterCondition(subject: any): subject is FilterCondition {
  return isSchema(filterConditionSchema, subject);
}
/**
 * Type guard: `subject` is a valid condition whose `and` branch is set.
 * The extra `!= null` check is needed because `conditionSchema` accepts
 * every condition variant, not just conjunctions.
 */
export function isAndCondition(subject: any): subject is AndCondition {
  return isSchema(conditionSchema, subject) && subject.and != null;
}
/**
 * Type guard: `subject` is a valid condition whose `or` branch is set.
 * The extra `!= null` check is needed because `conditionSchema` accepts
 * every condition variant, not just disjunctions.
 */
export function isOrCondition(subject: any): subject is OrCondition {
  return isSchema(conditionSchema, subject) && subject.or != null;
}

View file

@ -48,71 +48,71 @@ export const conditionSchema: z.ZodType<Condition> = z.lazy(() =>
);
export const grokProcessingDefinitionSchema = z.object({
type: z.literal('grok'),
field: z.string(),
patterns: z.array(z.string()),
pattern_definitions: z.optional(z.record(z.string())),
grok: z.object({
field: z.string(),
patterns: z.array(z.string()),
pattern_definitions: z.optional(z.record(z.string())),
}),
});
export type GrokProcessingDefinition = z.infer<typeof grokProcessingDefinitionSchema>;
export const dissectProcessingDefinitionSchema = z.object({
type: z.literal('dissect'),
field: z.string(),
pattern: z.string(),
dissect: z.object({
field: z.string(),
pattern: z.string(),
}),
});
export type DissectProcssingDefinition = z.infer<typeof dissectProcessingDefinitionSchema>;
export const processingConfigSchema = z.union([
grokProcessingDefinitionSchema,
dissectProcessingDefinitionSchema,
]);
export const processingDefinitionSchema = z.object({
condition: z.optional(conditionSchema),
config: z.discriminatedUnion('type', [
grokProcessingDefinitionSchema,
dissectProcessingDefinitionSchema,
]),
config: processingConfigSchema,
});
export type ProcessingDefinition = z.infer<typeof processingDefinitionSchema>;
export const fieldDefinitionSchema = z.object({
name: z.string(),
export const fieldDefinitionConfigSchema = z.object({
type: z.enum(['keyword', 'match_only_text', 'long', 'double', 'date', 'boolean', 'ip']),
format: z.optional(z.string()),
});
export type FieldDefinitionConfig = z.infer<typeof fieldDefinitionConfigSchema>;
export const fieldDefinitionSchema = z.record(z.string(), fieldDefinitionConfigSchema);
export type FieldDefinition = z.infer<typeof fieldDefinitionSchema>;
export const inheritedFieldDefinitionSchema = z.record(
z.string(),
fieldDefinitionConfigSchema.extend({ from: z.string() })
);
export type InheritedFieldDefinition = z.infer<typeof inheritedFieldDefinitionSchema>;
export const fieldDefinitionConfigWithNameSchema = fieldDefinitionConfigSchema.extend({
name: z.string(),
});
export type FieldDefinitionConfigWithName = z.infer<typeof fieldDefinitionConfigWithNameSchema>;
export const streamChildSchema = z.object({
id: z.string(),
name: z.string(),
condition: z.optional(conditionSchema),
});
export type StreamChild = z.infer<typeof streamChildSchema>;
export const streamWithoutIdDefinitonSchema = z.object({
processing: z.array(processingDefinitionSchema).default([]),
fields: z.array(fieldDefinitionSchema).default([]),
managed: z.boolean().default(true),
children: z.array(streamChildSchema).default([]),
});
export const elasticsearchAssetSchema = z.array(
z.object({
type: z.enum(['ingest_pipeline', 'component_template', 'index_template', 'data_stream']),
id: z.string(),
})
);
export type StreamWithoutIdDefinition = z.infer<typeof streamDefinitonSchema>;
export const unmanagedElasticsearchAsset = z.object({
type: z.enum(['ingest_pipeline', 'component_template', 'index_template', 'data_stream']),
id: z.string(),
});
export type UnmanagedElasticsearchAsset = z.infer<typeof unmanagedElasticsearchAsset>;
export const streamDefinitonSchema = streamWithoutIdDefinitonSchema.extend({
id: z.string(),
unmanaged_elasticsearch_assets: z.optional(z.array(unmanagedElasticsearchAsset)),
});
export type StreamDefinition = z.infer<typeof streamDefinitonSchema>;
export const streamDefinitonWithoutChildrenSchema = streamDefinitonSchema.omit({ children: true });
export type StreamWithoutChildrenDefinition = z.infer<typeof streamDefinitonWithoutChildrenSchema>;
export const readStreamDefinitonSchema = streamDefinitonSchema.extend({
inheritedFields: z.array(fieldDefinitionSchema.extend({ from: z.string() })).default([]),
});
export type ReadStreamDefinition = z.infer<typeof readStreamDefinitonSchema>;
export type ElasticsearchAsset = z.infer<typeof elasticsearchAssetSchema>;

View file

@ -0,0 +1,11 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export * from './common';
export * from './read_streams';
export * from './streams';
export * from './stream_config';

View file

@ -0,0 +1,104 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP
exports[`ReadStream should successfully parse ingestReadStream 1`] = `
Object {
"elasticsearch_assets": Array [],
"inherited_fields": Object {
"@timestamp": Object {
"from": "logs",
"type": "date",
},
"message": Object {
"from": "logs",
"type": "match_only_text",
},
},
"name": "logs.nginx",
"stream": Object {
"ingest": Object {
"processing": Array [
Object {
"condition": Object {
"field": "log.level",
"operator": "eq",
"value": "error",
},
"config": Object {
"grok": Object {
"field": "message",
"patterns": Array [
"%{TIMESTAMP_ISO8601:event.timestamp} %{GREEDY:rest}",
],
},
},
},
],
"routing": Array [
Object {
"condition": Object {
"field": "log.level",
"operator": "eq",
"value": "error",
},
"name": "logs.errors",
},
],
},
},
}
`;
exports[`ReadStream should successfully parse wiredReadStream 1`] = `
Object {
"elasticsearch_assets": Array [],
"inherited_fields": Object {
"@timestamp": Object {
"from": "logs",
"type": "date",
},
"message": Object {
"from": "logs",
"type": "match_only_text",
},
},
"name": "logs.nginx",
"stream": Object {
"ingest": Object {
"processing": Array [
Object {
"condition": Object {
"field": "log.level",
"operator": "eq",
"value": "error",
},
"config": Object {
"grok": Object {
"field": "message",
"patterns": Array [
"%{TIMESTAMP_ISO8601:event.timestamp} %{GREEDY:rest}",
],
},
},
},
],
"routing": Array [
Object {
"condition": Object {
"field": "log.level",
"operator": "eq",
"value": "error",
},
"name": "logs.errors",
},
],
"wired": Object {
"fields": Object {
"new_field": Object {
"type": "long",
},
},
},
},
},
}
`;

View file

@ -0,0 +1,10 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export * from './ingest_read_stream';
export * from './wired_read_stream';
export * from './read_stream';

View file

@ -0,0 +1,18 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { ingestStreamDefinitonSchema } from '../streams';
import { inheritedFieldDefinitionSchema } from '../common';
/**
 * Ingest stream definition as returned by the read API: the stored
 * definition plus the read-only map of fields inherited from ancestor
 * streams. `.strict()` rejects unknown top-level keys.
 */
export const ingestReadStreamDefinitonSchema = ingestStreamDefinitonSchema
  .extend({
    inherited_fields: inheritedFieldDefinitionSchema.default({}),
  })
  .strict();

export type IngestReadStreamDefinition = z.infer<typeof ingestReadStreamDefinitonSchema>;

View file

@ -0,0 +1,19 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { ingestReadStream } from '../../fixtures/ingest_read_stream';
import { wiredReadStream } from '../../fixtures/wired_read_stream';
import { readStreamDefinitonSchema } from './read_stream';
// Snapshot tests: `parse()` throws on invalid input, so a passing snapshot
// proves each fixture matches its branch of the read stream union and
// records the parsed (defaults-applied) shape.
describe('ReadStream', () => {
  it('should successfully parse wiredReadStream', () => {
    expect(readStreamDefinitonSchema.parse(wiredReadStream)).toMatchSnapshot();
  });

  it('should successfully parse ingestReadStream', () => {
    expect(readStreamDefinitonSchema.parse(ingestReadStream)).toMatchSnapshot();
  });
});

View file

@ -0,0 +1,17 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { ingestReadStreamDefinitonSchema } from './ingest_read_stream';
import { wiredReadStreamDefinitonSchema } from './wired_read_stream';
/**
 * Union of the two read stream variants. `z.union` returns the first
 * member that parses, so the wired variant is tried before ingest.
 */
export const readStreamDefinitonSchema = z.union([
  wiredReadStreamDefinitonSchema,
  ingestReadStreamDefinitonSchema,
]);

export type ReadStreamDefinition = z.infer<typeof readStreamDefinitonSchema>;

View file

@ -0,0 +1,18 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { wiredStreamDefinitonSchema } from '../streams';
import { inheritedFieldDefinitionSchema } from '../common';
/**
 * Wired stream definition as returned by the read API: the stored
 * definition plus the read-only map of fields inherited from ancestor
 * streams. `.strict()` rejects unknown top-level keys.
 */
export const wiredReadStreamDefinitonSchema = wiredStreamDefinitonSchema
  .extend({
    inherited_fields: inheritedFieldDefinitionSchema.default({}),
  })
  .strict();

export type WiredReadStreamDefinition = z.infer<typeof wiredReadStreamDefinitonSchema>;

View file

@ -0,0 +1,10 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export * from './wired_stream_config';
export * from './ingest_stream_config';
export * from './stream_config';

View file

@ -0,0 +1,20 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { processingDefinitionSchema, streamChildSchema } from '../common';
/**
 * Config for a classic/ingest stream: processing steps and routing rules,
 * both defaulting to empty arrays.
 *
 * NOTE(review): `.strict()` applies to the OUTER object only; unknown keys
 * inside `ingest` (e.g. a `wired` section) are stripped rather than
 * rejected, so the wired-first ordering of `streamConfigDefinitionSchema`
 * is load-bearing — verify before reordering the union.
 */
export const ingestStreamConfigDefinitonSchema = z
  .object({
    ingest: z.object({
      processing: z.array(processingDefinitionSchema).default([]),
      routing: z.array(streamChildSchema).default([]),
    }),
  })
  .strict();

export type IngestStreamConfigDefinition = z.infer<typeof ingestStreamConfigDefinitonSchema>;

View file

@ -0,0 +1,17 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { ingestStreamConfigDefinitonSchema } from './ingest_stream_config';
import { wiredStreamConfigDefinitonSchema } from './wired_stream_config';
/**
 * Union of the two stream config variants. `z.union` returns the first
 * member that parses, so the (more specific) wired config is tried first.
 */
export const streamConfigDefinitionSchema = z.union([
  wiredStreamConfigDefinitonSchema,
  ingestStreamConfigDefinitonSchema,
]);

export type StreamConfigDefinition = z.infer<typeof streamConfigDefinitionSchema>;

View file

@ -0,0 +1,23 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { fieldDefinitionSchema, processingDefinitionSchema, streamChildSchema } from '../common';
/**
 * Config for a wired stream: like the ingest config but with a mandatory
 * `wired.fields` map declaring this stream's own field mappings (defaults
 * to empty). `.strict()` rejects unknown top-level keys.
 */
export const wiredStreamConfigDefinitonSchema = z
  .object({
    ingest: z.object({
      processing: z.array(processingDefinitionSchema).default([]),
      wired: z.object({
        fields: fieldDefinitionSchema.default({}),
      }),
      routing: z.array(streamChildSchema).default([]),
    }),
  })
  .strict();

export type WiredStreamConfigDefinition = z.infer<typeof wiredStreamConfigDefinitonSchema>;

View file

@ -0,0 +1,10 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export * from './ingest_stream';
export * from './wired_stream';
export * from './stream';

View file

@ -0,0 +1,20 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { ingestStreamConfigDefinitonSchema } from '../stream_config';
import { elasticsearchAssetSchema } from '../common';
/**
 * A classic (ingest) stream: its name, the Elasticsearch assets it owns
 * (optional), and its ingest configuration. `.strict()` rejects unknown
 * top-level keys.
 */
export const ingestStreamDefinitonSchema = z
  .object({
    name: z.string(),
    elasticsearch_assets: z.optional(elasticsearchAssetSchema),
    stream: ingestStreamConfigDefinitonSchema,
  })
  .strict();

export type IngestStreamDefinition = z.infer<typeof ingestStreamDefinitonSchema>;

View file

@ -0,0 +1,17 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { wiredStreamDefinitonSchema } from './wired_stream';
import { ingestStreamDefinitonSchema } from './ingest_stream';
/**
 * Union of the two stored stream variants; wired is tried first.
 *
 * NOTE(review): the identifier is misspelled ("Defintion"); renaming would
 * touch every consumer of this package, so it is flagged here only.
 */
export const streamDefintionSchema = z.union([
  wiredStreamDefinitonSchema,
  ingestStreamDefinitonSchema,
]);

export type StreamDefinition = z.infer<typeof streamDefintionSchema>;

View file

@ -0,0 +1,20 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { wiredStreamConfigDefinitonSchema } from '../stream_config';
import { elasticsearchAssetSchema } from '../common';
/**
 * A wired stream: its name, the Elasticsearch assets it owns (optional),
 * and its wired configuration. `.strict()` rejects unknown top-level keys.
 */
export const wiredStreamDefinitonSchema = z
  .object({
    name: z.string(),
    elasticsearch_assets: z.optional(elasticsearchAssetSchema),
    stream: wiredStreamConfigDefinitonSchema,
  })
  .strict();

export type WiredStreamDefinition = z.infer<typeof wiredStreamDefinitonSchema>;

View file

@ -0,0 +1,20 @@
{
"extends": "../../../tsconfig.base.json",
"compilerOptions": {
"outDir": "target/types",
"types": [
"jest",
"node"
]
},
"include": [
"**/*.ts"
],
"kbn_references": [
"@kbn/zod"
],
"exclude": [
"target/**/*"
]
}

View file

@ -17,5 +17,3 @@ export const plugin = async (context: PluginInitializerContext<StreamsConfig>) =
const { StreamsPlugin } = await import('./plugin');
return new StreamsPlugin(context);
};
export type { ListStreamResponse } from './lib/streams/stream_crud';

View file

@ -10,7 +10,7 @@ import {
MappingDateProperty,
MappingProperty,
} from '@elastic/elasticsearch/lib/api/types';
import { StreamDefinition } from '../../../../common/types';
import { WiredStreamDefinition } from '@kbn/streams-schema';
import { ASSET_VERSION } from '../../../../common/constants';
import { logsSettings } from './logs_layer';
import { isRoot } from '../helpers/hierarchy';
@ -18,26 +18,26 @@ import { getComponentTemplateName } from './name';
export function generateLayer(
id: string,
definition: StreamDefinition
definition: WiredStreamDefinition
): ClusterPutComponentTemplateRequest {
const properties: Record<string, MappingProperty> = {};
definition.fields.forEach((field) => {
Object.entries(definition.stream.ingest.wired.fields).forEach(([field, props]) => {
const property: MappingProperty = {
type: field.type,
type: props.type,
};
if (field.name === '@timestamp') {
if (field === '@timestamp') {
// @timestamp can't ignore malformed dates as it's used for sorting in logsdb
(property as MappingDateProperty).ignore_malformed = false;
}
if (field.type === 'date' && field.format) {
(property as MappingDateProperty).format = field.format;
if (props.type === 'date' && props.format) {
(property as MappingDateProperty).format = props.format;
}
properties[field.name] = property;
properties[field] = property;
});
return {
name: getComponentTemplateName(id),
template: {
settings: isRoot(definition.id) ? logsSettings : {},
settings: isRoot(definition.name) ? logsSettings : {},
mappings: {
subobjects: false,
dynamic: false,

View file

@ -5,8 +5,13 @@
* 2.0.
*/
import { Condition, FilterCondition } from '../../../../common/types';
import { isAndCondition, isFilterCondition, isOrCondition } from './condition_guards';
import {
Condition,
FilterCondition,
isAndCondition,
isFilterCondition,
isOrCondition,
} from '@kbn/streams-schema';
export function isComplete(condition: Condition): boolean {
if (isFilterCondition(condition)) {

View file

@ -1,29 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import {
AndCondition,
conditionSchema,
FilterCondition,
filterConditionSchema,
OrCondition,
} from '../../../../common/types';
export function isFilterCondition(subject: any): subject is FilterCondition {
const result = filterConditionSchema.safeParse(subject);
return result.success;
}
export function isAndCondition(subject: any): subject is AndCondition {
const result = conditionSchema.safeParse(subject);
return result.success && subject.and != null;
}
export function isOrCondition(subject: any): subject is OrCondition {
const result = conditionSchema.safeParse(subject);
return result.success && subject.or != null;
}

View file

@ -11,8 +11,10 @@ import {
Condition,
FilterCondition,
UnaryFilterCondition,
} from '../../../../common/types';
import { isAndCondition, isFilterCondition, isOrCondition } from './condition_guards';
isAndCondition,
isFilterCondition,
isOrCondition,
} from '@kbn/streams-schema';
function safePainlessField(conditionOrField: FilterCondition | string) {
if (isFilterCondition(conditionOrField)) {

View file

@ -5,8 +5,13 @@
* 2.0.
*/
import { Condition, FilterCondition } from '../../../../common/types';
import { isAndCondition, isFilterCondition, isOrCondition } from './condition_guards';
import {
Condition,
FilterCondition,
isAndCondition,
isFilterCondition,
isOrCondition,
} from '@kbn/streams-schema';
function conditionToClause(condition: FilterCondition) {
switch (condition.operator) {

View file

@ -5,15 +5,16 @@
* 2.0.
*/
import { StreamDefinition } from '../../../../common/types';
import { StreamDefinition } from '@kbn/streams-schema';
export function isDescendandOf(parent: StreamDefinition, child: StreamDefinition) {
return child.id.startsWith(parent.id);
return child.name.startsWith(parent.name);
}
export function isChildOf(parent: StreamDefinition, child: StreamDefinition) {
return (
isDescendandOf(parent, child) && child.id.split('.').length === parent.id.split('.').length + 1
isDescendandOf(parent, child) &&
child.name.split('.').length === parent.name.split('.').length + 1
);
}

View file

@ -5,16 +5,33 @@
* 2.0.
*/
import { StreamDefinition } from '../../../../common/types';
import {
isDissectProcessor,
isGrokProcessor,
ProcessingDefinition,
StreamDefinition,
} from '@kbn/streams-schema';
import { get } from 'lodash';
import { ASSET_VERSION } from '../../../../common/constants';
import { conditionToPainless } from '../helpers/condition_to_painless';
import { logsDefaultPipelineProcessors } from './logs_default_pipeline';
import { isRoot } from '../helpers/hierarchy';
import { getProcessingPipelineName } from './name';
/**
 * Resolves the ingest-pipeline processor type for a processing definition by
 * probing its `config` with the `@kbn/streams-schema` type guards.
 *
 * @throws Error when the config matches neither known processor shape.
 */
function getProcessorType(processor: ProcessingDefinition) {
  const { config } = processor;
  if (isGrokProcessor(config)) {
    return 'grok';
  }
  if (isDissectProcessor(config)) {
    return 'dissect';
  }
  throw new Error('Unknown processor type');
}
function generateProcessingSteps(definition: StreamDefinition) {
return definition.processing.map((processor) => {
const { type, ...config } = processor.config;
return definition.stream.ingest.processing.map((processor) => {
const type = getProcessorType(processor);
const config = get(processor.config, type);
return {
[type]: {
...config,
@ -28,7 +45,7 @@ export function generateIngestPipeline(id: string, definition: StreamDefinition)
return {
id: getProcessingPipelineName(id),
processors: [
...(isRoot(definition.id) ? logsDefaultPipelineProcessors : []),
...(isRoot(definition.name) ? logsDefaultPipelineProcessors : []),
...generateProcessingSteps(definition),
{
pipeline: {
@ -49,7 +66,7 @@ export function generateClassicIngestPipelineBody(definition: StreamDefinition)
return {
processors: generateProcessingSteps(definition),
_meta: {
description: `Stream-managed pipeline for the ${definition.id} stream`,
description: `Stream-managed pipeline for the ${definition.name} stream`,
managed: true,
},
version: ASSET_VERSION,

View file

@ -5,7 +5,7 @@
* 2.0.
*/
import { StreamDefinition } from '../../../../common/types';
import { StreamDefinition } from '@kbn/streams-schema';
import { ASSET_VERSION } from '../../../../common/constants';
import { conditionToPainless } from '../helpers/condition_to_painless';
import { getReroutePipelineName } from './name';
@ -16,17 +16,17 @@ interface GenerateReroutePipelineParams {
export async function generateReroutePipeline({ definition }: GenerateReroutePipelineParams) {
return {
id: getReroutePipelineName(definition.id),
processors: definition.children.map((child) => {
id: getReroutePipelineName(definition.name),
processors: definition.stream.ingest.routing.map((child) => {
return {
reroute: {
destination: child.id,
destination: child.name,
if: conditionToPainless(child.condition),
},
};
}),
_meta: {
description: `Reoute pipeline for the ${definition.id} stream`,
description: `Reoute pipeline for the ${definition.name} stream`,
managed: true,
},
version: ASSET_VERSION,

View file

@ -14,23 +14,16 @@ export function createStreamsIndex(scopedClusterClient: IScopedClusterClient) {
mappings: {
dynamic: 'strict',
properties: {
processing: {
type: 'object',
enabled: false,
},
fields: {
type: 'object',
enabled: false,
},
children: {
type: 'object',
enabled: false,
},
id: {
name: {
type: 'keyword',
},
managed: {
type: 'boolean',
stream: {
properties: {
ingest: {
type: 'object',
enabled: false,
},
},
},
},
},

View file

@ -5,29 +5,30 @@
* 2.0.
*/
import { StreamDefinition } from '../../../common/types';
import { WiredStreamDefinition } from '@kbn/streams-schema';
export const rootStreamDefinition: StreamDefinition = {
id: 'logs',
managed: true,
processing: [],
children: [],
fields: [
{
name: '@timestamp',
type: 'date',
export const rootStreamDefinition: WiredStreamDefinition = {
name: 'logs',
stream: {
ingest: {
processing: [],
routing: [],
wired: {
fields: {
'@timestamp': {
type: 'date',
},
message: {
type: 'match_only_text',
},
'host.name': {
type: 'keyword',
},
'log.level': {
type: 'keyword',
},
},
},
},
{
name: 'message',
type: 'match_only_text',
},
{
name: 'host.name',
type: 'keyword',
},
{
name: 'log.level',
type: 'keyword',
},
],
},
};

View file

@ -10,8 +10,16 @@ import { Logger } from '@kbn/logging';
import { IngestPipeline, IngestProcessorContainer } from '@elastic/elasticsearch/lib/api/types';
import { set } from '@kbn/safer-lodash-set';
import { IndicesDataStream } from '@elastic/elasticsearch/lib/api/types';
import {
IngestStreamDefinition,
WiredStreamDefinition,
StreamDefinition,
ListStreamsResponse,
isWiredStream,
FieldDefinition,
} from '@kbn/streams-schema';
import { omit } from 'lodash';
import { STREAMS_INDEX } from '../../../common/constants';
import { FieldDefinition, StreamDefinition } from '../../../common/types';
import { generateLayer } from './component_templates/generate_layer';
import { deleteComponent, upsertComponent } from './component_templates/manage_component_templates';
import { getComponentTemplateName } from './component_templates/name';
@ -142,58 +150,58 @@ export async function deleteStreamObjects({ id, scopedClusterClient, logger }: D
async function upsertInternalStream({ definition, scopedClusterClient }: BaseParamsWithDefinition) {
return scopedClusterClient.asInternalUser.index({
id: definition.id,
id: definition.name,
index: STREAMS_INDEX,
document: { ...definition },
document: { ...omit(definition, 'elasticsearch_assets') },
refresh: 'wait_for',
});
}
type ListStreamsParams = BaseParams;
export interface ListStreamResponse {
definitions: StreamDefinition[];
}
export async function listStreams({
scopedClusterClient,
}: ListStreamsParams): Promise<ListStreamResponse> {
const response = await scopedClusterClient.asInternalUser.search<StreamDefinition>({
}: ListStreamsParams): Promise<ListStreamsResponse> {
const response = await scopedClusterClient.asInternalUser.search<WiredStreamDefinition>({
index: STREAMS_INDEX,
size: 10000,
sort: [{ id: 'asc' }],
sort: [{ name: 'asc' }],
});
const dataStreams = await listDataStreamsAsStreams({ scopedClusterClient });
let definitions = response.hits.hits.map((hit) => ({ ...hit._source! }));
const hasAccess = await Promise.all(
definitions.map((definition) => checkReadAccess({ id: definition.id, scopedClusterClient }))
definitions.map((definition) => checkReadAccess({ id: definition.name, scopedClusterClient }))
);
definitions = definitions.filter((_, index) => hasAccess[index]);
const definitionMap = new Map(definitions.map((definition) => [definition.id, definition]));
const definitionMap = new Map<string, StreamDefinition>(
definitions.map((definition) => [definition.name, definition])
);
dataStreams.forEach((dataStream) => {
if (!definitionMap.has(dataStream.id)) {
definitionMap.set(dataStream.id, dataStream);
if (!definitionMap.has(dataStream.name)) {
definitionMap.set(dataStream.name, dataStream);
}
});
return {
definitions: Array.from(definitionMap.values()),
streams: Array.from(definitionMap.values()),
};
}
export async function listDataStreamsAsStreams({
scopedClusterClient,
}: ListStreamsParams): Promise<StreamDefinition[]> {
}: ListStreamsParams): Promise<IngestStreamDefinition[]> {
const response = await scopedClusterClient.asInternalUser.indices.getDataStream();
return response.data_streams
.filter((dataStream) => dataStream.template.endsWith('@stream') === false)
.map((dataStream) => ({
id: dataStream.name,
managed: false,
children: [],
fields: [],
processing: [],
name: dataStream.name,
stream: {
ingest: {
processing: [],
routing: [],
},
},
}));
}
@ -202,15 +210,11 @@ interface ReadStreamParams extends BaseParams {
skipAccessCheck?: boolean;
}
export interface ReadStreamResponse {
definition: StreamDefinition;
}
export async function readStream({
id,
scopedClusterClient,
skipAccessCheck,
}: ReadStreamParams): Promise<ReadStreamResponse> {
}: ReadStreamParams): Promise<StreamDefinition> {
try {
const response = await scopedClusterClient.asInternalUser.get<StreamDefinition>({
id,
@ -223,11 +227,7 @@ export async function readStream({
throw new DefinitionNotFound(`Stream definition for ${id} not found.`);
}
}
return {
definition: {
...definition,
},
};
return definition;
} catch (e) {
if (e.meta?.statusCode === 404) {
return readDataStreamAsStream({ id, scopedClusterClient, skipAccessCheck });
@ -237,20 +237,22 @@ export async function readStream({
}
export async function readDataStreamAsStream({ id, scopedClusterClient }: ReadStreamParams) {
const definition: StreamDefinition = {
id,
managed: false,
children: [],
fields: [],
processing: [],
const definition: IngestStreamDefinition = {
name: id,
stream: {
ingest: {
routing: [],
processing: [],
},
},
};
definition.unmanaged_elasticsearch_assets = await getUnmanagedElasticsearchAssets({
definition.elasticsearch_assets = await getUnmanagedElasticsearchAssets({
name: id,
scopedClusterClient,
});
return { definition };
return definition;
}
interface ReadUnmanagedAssetsParams extends BaseParams {
@ -314,19 +316,24 @@ interface ReadAncestorsParams extends BaseParams {
}
export interface ReadAncestorsResponse {
ancestors: Array<{ definition: StreamDefinition }>;
ancestors: StreamDefinition[];
}
export async function readAncestors({
id,
scopedClusterClient,
}: ReadAncestorsParams): Promise<ReadAncestorsResponse> {
}: ReadAncestorsParams): Promise<{ ancestors: WiredStreamDefinition[] }> {
const ancestorIds = getAncestors(id);
return {
ancestors: await Promise.all(
ancestorIds.map((ancestorId) =>
readStream({ scopedClusterClient, id: ancestorId, skipAccessCheck: true })
ancestorIds.map(
(ancestorId) =>
readStream({
scopedClusterClient,
id: ancestorId,
skipAccessCheck: true,
}) as unknown as WiredStreamDefinition
)
),
};
@ -337,7 +344,7 @@ interface ReadDescendantsParams extends BaseParams {
}
export async function readDescendants({ id, scopedClusterClient }: ReadDescendantsParams) {
const response = await scopedClusterClient.asInternalUser.search<StreamDefinition>({
const response = await scopedClusterClient.asInternalUser.search<WiredStreamDefinition>({
index: STREAMS_INDEX,
size: 10000,
body: {
@ -357,27 +364,30 @@ export async function readDescendants({ id, scopedClusterClient }: ReadDescendan
},
},
});
return response.hits.hits.map((hit) => hit._source as StreamDefinition);
return response.hits.hits.map((hit) => hit._source as WiredStreamDefinition);
}
export async function validateAncestorFields(
scopedClusterClient: IScopedClusterClient,
id: string,
fields: FieldDefinition[]
fields: FieldDefinition
) {
const { ancestors } = await readAncestors({
id,
scopedClusterClient,
});
for (const ancestor of ancestors) {
for (const field of fields) {
for (const name in fields) {
if (
ancestor.definition.fields.some(
(ancestorField) => ancestorField.type !== field.type && ancestorField.name === field.name
Object.hasOwn(fields, name) &&
isWiredStream(ancestor) &&
Object.entries(ancestor.stream.ingest.wired.fields).some(
([ancestorFieldName, attr]) =>
attr.type !== fields[name].type && ancestorFieldName === name
)
) {
throw new MalformedFields(
`Field ${field.name} is already defined with incompatible type in the parent stream ${ancestor.definition.id}`
`Field ${name} is already defined with incompatible type in the parent stream ${ancestor.name}`
);
}
}
@ -387,22 +397,23 @@ export async function validateAncestorFields(
export async function validateDescendantFields(
scopedClusterClient: IScopedClusterClient,
id: string,
fields: FieldDefinition[]
fields: FieldDefinition
) {
const descendants = await readDescendants({
id,
scopedClusterClient,
});
for (const descendant of descendants) {
for (const field of fields) {
for (const name in fields) {
if (
descendant.fields.some(
(descendantField) =>
descendantField.type !== field.type && descendantField.name === field.name
Object.hasOwn(fields, name) &&
Object.entries(descendant.stream.ingest.wired.fields).some(
([descendantFieldName, attr]) =>
attr.type !== fields[name].type && descendantFieldName === name
)
) {
throw new MalformedFields(
`Field ${field.name} is already defined with incompatible type in the child stream ${descendant.id}`
`Field ${name} is already defined with incompatible type in the child stream ${descendant.name}`
);
}
}
@ -449,7 +460,7 @@ export async function syncStream({
rootDefinition,
logger,
}: SyncStreamParams) {
if (!definition.managed) {
if (!isWiredStream(definition)) {
await syncUnmanagedStream({ scopedClusterClient, definition, logger });
await upsertInternalStream({
scopedClusterClient,
@ -457,7 +468,7 @@ export async function syncStream({
});
return;
}
const componentTemplate = generateLayer(definition.id, definition);
const componentTemplate = generateLayer(definition.name, definition);
await upsertComponent({
esClient: scopedClusterClient.asCurrentUser,
logger,
@ -466,7 +477,7 @@ export async function syncStream({
await upsertIngestPipeline({
esClient: scopedClusterClient.asCurrentUser,
logger,
pipeline: generateIngestPipeline(definition.id, definition),
pipeline: generateIngestPipeline(definition.name, definition),
});
const reroutePipeline = await generateReroutePipeline({
definition,
@ -479,12 +490,13 @@ export async function syncStream({
await upsertTemplate({
esClient: scopedClusterClient.asCurrentUser,
logger,
template: generateIndexTemplate(definition.id),
template: generateIndexTemplate(definition.name),
});
if (rootDefinition) {
const parentReroutePipeline = await generateReroutePipeline({
definition: rootDefinition,
});
await upsertIngestPipeline({
esClient: scopedClusterClient.asCurrentUser,
logger,
@ -494,7 +506,7 @@ export async function syncStream({
await upsertDataStream({
esClient: scopedClusterClient.asCurrentUser,
logger,
name: definition.id,
name: definition.name,
});
await upsertInternalStream({
scopedClusterClient,
@ -502,7 +514,7 @@ export async function syncStream({
});
await rolloverDataStreamIfNecessary({
esClient: scopedClusterClient.asCurrentUser,
name: definition.id,
name: definition.name,
logger,
mappings: componentTemplate.template.mappings?.properties,
});
@ -514,24 +526,19 @@ interface ExecutionPlanStep {
body?: Record<string, unknown>;
}
async function syncUnmanagedStream({ scopedClusterClient, definition, logger }: SyncStreamParams) {
if (definition.managed) {
async function syncUnmanagedStream({ scopedClusterClient, definition }: SyncStreamParams) {
if (isWiredStream(definition)) {
throw new Error('Got an unmanaged stream that is marked as managed');
}
if (definition.fields.length) {
throw new Error(
'Unmanaged streams cannot have managed fields, please edit the component templates directly'
);
}
if (definition.children.length) {
if (definition.stream.ingest.routing.length) {
throw new Error('Unmanaged streams cannot have managed children, coming soon');
}
const unmanagedAssets = await getUnmanagedElasticsearchAssets({
name: definition.id,
name: definition.name,
scopedClusterClient,
});
const executionPlan: ExecutionPlanStep[] = [];
const streamManagedPipelineName = getProcessingPipelineName(definition.id);
const streamManagedPipelineName = getProcessingPipelineName(definition.name);
const pipelineName = unmanagedAssets.find((asset) => asset.type === 'ingest_pipeline')?.id;
if (!pipelineName) {
throw new Error('Unmanaged stream needs a default ingest pipeline');
@ -546,7 +553,7 @@ async function syncUnmanagedStream({ scopedClusterClient, definition, logger }:
executionPlan
);
if (definition.processing.length) {
if (definition.stream.ingest.processing.length) {
// if the stream has processing, we need to create or update the stream managed pipeline
executionPlan.push({
method: 'PUT',
@ -629,14 +636,14 @@ async function ensureStreamManagedPipelineReference(
definition: StreamDefinition,
executionPlan: ExecutionPlanStep[]
) {
const streamManagedPipelineName = getProcessingPipelineName(definition.id);
const streamManagedPipelineName = getProcessingPipelineName(definition.name);
const { targetPipelineName, targetPipeline, referencesStreamManagedPipeline } =
await findStreamManagedPipelineReference(scopedClusterClient, pipelineName, definition.id);
await findStreamManagedPipelineReference(scopedClusterClient, pipelineName, definition.name);
if (!referencesStreamManagedPipeline) {
const callStreamManagedPipelineProcessor: IngestProcessorContainer = {
pipeline: {
name: streamManagedPipelineName,
if: `ctx._index == '${definition.id}'`,
if: `ctx._index == '${definition.name}'`,
ignore_missing_pipeline: true,
description:
"Call the stream's managed pipeline - do not change this manually but instead use the streams UI or API",

View file

@ -9,6 +9,7 @@ import { z } from '@kbn/zod';
import { IScopedClusterClient } from '@kbn/core-elasticsearch-server';
import { Logger } from '@kbn/logging';
import { badRequest, internal, notFound } from '@hapi/boom';
import { isWiredStream } from '@kbn/streams-schema';
import {
DefinitionNotFound,
ForkConditionMissing,
@ -43,7 +44,6 @@ export const deleteStreamRoute = createServerRoute({
}),
}),
handler: async ({
response,
params,
logger,
request,
@ -79,8 +79,8 @@ export async function deleteStream(
logger: Logger
) {
try {
const { definition } = await readStream({ scopedClusterClient, id });
if (!definition.managed) {
const definition = await readStream({ scopedClusterClient, id });
if (!isWiredStream(definition)) {
await deleteUnmanagedStreamObjects({ scopedClusterClient, id, logger });
return;
}
@ -92,8 +92,8 @@ export async function deleteStream(
// need to update parent first to cut off documents streaming down
await updateParentStream(scopedClusterClient, id, parentId, logger);
for (const child of definition.children) {
await deleteStream(scopedClusterClient, child.id, logger);
for (const child of definition.stream.ingest.routing) {
await deleteStream(scopedClusterClient, child.name, logger);
}
await deleteStreamObjects({ scopedClusterClient, id, logger });
} catch (e) {
@ -111,12 +111,14 @@ async function updateParentStream(
parentId: string,
logger: Logger
) {
const { definition: parentDefinition } = await readStream({
const parentDefinition = await readStream({
scopedClusterClient,
id: parentId,
});
parentDefinition.children = parentDefinition.children.filter((child) => child.id !== id);
parentDefinition.stream.ingest.routing = parentDefinition.stream.ingest.routing.filter(
(child) => child.name !== id
);
await syncStream({
scopedClusterClient,

View file

@ -22,12 +22,7 @@ export const disableStreamsRoute = createServerRoute({
requiredPrivileges: ['streams_write'],
},
},
handler: async ({
request,
response,
logger,
getScopedClients,
}): Promise<{ acknowledged: true }> => {
handler: async ({ request, logger, getScopedClients }): Promise<{ acknowledged: true }> => {
try {
const { scopedClusterClient } = await getScopedClients({ request });

View file

@ -9,6 +9,14 @@ import { z } from '@kbn/zod';
import { IScopedClusterClient } from '@kbn/core-elasticsearch-server';
import { Logger } from '@kbn/logging';
import { badRequest, internal, notFound } from '@hapi/boom';
import {
isWiredStream,
isWiredStreamConfig,
streamConfigDefinitionSchema,
StreamDefinition,
WiredStreamConfigDefinition,
WiredStreamDefinition,
} from '@kbn/streams-schema';
import {
DefinitionNotFound,
ForkConditionMissing,
@ -16,7 +24,6 @@ import {
SecurityException,
} from '../../lib/streams/errors';
import { createServerRoute } from '../create_server_route';
import { StreamDefinition, streamWithoutIdDefinitonSchema } from '../../../common/types';
import {
syncStream,
readStream,
@ -45,48 +52,63 @@ export const editStreamRoute = createServerRoute({
path: z.object({
id: z.string(),
}),
body: streamWithoutIdDefinitonSchema,
body: streamConfigDefinitionSchema,
}),
handler: async ({ response, params, logger, request, getScopedClients }) => {
handler: async ({ params, logger, request, getScopedClients }) => {
try {
const { scopedClusterClient } = await getScopedClients({ request });
const streamDefinition = { ...params.body, id: params.path.id };
const streamDefinition: StreamDefinition = { stream: params.body, name: params.path.id };
if (!streamDefinition.managed) {
if (!isWiredStream(streamDefinition)) {
await syncStream({
scopedClusterClient,
definition: { ...streamDefinition, id: params.path.id },
definition: streamDefinition,
rootDefinition: undefined,
logger,
});
return { acknowledged: true };
}
await validateStreamChildren(scopedClusterClient, params.path.id, params.body.children);
await validateAncestorFields(scopedClusterClient, params.path.id, params.body.fields);
await validateDescendantFields(scopedClusterClient, params.path.id, params.body.fields);
await validateStreamChildren(scopedClusterClient, params.path.id, params.body.ingest.routing);
if (isWiredStreamConfig(params.body)) {
await validateAncestorFields(
scopedClusterClient,
params.path.id,
params.body.ingest.wired.fields
);
await validateDescendantFields(
scopedClusterClient,
params.path.id,
params.body.ingest.wired.fields
);
}
const parentId = getParentId(params.path.id);
let parentDefinition: StreamDefinition | undefined;
let parentDefinition: WiredStreamDefinition | undefined;
// always need to go from the leaves to the parent when syncing ingest pipelines, otherwise data
// will be routed before the data stream is ready
for (const child of streamDefinition.children) {
for (const child of streamDefinition.stream.ingest.routing) {
const streamExists = await checkStreamExists({
scopedClusterClient,
id: child.id,
id: child.name,
});
if (streamExists) {
continue;
}
// create empty streams for each child if they don't exist
const childDefinition = {
id: child.id,
children: [],
fields: [],
processing: [],
managed: true,
const childDefinition: WiredStreamDefinition = {
name: child.name,
stream: {
ingest: {
processing: [],
routing: [],
wired: {
fields: {},
},
},
},
};
await syncStream({
@ -98,7 +120,7 @@ export const editStreamRoute = createServerRoute({
await syncStream({
scopedClusterClient,
definition: { ...streamDefinition, id: params.path.id, managed: true },
definition: { ...streamDefinition, name: params.path.id },
rootDefinition: parentDefinition,
logger,
});
@ -137,15 +159,15 @@ async function updateParentStream(
id: string,
logger: Logger
) {
const { definition: parentDefinition } = await readStream({
const parentDefinition = await readStream({
scopedClusterClient,
id: parentId,
});
if (!parentDefinition.children.some((child) => child.id === id)) {
if (!parentDefinition.stream.ingest.routing.some((child) => child.name === id)) {
// add the child to the parent stream with an empty condition for now
parentDefinition.children.push({
id,
parentDefinition.stream.ingest.routing.push({
name: id,
condition: undefined,
});
@ -155,21 +177,21 @@ async function updateParentStream(
logger,
});
}
return parentDefinition;
return parentDefinition as WiredStreamDefinition;
}
async function validateStreamChildren(
scopedClusterClient: IScopedClusterClient,
id: string,
children: StreamDefinition['children']
children: WiredStreamConfigDefinition['ingest']['routing']
) {
try {
const { definition: oldDefinition } = await readStream({
const oldDefinition = await readStream({
scopedClusterClient,
id,
});
const oldChildren = oldDefinition.children.map((child) => child.id);
const newChildren = new Set(children.map((child) => child.id));
const oldChildren = oldDefinition.stream.ingest.routing.map((child) => child.name);
const newChildren = new Set(children.map((child) => child.name));
children.forEach((child) => {
validateCondition(child.condition);
});

View file

@ -28,7 +28,6 @@ export const enableStreamsRoute = createServerRoute({
},
handler: async ({
request,
response,
logger,
getScopedClients,
}): Promise<{ acknowledged: true; message: string }> => {

View file

@ -7,6 +7,7 @@
import { z } from '@kbn/zod';
import { badRequest, internal, notFound } from '@hapi/boom';
import { conditionSchema, isWiredStream, WiredStreamDefinition } from '@kbn/streams-schema';
import {
DefinitionNotFound,
ForkConditionMissing,
@ -14,7 +15,6 @@ import {
SecurityException,
} from '../../lib/streams/errors';
import { createServerRoute } from '../create_server_route';
import { conditionSchema, streamDefinitonWithoutChildrenSchema } from '../../../common/types';
import { syncStream, readStream, validateAncestorFields } from '../../lib/streams/stream_crud';
import { MalformedStreamId } from '../../lib/streams/errors/malformed_stream_id';
import { isChildOf } from '../../lib/streams/helpers/hierarchy';
@ -36,7 +36,7 @@ export const forkStreamsRoute = createServerRoute({
path: z.object({
id: z.string(),
}),
body: z.object({ stream: streamDefinitonWithoutChildrenSchema, condition: conditionSchema }),
body: z.object({ stream: z.object({ name: z.string() }), condition: conditionSchema }),
}),
handler: async ({
params,
@ -53,34 +53,39 @@ export const forkStreamsRoute = createServerRoute({
const { scopedClusterClient } = await getScopedClients({ request });
const { definition: rootDefinition } = await readStream({
const rootDefinition = await readStream({
scopedClusterClient,
id: params.path.id,
});
if (rootDefinition.managed === false) {
if (!isWiredStream(rootDefinition)) {
throw new MalformedStreamId('Cannot fork a stream that is not managed');
}
const childDefinition = { ...params.body.stream, children: [] };
const childDefinition: WiredStreamDefinition = {
...params.body.stream,
stream: { ingest: { processing: [], routing: [], wired: { fields: {} } } },
};
// check whether root stream has a child of the given name already
if (rootDefinition.children.some((child) => child.id === childDefinition.id)) {
if (
rootDefinition.stream.ingest.routing.some((child) => child.name === childDefinition.name)
) {
throw new MalformedStreamId(
`The stream with ID (${params.body.stream.id}) already exists as a child of the parent stream`
`The stream with ID (${params.body.stream.name}) already exists as a child of the parent stream`
);
}
if (!isChildOf(rootDefinition, childDefinition)) {
throw new MalformedStreamId(
`The ID (${params.body.stream.id}) from the new stream must start with the parent's id (${rootDefinition.id}), followed by a dot and a name`
`The ID (${params.body.stream.name}) from the new stream must start with the parent's id (${rootDefinition.name}), followed by a dot and a name`
);
}
await validateAncestorFields(
scopedClusterClient,
params.body.stream.id,
params.body.stream.fields
childDefinition.name,
childDefinition.stream.ingest.wired.fields
);
// need to create the child first, otherwise we risk streaming data even though the child data stream is not ready
@ -91,8 +96,8 @@ export const forkStreamsRoute = createServerRoute({
logger,
});
rootDefinition.children.push({
id: params.body.stream.id,
rootDefinition.stream.ingest.routing.push({
name: params.body.stream.name,
condition: params.body.condition,
});

View file

@ -7,10 +7,10 @@
import { z } from '@kbn/zod';
import { notFound, internal } from '@hapi/boom';
import { ListStreamsResponse } from '@kbn/streams-schema';
import { createServerRoute } from '../create_server_route';
import { DefinitionNotFound } from '../../lib/streams/errors';
import { listStreams } from '../../lib/streams/stream_crud';
import { StreamDefinition } from '../../../common';
export const listStreamsRoute = createServerRoute({
endpoint: 'GET /api/streams',
@ -25,18 +25,10 @@ export const listStreamsRoute = createServerRoute({
},
},
params: z.object({}),
handler: async ({
response,
request,
getScopedClients,
}): Promise<{ definitions: StreamDefinition[] }> => {
handler: async ({ request, getScopedClients }): Promise<ListStreamsResponse> => {
try {
const { scopedClusterClient } = await getScopedClients({ request });
const { definitions } = await listStreams({ scopedClusterClient });
return {
definitions,
};
return listStreams({ scopedClusterClient });
} catch (e) {
if (e instanceof DefinitionNotFound) {
throw notFound(e);

View file

@ -7,7 +7,12 @@
import { z } from '@kbn/zod';
import { notFound, internal } from '@hapi/boom';
import { ReadStreamDefinition } from '../../../common/types';
import {
FieldDefinitionConfig,
isIngestStream,
isWiredStream,
ReadStreamDefinition,
} from '@kbn/streams-schema';
import { createServerRoute } from '../create_server_route';
import { DefinitionNotFound } from '../../lib/streams/errors';
import { readAncestors, readStream } from '../../lib/streams/stream_crud';
@ -27,13 +32,7 @@ export const readStreamRoute = createServerRoute({
params: z.object({
path: z.object({ id: z.string() }),
}),
handler: async ({
response,
params,
request,
logger,
getScopedClients,
}): Promise<ReadStreamDefinition> => {
handler: async ({ params, request, getScopedClients }): Promise<ReadStreamDefinition> => {
try {
const { scopedClusterClient } = await getScopedClients({ request });
const streamEntity = await readStream({
@ -41,23 +40,29 @@ export const readStreamRoute = createServerRoute({
id: params.path.id,
});
if (streamEntity.definition.managed === false) {
// TODO: I have no idea why I can just do `isIngestStream` here but when I do,
// streamEntity becomes `streamEntity: never` in the statements afterwards
if (!isWiredStream(streamEntity) && isIngestStream(streamEntity)) {
return {
...streamEntity.definition,
inheritedFields: [],
...streamEntity,
inherited_fields: {},
};
}
const { ancestors } = await readAncestors({
id: streamEntity.definition.id,
id: streamEntity.name,
scopedClusterClient,
});
const body = {
...streamEntity.definition,
inheritedFields: ancestors.flatMap(({ definition: { id, fields } }) =>
fields.map((field) => ({ ...field, from: id }))
),
...streamEntity,
inherited_fields: ancestors.reduce((acc, def) => {
Object.entries(def.stream.ingest.wired.fields).forEach(([key, fieldDef]) => {
acc[key] = { ...fieldDef, from: def.name };
});
return acc;
// TODO: replace this with a proper type
}, {} as Record<string, FieldDefinitionConfig & { from: string }>),
};
return body;

View file

@ -22,20 +22,15 @@ export const resyncStreamsRoute = createServerRoute({
},
},
params: z.object({}),
handler: async ({
response,
logger,
request,
getScopedClients,
}): Promise<{ acknowledged: true }> => {
handler: async ({ logger, request, getScopedClients }): Promise<{ acknowledged: true }> => {
const { scopedClusterClient } = await getScopedClients({ request });
const { definitions: streams } = await listStreams({ scopedClusterClient });
const { streams } = await listStreams({ scopedClusterClient });
for (const stream of streams) {
const { definition } = await readStream({
const definition = await readStream({
scopedClusterClient,
id: stream.id,
id: stream.name,
});
await syncStream({

View file

@ -7,7 +7,7 @@
import { z } from '@kbn/zod';
import { notFound, internal } from '@hapi/boom';
import { conditionSchema } from '../../../common/types';
import { conditionSchema } from '@kbn/streams-schema';
import { createServerRoute } from '../create_server_route';
import { DefinitionNotFound } from '../../lib/streams/errors';
import { checkReadAccess } from '../../lib/streams/stream_crud';
@ -35,13 +35,7 @@ export const sampleStreamRoute = createServerRoute({
number: z.optional(z.number()),
}),
}),
handler: async ({
response,
params,
request,
logger,
getScopedClients,
}): Promise<{ documents: unknown[] }> => {
handler: async ({ params, request, getScopedClients }): Promise<{ documents: unknown[] }> => {
try {
const { scopedClusterClient } = await getScopedClients({ request });

View file

@ -8,7 +8,7 @@
import { z } from '@kbn/zod';
import { notFound, internal } from '@hapi/boom';
import { getFlattenedObject } from '@kbn/std';
import { fieldDefinitionSchema } from '../../../../common/types';
import { fieldDefinitionConfigSchema } from '@kbn/streams-schema';
import { createServerRoute } from '../../create_server_route';
import { DefinitionNotFound } from '../../../lib/streams/errors';
import { checkReadAccess } from '../../../lib/streams/stream_crud';
@ -30,14 +30,12 @@ export const schemaFieldsSimulationRoute = createServerRoute({
params: z.object({
path: z.object({ id: z.string() }),
body: z.object({
field_definitions: z.array(fieldDefinitionSchema),
field_definitions: z.array(fieldDefinitionConfigSchema.extend({ name: z.string() })),
}),
}),
handler: async ({
response,
params,
request,
logger,
getScopedClients,
}): Promise<{
status: 'unknown' | 'success' | 'failure';

View file

@ -8,6 +8,7 @@
import { z } from '@kbn/zod';
import { internal, notFound } from '@hapi/boom';
import { getFlattenedObject } from '@kbn/std';
import { isWiredStream } from '@kbn/streams-schema';
import { DefinitionNotFound } from '../../../lib/streams/errors';
import { checkReadAccess, readAncestors, readStream } from '../../../lib/streams/stream_crud';
import { createServerRoute } from '../../create_server_route';
@ -29,13 +30,7 @@ export const unmappedFieldsRoute = createServerRoute({
params: z.object({
path: z.object({ id: z.string() }),
}),
handler: async ({
response,
params,
request,
logger,
getScopedClients,
}): Promise<{ unmappedFields: string[] }> => {
handler: async ({ params, request, getScopedClients }): Promise<{ unmappedFields: string[] }> => {
try {
const { scopedClusterClient } = await getScopedClients({ request });
@ -76,7 +71,11 @@ export const unmappedFieldsRoute = createServerRoute({
// Mapped fields from the stream's definition and inherited from ancestors
const mappedFields = new Set<string>();
streamEntity.definition.fields.forEach((field) => mappedFields.add(field.name));
if (isWiredStream(streamEntity)) {
Object.keys(streamEntity.stream.ingest.wired.fields).forEach((name) =>
mappedFields.add(name)
);
}
const { ancestors } = await readAncestors({
id: params.path.id,
@ -84,7 +83,7 @@ export const unmappedFieldsRoute = createServerRoute({
});
for (const ancestor of ancestors) {
ancestor.definition.fields.forEach((field) => mappedFields.add(field.name));
Object.keys(ancestor.stream.ingest.wired.fields).forEach((name) => mappedFields.add(name));
}
const unmappedFields = Array.from(sourceFields)

View file

@ -31,6 +31,7 @@
"@kbn/observability-utils-server",
"@kbn/observability-utils-common",
"@kbn/std",
"@kbn/safer-lodash-set"
"@kbn/safer-lodash-set",
"@kbn/streams-schema"
]
}

View file

@ -5,7 +5,7 @@
* 2.0.
*/
import type { StreamDefinition } from '@kbn/streams-plugin/common';
import type { StreamDefinition } from '@kbn/streams-schema';
interface EntityBase {
type: string;

View file

@ -21,7 +21,7 @@ import {
Condition,
FilterCondition,
OrCondition,
} from '@kbn/streams-plugin/common/types';
} from '@kbn/streams-schema';
import React, { useEffect } from 'react';
import { i18n } from '@kbn/i18n';
import { css } from '@emotion/css';

View file

@ -8,7 +8,7 @@ import { EuiFlexGroup, EuiFlexItem, EuiIcon, EuiLink, EuiPanel, EuiBadge } from
import { i18n } from '@kbn/i18n';
import React from 'react';
import { css } from '@emotion/css';
import { StreamDefinition } from '@kbn/streams-plugin/common';
import { isIngestStream, StreamDefinition } from '@kbn/streams-schema';
import { useStreamsAppBreadcrumbs } from '../../hooks/use_streams_app_breadcrumbs';
import { useStreamsAppRouter } from '../../hooks/use_streams_app_router';
import { EntityOverviewTabList } from '../entity_overview_tab_list';
@ -101,7 +101,7 @@ export function EntityDetailViewWithoutParams({
title={
<>
{entity.displayName}
{definition && !definition.managed ? (
{definition && isIngestStream(definition) ? (
<>
{' '}
<EuiBadge>

View file

@ -4,7 +4,7 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { StreamDefinition } from '@kbn/streams-plugin/common';
import { StreamDefinition } from '@kbn/streams-schema';
import React from 'react';
export function StreamDetailEnriching({

View file

@ -6,7 +6,7 @@
*/
import React from 'react';
import { i18n } from '@kbn/i18n';
import { ReadStreamDefinition, StreamDefinition } from '@kbn/streams-plugin/common';
import { ReadStreamDefinition, StreamDefinition } from '@kbn/streams-schema';
import { EuiFlexGroup, EuiListGroup, EuiText } from '@elastic/eui';
import { useStreamsAppParams } from '../../hooks/use_streams_app_params';
import { RedirectTo } from '../redirect_to';
@ -66,7 +66,7 @@ function UnmanagedStreamOverview({ definition }: { definition: StreamDefinition
http: { basePath },
},
} = useKibana();
const groupedAssets = (definition.unmanaged_elasticsearch_assets ?? []).reduce((acc, asset) => {
const groupedAssets = (definition.elasticsearch_assets ?? []).reduce((acc, asset) => {
const title = assetToTitle(asset);
if (title) {
acc[title] = acc[title] ?? [];

View file

@ -5,7 +5,7 @@
* 2.0.
*/
import React from 'react';
import { ReadStreamDefinition } from '@kbn/streams-plugin/common';
import { isWiredReadStream, ReadStreamDefinition } from '@kbn/streams-schema';
import { WiredStreamDetailManagement } from './wired';
import { ClassicStreamDetailManagement } from './classic';
@ -22,7 +22,7 @@ export function StreamDetailManagement({
return null;
}
if (definition.managed) {
if (isWiredReadStream(definition)) {
return (
<WiredStreamDetailManagement
definition={definition}

View file

@ -6,7 +6,7 @@
*/
import React from 'react';
import { i18n } from '@kbn/i18n';
import { ReadStreamDefinition } from '@kbn/streams-plugin/common';
import { WiredReadStreamDefinition } from '@kbn/streams-schema';
import { useStreamsAppParams } from '../../hooks/use_streams_app_params';
import { RedirectTo } from '../redirect_to';
import { StreamDetailRouting } from '../stream_detail_routing';
@ -25,7 +25,7 @@ export function WiredStreamDetailManagement({
refreshDefinition,
isLoadingDefinition,
}: {
definition?: ReadStreamDefinition;
definition?: WiredReadStreamDefinition;
refreshDefinition: () => void;
isLoadingDefinition: boolean;
}) {

View file

@ -10,7 +10,7 @@ import { i18n } from '@kbn/i18n';
import { useDateRange } from '@kbn/observability-utils-browser/hooks/use_date_range';
import moment from 'moment';
import React, { useMemo } from 'react';
import { ReadStreamDefinition } from '@kbn/streams-plugin/common';
import { ReadStreamDefinition } from '@kbn/streams-schema';
import { useKibana } from '../../hooks/use_kibana';
import { useStreamsAppFetch } from '../../hooks/use_streams_app_fetch';
import { ControlledEsqlChart } from '../esql_chart/controlled_esql_chart';
@ -35,18 +35,18 @@ export function StreamDetailOverview({ definition }: { definition?: ReadStreamDe
} = useDateRange({ data });
const indexPatterns = useMemo(() => {
if (!definition?.id) {
if (!definition?.name) {
return undefined;
}
const isRoot = definition.id.indexOf('.') === -1;
const isRoot = definition.name.indexOf('.') === -1;
const dataStreamOfDefinition = definition.id;
const dataStreamOfDefinition = definition.name;
return isRoot
? [dataStreamOfDefinition, `${dataStreamOfDefinition}.*`]
: [`${dataStreamOfDefinition}*`];
}, [definition?.id]);
}, [definition?.name]);
const discoverLocator = useMemo(
() => share.url.locators.get('DISCOVER_APP_LOCATOR'),

View file

@ -25,9 +25,12 @@ import { css } from '@emotion/css';
import { i18n } from '@kbn/i18n';
import { useAbortController } from '@kbn/observability-utils-browser/hooks/use_abort_controller';
import { useDateRange } from '@kbn/observability-utils-browser/hooks/use_date_range';
import { ReadStreamDefinition } from '@kbn/streams-plugin/common';
import React from 'react';
import { StreamChild } from '@kbn/streams-plugin/common/types';
import {
StreamChild,
ReadStreamDefinition,
WiredStreamConfigDefinition,
} from '@kbn/streams-schema';
import { AbortableAsyncState } from '@kbn/observability-utils-browser/hooks/use_abortable_async';
import { useKibana } from '../../hooks/use_kibana';
import { useStreamsAppFetch } from '../../hooks/use_streams_app_fetch';
@ -89,7 +92,7 @@ export function StreamDetailRouting({
closeModal={closeModal}
clearChildUnderEdit={() => routingAppState.setChildUnderEdit(undefined)}
refreshDefinition={refreshDefinition}
id={routingAppState.childUnderEdit.child.id}
id={routingAppState.childUnderEdit.child.name}
/>
)}
<EuiFlexGroup
@ -197,14 +200,12 @@ function ControlBar({
signal,
params: {
path: {
id: definition.id,
id: definition.name,
},
body: {
condition: routingAppState.childUnderEdit.child.condition,
stream: {
id: routingAppState.childUnderEdit.child.id,
processing: [],
fields: [],
name: routingAppState.childUnderEdit.child.name,
},
},
},
@ -217,19 +218,22 @@ function ControlBar({
}
const childUnderEdit = routingAppState.childUnderEdit.child;
const { inheritedFields, id, ...definitionToUpdate } = definition;
const { name, stream } = definition;
return streamsRepositoryClient.fetch('PUT /api/streams/{id}', {
signal,
params: {
path: {
id: definition.id,
id: name,
},
body: {
...definitionToUpdate,
children: definition.children.map((child) =>
child.id === childUnderEdit.id ? childUnderEdit : child
),
},
...stream,
ingest: {
...stream.ingest,
routing: definition.stream.ingest.routing.map((child) =>
child.name === childUnderEdit.name ? childUnderEdit : child
),
},
} as WiredStreamConfigDefinition,
},
});
}
@ -350,7 +354,7 @@ function PreviewPanel({
signal,
params: {
path: {
id: definition.id,
id: definition.name,
},
body: {
condition: routingAppState.debouncedChildUnderEdit.child.condition,
@ -550,17 +554,17 @@ function ChildStreamList({
>
<PreviousStreamEntry definition={definition} />
<CurrentStreamEntry definition={definition} />
{definition.children.map((child, i) => (
{definition.stream.ingest.routing.map((child, i) => (
<NestedView key={i}>
<RoutingStreamEntry
child={
!childUnderEdit?.isNew && child.id === childUnderEdit?.child.id
!childUnderEdit?.isNew && child.name === childUnderEdit?.child.name
? childUnderEdit.child
: child
}
edit={!childUnderEdit?.isNew && child.id === childUnderEdit?.child.id}
edit={!childUnderEdit?.isNew && child.name === childUnderEdit?.child.name}
onEditStateChange={() => {
if (child.id === childUnderEdit?.child.id) {
if (child.name === childUnderEdit?.child.name) {
setChildUnderEdit(undefined);
} else {
setChildUnderEdit({ isNew: false, child });
@ -601,7 +605,7 @@ function ChildStreamList({
setChildUnderEdit({
isNew: true,
child: {
id: `${definition.id}.child`,
name: `${definition.name}.child`,
condition: {
field: '',
operator: 'eq',
@ -627,7 +631,7 @@ function CurrentStreamEntry({ definition }: { definition: ReadStreamDefinition }
return (
<EuiFlexItem grow={false}>
<EuiPanel hasShadow={false} hasBorder paddingSize="s">
<EuiText size="s">{definition.id}</EuiText>
<EuiText size="s">{definition.name}</EuiText>
<EuiText size="xs" color="subdued">
{i18n.translate('xpack.streams.streamDetailRouting.currentStream', {
defaultMessage: 'Current stream',
@ -641,7 +645,7 @@ function CurrentStreamEntry({ definition }: { definition: ReadStreamDefinition }
function PreviousStreamEntry({ definition }: { definition: ReadStreamDefinition }) {
const router = useStreamsAppRouter();
const parentId = definition.id.split('.').slice(0, -1).join('.');
const parentId = definition.name.split('.').slice(0, -1).join('.');
if (parentId === '') {
return null;
}
@ -686,7 +690,7 @@ function RoutingStreamEntry({
<EuiPanel hasShadow={false} hasBorder paddingSize="s">
<EuiFlexGroup gutterSize="xs" alignItems="center">
<EuiFlexItem grow>
<EuiText size="s">{child.id}</EuiText>
<EuiText size="s">{child.name}</EuiText>
</EuiFlexItem>
<EuiButtonIcon
data-test-subj="streamsAppRoutingStreamEntryButton"
@ -702,7 +706,7 @@ function RoutingStreamEntry({
data-test-subj="streamsAppRoutingStreamEntryButton"
iconType="popout"
href={router.link('/{key}/{tab}/{subtab}', {
path: { key: child.id, tab: 'management', subtab: 'route' },
path: { key: child.name, tab: 'management', subtab: 'route' },
})}
aria-label={i18n.translate('xpack.streams.streamDetailRouting.goto', {
defaultMessage: 'Go to stream',
@ -750,13 +754,13 @@ function NewRoutingStreamEntry({
>
<EuiFieldText
data-test-subj="streamsAppRoutingStreamEntryNameField"
value={child.id}
value={child.name}
fullWidth
compressed
onChange={(e) => {
onChildChange({
...child,
id: e.target.value,
name: e.target.value,
});
}}
/>

View file

@ -7,8 +7,8 @@
import { EuiFlexGroup, EuiFlexItem, EuiToken } from '@elastic/eui';
import { i18n } from '@kbn/i18n';
import { FieldDefinition } from '@kbn/streams-plugin/common/types';
import React from 'react';
import { FieldDefinitionConfig } from '@kbn/streams-schema';
export const FIELD_TYPE_MAP = {
boolean: {
@ -55,7 +55,7 @@ export const FIELD_TYPE_MAP = {
},
};
export const FieldType = ({ type }: { type: FieldDefinition['type'] }) => {
export const FieldType = ({ type }: { type: FieldDefinitionConfig['type'] }) => {
return (
<EuiFlexGroup alignItems="center" gutterSize="s">
<EuiFlexItem grow={false}>

View file

@ -22,7 +22,7 @@ import type {
} from '@elastic/eui';
import { i18n } from '@kbn/i18n';
import useToggle from 'react-use/lib/useToggle';
import { ReadStreamDefinition } from '@kbn/streams-plugin/common/types';
import { isWiredStream, ReadStreamDefinition } from '@kbn/streams-schema';
import { FieldType } from './field_type';
import { FieldStatus } from './field_status';
import { FieldEntry, SchemaEditorEditingState } from './hooks/use_editing_state';
@ -71,14 +71,13 @@ export const EMPTY_CONTENT = '-----';
export const FieldsTableContainer = ({
definition,
unmappedFieldsResult,
isLoadingUnmappedFields,
query,
editingState,
unpromotingState,
}: FieldsTableContainerProps) => {
const inheritedFields = useMemo(() => {
return definition.inheritedFields.map((field) => ({
name: field.name,
return Object.entries(definition.inherited_fields).map(([name, field]) => ({
name,
type: field.type,
format: field.format,
parent: field.from,
@ -94,13 +93,16 @@ export const FieldsTableContainer = ({
}, [inheritedFields, query]);
const mappedFields = useMemo(() => {
return definition.fields.map((field) => ({
name: field.name,
type: field.type,
format: field.format,
parent: definition.id,
status: 'mapped' as const,
}));
if (isWiredStream(definition)) {
return Object.entries(definition.stream.ingest.wired.fields).map(([name, field]) => ({
name,
type: field.type,
format: field.format,
parent: definition.name,
status: 'mapped' as const,
}));
}
return [];
}, [definition]);
const filteredMappedFields = useMemo(() => {
@ -114,11 +116,11 @@ export const FieldsTableContainer = ({
return unmappedFieldsResult
? unmappedFieldsResult.map((field) => ({
name: field,
parent: definition.id,
parent: definition.name,
status: 'unmapped' as const,
}))
: [];
}, [definition.id, unmappedFieldsResult]);
}, [definition.name, unmappedFieldsResult]);
const filteredUnmappedFields = useMemo(() => {
if (!unmappedFieldsResult) return [];
@ -285,7 +287,9 @@ const FieldsTable = ({ definition, fields, editingState, unpromotingState }: Fie
if (!fieldType) return EMPTY_CONTENT;
return <FieldType type={fieldType} />;
} else if (columnId === 'parent') {
return <FieldParent parent={field.parent} linkEnabled={field.parent !== definition.id} />;
return (
<FieldParent parent={field.parent} linkEnabled={field.parent !== definition.name} />
);
} else if (columnId === 'status') {
return <FieldStatus status={field.status} />;
} else {

View file

@ -7,13 +7,13 @@
import React from 'react';
import { EuiCallOut } from '@elastic/eui';
import { StreamDefinition } from '@kbn/streams-plugin/common/types';
import { i18n } from '@kbn/i18n';
import { StreamConfigDefinition } from '@kbn/streams-schema';
export const ChildrenAffectedCallout = ({
childStreams,
}: {
childStreams: StreamDefinition['children'];
childStreams: StreamConfigDefinition['ingest']['routing'];
}) => {
return (
<EuiCallOut
@ -25,7 +25,7 @@ export const ChildrenAffectedCallout = ({
{i18n.translate('xpack.streams.childStreamsWarning.text', {
defaultMessage: "Editing this field will affect it's dependant streams: {affectedStreams} ",
values: {
affectedStreams: childStreams.map((stream) => stream.id).join(', '),
affectedStreams: childStreams.map((stream) => stream.name).join(', '),
},
})}
</EuiCallOut>

View file

@ -7,7 +7,7 @@
import { EuiFieldText } from '@elastic/eui';
import React from 'react';
import { FieldDefinition } from '@kbn/streams-plugin/common/types';
import { FieldDefinitionConfig } from '@kbn/streams-schema';
import { SchemaEditorEditingState } from '../hooks/use_editing_state';
type FieldFormFormatProps = Pick<
@ -15,7 +15,7 @@ type FieldFormFormatProps = Pick<
'nextFieldType' | 'nextFieldFormat' | 'setNextFieldFormat'
>;
export const typeSupportsFormat = (type?: FieldDefinition['type']) => {
export const typeSupportsFormat = (type?: FieldDefinitionConfig['type']) => {
if (!type) return false;
return ['date'].includes(type);
};

View file

@ -19,7 +19,7 @@ import {
} from '@elastic/eui';
import React from 'react';
import { i18n } from '@kbn/i18n';
import { ReadStreamDefinition } from '@kbn/streams-plugin/common';
import { ReadStreamDefinition } from '@kbn/streams-schema';
import { SchemaEditorEditingState } from '../hooks/use_editing_state';
import { ChildrenAffectedCallout } from './children_affected_callout';
import { SamplePreviewTable } from './sample_preview_table';
@ -57,9 +57,9 @@ export const SchemaEditorFlyout = (props: SchemaEditorFlyoutProps) => {
<EuiFlyoutBody>
<EuiFlexGroup direction="column">
<FieldSummary {...props} />
{isEditing && definition.children.length > 0 ? (
{isEditing && definition.stream.ingest.routing.length > 0 ? (
<EuiFlexItem grow={false}>
<ChildrenAffectedCallout childStreams={definition.children} />
<ChildrenAffectedCallout childStreams={definition.stream.ingest.routing} />
</EuiFlexItem>
) : null}
<EuiFlexItem grow={false}>

View file

@ -7,11 +7,10 @@
import React, { useMemo } from 'react';
import { StreamsRepositoryClient } from '@kbn/streams-plugin/public/api';
import { ReadStreamDefinition } from '@kbn/streams-plugin/common';
import { FieldDefinition } from '@kbn/streams-plugin/common/types';
import { css } from '@emotion/react';
import { i18n } from '@kbn/i18n';
import { EuiCallOut } from '@elastic/eui';
import { FieldDefinitionConfigWithName, ReadStreamDefinition } from '@kbn/streams-schema';
import { getFormattedError } from '../../../util/errors';
import { useStreamsAppFetch } from '../../../hooks/use_streams_app_fetch';
import { PreviewTable } from '../../preview_table';
@ -20,7 +19,7 @@ import { LoadingPanel } from '../../loading_panel';
interface SamplePreviewTableProps {
definition: ReadStreamDefinition;
nextFieldDefinition?: Partial<FieldDefinition>;
nextFieldDefinition?: Partial<FieldDefinitionConfigWithName>;
streamsRepositoryClient: StreamsRepositoryClient;
}
@ -39,14 +38,14 @@ const SamplePreviewTableContent = ({
definition,
nextFieldDefinition,
streamsRepositoryClient,
}: SamplePreviewTableProps & { nextFieldDefinition: FieldDefinition }) => {
}: SamplePreviewTableProps & { nextFieldDefinition: FieldDefinitionConfigWithName }) => {
const { value, loading, error } = useStreamsAppFetch(
({ signal }) => {
return streamsRepositoryClient.fetch('POST /api/streams/{id}/schema/fields_simulation', {
signal,
params: {
path: {
id: definition.id,
id: definition.name,
},
body: {
field_definitions: [nextFieldDefinition],
@ -54,7 +53,7 @@ const SamplePreviewTableContent = ({
},
});
},
[definition.id, nextFieldDefinition, streamsRepositoryClient],
[definition.name, nextFieldDefinition, streamsRepositoryClient],
{
disableToastOnError: true,
}

View file

@ -5,21 +5,26 @@
* 2.0.
*/
import { FieldDefinition, ReadStreamDefinition } from '@kbn/streams-plugin/common/types';
import {
ReadStreamDefinition,
FieldDefinitionConfigWithName,
isWiredReadStream,
} from '@kbn/streams-schema';
import { StreamsRepositoryClient } from '@kbn/streams-plugin/public/api';
import { useCallback, useMemo, useState } from 'react';
import useToggle from 'react-use/lib/useToggle';
import { useAbortController } from '@kbn/observability-utils-browser/hooks/use_abort_controller';
import { ToastsStart } from '@kbn/core-notifications-browser';
import { i18n } from '@kbn/i18n';
import { omit } from 'lodash';
import { FieldStatus } from '../field_status';
export type SchemaEditorEditingState = ReturnType<typeof useEditingState>;
export interface FieldEntry {
name: FieldDefinition['name'];
type?: FieldDefinition['type'];
format?: FieldDefinition['format'];
name: FieldDefinitionConfigWithName['name'];
type?: FieldDefinitionConfigWithName['type'];
format?: FieldDefinitionConfigWithName['format'];
parent: string;
status: FieldStatus;
}
@ -90,7 +95,8 @@ export const useEditingState = ({
const saveChanges = useMemo(() => {
return selectedField &&
isFullFieldDefinition(nextFieldDefinition) &&
hasChanges(selectedField, nextFieldDefinition)
hasChanges(selectedField, nextFieldDefinition) &&
isWiredReadStream(definition)
? async () => {
toggleIsSaving(true);
try {
@ -98,15 +104,22 @@ export const useEditingState = ({
signal: abortController.signal,
params: {
path: {
id: definition.id,
id: definition.name,
},
body: {
processing: definition.processing,
children: definition.children,
fields: [
...definition.fields.filter((field) => field.name !== nextFieldDefinition.name),
nextFieldDefinition,
],
ingest: {
...definition.stream.ingest,
wired: {
fields: {
...Object.fromEntries(
Object.entries(definition.stream.ingest.wired.fields).filter(
([name, _field]) => name !== nextFieldDefinition.name
)
),
[nextFieldDefinition.name]: omit(nextFieldDefinition, 'name'),
},
},
},
},
},
});
@ -133,10 +146,7 @@ export const useEditingState = ({
: undefined;
}, [
abortController.signal,
definition.children,
definition.fields,
definition.id,
definition.processing,
definition,
nextFieldDefinition,
refreshDefinition,
refreshUnmappedFields,
@ -165,14 +175,14 @@ export const useEditingState = ({
};
export const isFullFieldDefinition = (
value?: Partial<FieldDefinition>
): value is FieldDefinition => {
value?: Partial<FieldDefinitionConfigWithName>
): value is FieldDefinitionConfigWithName => {
return !!value && !!value.name && !!value.type;
};
const hasChanges = (
selectedField: Partial<FieldDefinition>,
nextFieldEntry: Partial<FieldDefinition>
selectedField: Partial<FieldDefinitionConfigWithName>,
nextFieldEntry: Partial<FieldDefinitionConfigWithName>
) => {
return (
selectedField.type !== nextFieldEntry.type || selectedField.format !== nextFieldEntry.format

View file

@ -11,7 +11,8 @@ import useToggle from 'react-use/lib/useToggle';
import { useAbortController } from '@kbn/observability-utils-browser/hooks/use_abort_controller';
import { ToastsStart } from '@kbn/core-notifications-browser';
import { i18n } from '@kbn/i18n';
import { ReadStreamDefinition } from '@kbn/streams-plugin/common';
import { WiredReadStreamDefinition } from '@kbn/streams-schema';
import { omit } from 'lodash';
export type SchemaEditorUnpromotingState = ReturnType<typeof useUnpromotingState>;
@ -23,7 +24,7 @@ export const useUnpromotingState = ({
toastsService,
}: {
streamsRepositoryClient: StreamsRepositoryClient;
definition: ReadStreamDefinition;
definition: WiredReadStreamDefinition;
refreshDefinition: () => void;
refreshUnmappedFields: () => void;
toastsService: ToastsStart;
@ -46,12 +47,15 @@ export const useUnpromotingState = ({
signal: abortController.signal,
params: {
path: {
id: definition.id,
id: definition.name,
},
body: {
processing: definition.processing,
children: definition.children,
fields: definition.fields.filter((field) => field.name !== selectedField),
ingest: {
...definition.stream.ingest,
wired: {
fields: omit(definition.stream.ingest.wired.fields, selectedField),
},
},
},
},
});
@ -77,10 +81,8 @@ export const useUnpromotingState = ({
}
}, [
abortController.signal,
definition.children,
definition.fields,
definition.id,
definition.processing,
definition.name,
definition.stream.ingest,
refreshDefinition,
refreshUnmappedFields,
selectedField,

View file

@ -13,8 +13,8 @@ import {
EuiPortal,
Query,
} from '@elastic/eui';
import { ReadStreamDefinition } from '@kbn/streams-plugin/common';
import { css } from '@emotion/css';
import { WiredReadStreamDefinition } from '@kbn/streams-schema';
import { useEditingState } from './hooks/use_editing_state';
import { SchemaEditorFlyout } from './flyout';
import { useKibana } from '../../hooks/use_kibana';
@ -25,7 +25,7 @@ import { useStreamsAppFetch } from '../../hooks/use_streams_app_fetch';
import { FieldsTableContainer } from './fields_table';
interface SchemaEditorProps {
definition?: ReadStreamDefinition;
definition?: WiredReadStreamDefinition;
refreshDefinition: () => void;
isLoadingDefinition: boolean;
}
@ -63,12 +63,12 @@ const Content = ({
signal,
params: {
path: {
id: definition.id,
id: definition.name,
},
},
});
},
[definition.id, streamsRepositoryClient]
[definition.name, streamsRepositoryClient]
);
const editingState = useEditingState({
@ -92,7 +92,7 @@ const Content = ({
// If the definition changes (e.g. navigating to parent stream), reset the entire editing state.
useEffect(() => {
reset();
}, [definition.id, reset]);
}, [definition.name, reset]);
return (
<EuiFlexItem>

View file

@ -14,47 +14,47 @@ import {
} from '@elastic/eui';
import { i18n } from '@kbn/i18n';
import type { AbortableAsyncState } from '@kbn/observability-utils-browser/hooks/use_abortable_async';
import { StreamDefinition } from '@kbn/streams-plugin/common';
import React, { useMemo } from 'react';
import { isWiredStreamConfig, StreamDefinition } from '@kbn/streams-schema';
import { useStreamsAppRouter } from '../../hooks/use_streams_app_router';
export function StreamsTable({
listFetch,
query,
}: {
listFetch: AbortableAsyncState<{ definitions: StreamDefinition[] }>;
listFetch: AbortableAsyncState<{ streams: StreamDefinition[] }>;
query: string;
}) {
const router = useStreamsAppRouter();
const items = useMemo(() => {
return listFetch.value?.definitions ?? [];
}, [listFetch.value?.definitions]);
return listFetch.value?.streams ?? [];
}, [listFetch.value?.streams]);
const filteredItems = useMemo(() => {
if (!query) {
return items;
}
return items.filter((item) => item.id.toLowerCase().includes(query.toLowerCase()));
return items.filter((item) => item.name.toLowerCase().includes(query.toLowerCase()));
}, [query, items]);
const columns = useMemo<Array<EuiBasicTableColumn<StreamDefinition>>>(() => {
return [
{
field: 'id',
field: 'name',
name: i18n.translate('xpack.streams.streamsTable.nameColumnTitle', {
defaultMessage: 'Name',
}),
render: (_, { id, managed }) => {
render: (_, { name, stream }) => {
return (
<EuiFlexGroup direction="row" gutterSize="s" alignItems="center">
<EuiIcon type={managed ? 'branch' : 'bullseye'} />
<EuiIcon type={isWiredStreamConfig(stream) ? 'branch' : 'bullseye'} />
<EuiLink
data-test-subj="logsaiColumnsLink"
href={router.link('/{key}', { path: { key: id } })}
href={router.link('/{key}', { path: { key: name } })}
>
{id}
{name}
</EuiLink>
</EuiFlexGroup>
);

View file

@ -37,5 +37,6 @@
"@kbn/ui-theme",
"@kbn/navigation-plugin",
"@kbn/core-notifications-browser",
"@kbn/streams-schema",
]
}

View file

@ -42,54 +42,60 @@ export default function ({ getService }: FtrProviderContext) {
const response = await indexDocument(esClient, 'logs-test-default', doc);
expect(response.result).to.eql('created');
const streams = await listStreams(supertest);
const classicStream = streams.definitions.find(
(stream: JsonObject) => stream.id === 'logs-test-default'
const classicStream = streams.streams.find(
(stream: JsonObject) => stream.name === 'logs-test-default'
);
expect(classicStream).to.eql({
id: 'logs-test-default',
managed: false,
children: [],
fields: [],
processing: [],
name: 'logs-test-default',
stream: {
ingest: {
processing: [],
routing: [],
},
},
});
});
it('Allows setting processing on classic streams', async () => {
const response = await putStream(supertest, 'logs-test-default', {
managed: false,
children: [],
fields: [],
processing: [
{
config: {
type: 'grok',
field: 'message',
patterns: [
'%{TIMESTAMP_ISO8601:inner_timestamp} %{LOGLEVEL:log.level} %{GREEDYDATA:message2}',
],
ingest: {
processing: [
{
config: {
grok: {
field: 'message',
patterns: [
'%{TIMESTAMP_ISO8601:inner_timestamp} %{LOGLEVEL:log.level} %{GREEDYDATA:message2}',
],
},
},
},
},
],
],
routing: [],
},
});
expect(response).to.have.property('acknowledged', true);
const streamBody = await getStream(supertest, 'logs-test-default');
expect(streamBody).to.eql({
id: 'logs-test-default',
managed: false,
children: [],
inheritedFields: [],
fields: [],
processing: [
{
config: {
type: 'grok',
field: 'message',
patterns: [
'%{TIMESTAMP_ISO8601:inner_timestamp} %{LOGLEVEL:log.level} %{GREEDYDATA:message2}',
],
},
name: 'logs-test-default',
inherited_fields: {},
stream: {
ingest: {
processing: [
{
config: {
grok: {
field: 'message',
patterns: [
'%{TIMESTAMP_ISO8601:inner_timestamp} %{LOGLEVEL:log.level} %{GREEDYDATA:message2}',
],
},
},
},
],
routing: [],
},
],
},
});
});
@ -121,10 +127,10 @@ export default function ({ getService }: FtrProviderContext) {
it('Allows removing processing on classic streams', async () => {
const response = await putStream(supertest, 'logs-test-default', {
managed: false,
children: [],
fields: [],
processing: [],
ingest: {
processing: [],
routing: [],
},
});
expect(response).to.have.property('acknowledged', true);
});
@ -154,8 +160,8 @@ export default function ({ getService }: FtrProviderContext) {
it('Allows deleting classic streams', async () => {
await deleteStream(supertest, 'logs-test-default');
const streams = await listStreams(supertest);
const classicStream = streams.definitions.find(
(stream: JsonObject) => stream.id === 'logs-test-default'
const classicStream = streams.streams.find(
(stream: JsonObject) => stream.name === 'logs-test-default'
);
expect(classicStream).to.eql(undefined);
});

View file

@ -6,8 +6,8 @@
*/
import expect from '@kbn/expect';
import { JsonObject } from '@kbn/utility-types';
import { SearchTotalHits } from '@elastic/elasticsearch/lib/api/types';
import { WiredStreamConfigDefinition } from '@kbn/streams-schema';
import { enableStreams, fetchDocument, indexDocument, putStream } from './helpers/requests';
import { FtrProviderContext } from '../../ftr_provider_context';
import { waitForDocumentInIndex } from '../../../alerting_api_integration/observability/helpers/alerting_wait_for_helpers';
@ -29,53 +29,54 @@ export default function ({ getService }: FtrProviderContext) {
});
it('Place processing steps', async () => {
const body = {
fields: [
{
name: '@timestamp',
type: 'date',
},
{
name: 'message',
type: 'match_only_text',
},
{
name: 'message2',
type: 'match_only_text',
},
{
name: 'host.name',
type: 'keyword',
},
{
name: 'log.level',
type: 'keyword',
},
],
processing: [
{
config: {
type: 'grok',
field: 'message',
patterns: [
'%{TIMESTAMP_ISO8601:inner_timestamp} %{LOGLEVEL:log.level} %{GREEDYDATA:message2}',
],
const body: WiredStreamConfigDefinition = {
ingest: {
processing: [
{
config: {
grok: {
field: 'message',
patterns: [
'%{TIMESTAMP_ISO8601:inner_timestamp} %{LOGLEVEL:log.level} %{GREEDYDATA:message2}',
],
},
},
},
} as JsonObject,
{
config: {
type: 'dissect',
field: 'message2',
pattern: '%{log.logger} %{message3}',
{
config: {
dissect: {
field: 'message2',
pattern: '%{log.logger} %{message3}',
},
},
condition: {
field: 'log.level',
operator: 'eq',
value: 'info',
},
},
condition: {
field: 'log.level',
operator: 'eq',
value: 'info',
],
routing: [],
wired: {
fields: {
'@timestamp': {
type: 'date',
},
message: {
type: 'match_only_text',
},
message2: {
type: 'match_only_text',
},
'host.name': {
type: 'keyword',
},
'log.level': {
type: 'keyword',
},
},
} as JsonObject,
],
children: [],
},
},
};
const response = await putStream(supertest, 'logs', body);
expect(response).to.have.property('acknowledged', true);

View file

@ -7,84 +7,99 @@
import expect from '@kbn/expect';
import { SearchTotalHits } from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { StreamDefinition } from '@kbn/streams-schema';
import { deleteStream, enableStreams, indexDocument } from './helpers/requests';
import { FtrProviderContext } from '../../ftr_provider_context';
import { waitForDocumentInIndex } from '../../../alerting_api_integration/observability/helpers/alerting_wait_for_helpers';
import { cleanUpRootStream } from './helpers/cleanup';
const streams = [
const streams: StreamDefinition[] = [
{
processing: [],
fields: [
{
name: '@timestamp',
type: 'date',
},
{
name: 'message',
type: 'match_only_text',
},
{
name: 'host.name',
type: 'keyword',
},
{
name: 'log.level',
type: 'keyword',
},
],
children: [
{
id: 'logs.test',
condition: {
and: [
{
field: 'numberfield',
operator: 'gt',
value: 15,
name: 'logs',
stream: {
ingest: {
processing: [],
wired: {
fields: {
'@timestamp': {
type: 'date',
},
],
},
},
{
id: 'logs.test2',
condition: {
and: [
{
field: 'field2',
operator: 'eq',
value: 'abc',
message: {
type: 'match_only_text',
},
],
'host.name': {
type: 'keyword',
},
'log.level': {
type: 'keyword',
},
},
},
routing: [
{
name: 'logs.test',
condition: {
and: [
{
field: 'numberfield',
operator: 'gt',
value: 15,
},
],
},
},
{
name: 'logs.test2',
condition: {
and: [
{
field: 'field2',
operator: 'eq',
value: 'abc',
},
],
},
},
],
},
],
id: 'logs',
},
},
{
id: 'logs.test',
processing: [],
fields: [],
children: [],
name: 'logs.test',
stream: {
ingest: {
processing: [],
wired: {
fields: {},
},
routing: [],
},
},
},
{
id: 'logs.test2',
processing: [
{
config: {
type: 'grok',
field: 'message',
patterns: ['%{NUMBER:numberfield}'],
name: 'logs.test2',
stream: {
ingest: {
processing: [
{
config: {
grok: {
field: 'message',
patterns: ['%{NUMBER:numberfield}'],
},
},
},
],
wired: {
fields: {
numberfield: {
type: 'long',
},
},
},
routing: [],
},
],
fields: [
{
name: 'numberfield',
type: 'long',
},
],
children: [],
},
},
];
@ -107,9 +122,9 @@ export default function ({ getService }: FtrProviderContext) {
});
it('PUTs all streams one by one without errors', async () => {
for (const { id: streamId, ...stream } of streams) {
for (const { name, stream } of streams) {
const response = await supertest
.put(`/api/streams/${streamId}`)
.put(`/api/streams/${name}`)
.set('kbn-xsrf', 'xxx')
.send(stream)
.expect(200);

View file

@ -60,9 +60,7 @@ export default function ({ getService }: FtrProviderContext) {
it('Fork logs to logs.nginx', async () => {
const body = {
stream: {
id: 'logs.nginx',
fields: [],
processing: [],
name: 'logs.nginx',
},
condition: {
field: 'log.logger',
@ -99,9 +97,7 @@ export default function ({ getService }: FtrProviderContext) {
it('Fork logs to logs.nginx.access', async () => {
const body = {
stream: {
id: 'logs.nginx.access',
fields: [],
processing: [],
name: 'logs.nginx.access',
},
condition: { field: 'log.level', operator: 'eq', value: 'info' },
};
@ -139,9 +135,7 @@ export default function ({ getService }: FtrProviderContext) {
it('Fork logs to logs.nginx.error with invalid condition', async () => {
const body = {
stream: {
id: 'logs.nginx.error',
fields: [],
processing: [],
name: 'logs.nginx.error',
},
condition: { field: 'log', operator: 'eq', value: 'error' },
};
@ -181,9 +175,7 @@ export default function ({ getService }: FtrProviderContext) {
it('Fork logs to logs.number-test', async () => {
const body = {
stream: {
id: 'logs.number-test',
fields: [],
processing: [],
name: 'logs.number-test',
},
condition: { field: 'code', operator: 'gte', value: '500' },
};
@ -224,9 +216,7 @@ export default function ({ getService }: FtrProviderContext) {
it('Fork logs to logs.string-test', async () => {
const body = {
stream: {
id: 'logs.string-test',
fields: [],
processing: [],
name: 'logs.string-test',
},
condition: {
or: [

View file

@ -9,6 +9,7 @@ import { JsonObject } from '@kbn/utility-types';
import { Agent } from 'supertest';
import expect from '@kbn/expect';
import { SearchTotalHits } from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { StreamConfigDefinition } from '@kbn/streams-schema';
export async function enableStreams(supertest: Agent) {
const req = supertest.post('/api/streams/_enable').set('kbn-xsrf', 'xxx');
@ -36,7 +37,7 @@ export async function forkStream(supertest: Agent, root: string, body: JsonObjec
return response.body;
}
export async function putStream(supertest: Agent, name: string, body: JsonObject) {
export async function putStream(supertest: Agent, name: string, body: StreamConfigDefinition) {
const req = supertest.put(`/api/streams/${name}`).set('kbn-xsrf', 'xxx');
const response = await req.send(body).expect(200);
return response.body;

View file

@ -189,6 +189,7 @@
"@kbn/sse-utils-server",
"@kbn/gen-ai-functional-testing",
"@kbn/integration-assistant-plugin",
"@kbn/core-elasticsearch-server"
"@kbn/core-elasticsearch-server",
"@kbn/streams-schema"
]
}

View file

@ -7573,6 +7573,10 @@
version "0.0.0"
uid ""
"@kbn/streams-schema@link:x-pack/packages/kbn-streams-schema":
version "0.0.0"
uid ""
"@kbn/synthetics-e2e@link:x-pack/solutions/observability/plugins/synthetics/e2e":
version "0.0.0"
uid ""