Refactor models (#206930)

Refactors models to make it more clear what our data model is internally
and what our API responses are. Also some small changes to make it more
elasticsearch-y:

- isSchema variants now are based on specific type narrowing instead of
from any > type, as the latter only gives runtime safety, but does not
add much in terms of type safety
- validation is now entirely encapsulated in the type, removed
additional checks such as `isCompleteCondition`
- the stored document puts all stream properties top level (currently
only `ingest`, instead of `stream.ingest`)
- `condition` is renamed to `if`, and required everywhere
- `always` and `never` conditions were added
- `grok` and `dissect` processors are now similar to ES, where the
condition is a part of the processor config
- `GET /api/streams/{id}` returns `{ stream: ..., dashboards: ..., ...
}` instead of `{ ingest: ...., dashboards: ..., ... }`
- `PUT /api/streams/{id}` now requires `dashboards`, and `stream` is a
top-level property
- `PUT /api/streams/{id}/_ingest` was added to allow consumers to only
update the stream, and not its assets
- there are some legacy definitions (in `legacy.ts`) to minimize the
amount of changes in the UI, this still needs to happen at some point
but not in this PR

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Dario Gieselaar 2025-01-22 16:03:08 +01:00 committed by GitHub
parent f4ff699a82
commit 8d4a70c5e5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
90 changed files with 1965 additions and 1437 deletions

View file

@ -17,3 +17,5 @@ export function isNonEmptyString(input: string, ctx: z.RefinementCtx): void {
});
}
}
export const NonEmptyString = z.string().min(1).superRefine(isNonEmptyString);

View file

@ -5,6 +5,5 @@
* 2.0.
*/
export * from './src/apis';
export * from './src/models';
export * from './src/helpers';

View file

@ -1,111 +0,0 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP
exports[`ReadStreamResponse should successfully parse 1`] = `
Object {
"streams": Array [
Object {
"elasticsearch_assets": Array [],
"inherited_fields": Object {
"@timestamp": Object {
"from": "logs",
"type": "date",
},
"message": Object {
"from": "logs",
"type": "match_only_text",
},
},
"lifecycle": Object {
"type": "dlm",
},
"name": "logs.nginx",
"stream": Object {
"ingest": Object {
"processing": Array [
Object {
"condition": Object {
"field": "log.level",
"operator": "eq",
"value": "error",
},
"config": Object {
"grok": Object {
"field": "message",
"patterns": Array [
"%{TIMESTAMP_ISO8601:event.timestamp} %{GREEDY:rest}",
],
},
},
},
],
"routing": Array [
Object {
"condition": Object {
"field": "log.level",
"operator": "eq",
"value": "error",
},
"name": "logs.errors",
},
],
"wired": Object {
"fields": Object {
"new_field": Object {
"type": "long",
},
},
},
},
},
},
Object {
"elasticsearch_assets": Array [],
"inherited_fields": Object {
"@timestamp": Object {
"from": "logs",
"type": "date",
},
"message": Object {
"from": "logs",
"type": "match_only_text",
},
},
"lifecycle": Object {
"type": "dlm",
},
"name": "logs.nginx",
"stream": Object {
"ingest": Object {
"processing": Array [
Object {
"condition": Object {
"field": "log.level",
"operator": "eq",
"value": "error",
},
"config": Object {
"grok": Object {
"field": "message",
"patterns": Array [
"%{TIMESTAMP_ISO8601:event.timestamp} %{GREEDY:rest}",
],
},
},
},
],
"routing": Array [
Object {
"condition": Object {
"field": "log.level",
"operator": "eq",
"value": "error",
},
"name": "logs.errors",
},
],
},
},
},
],
}
`;

View file

@ -1,14 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { streamDefinitionSchema } from '../models';
export const listStreamsResponseSchema = z.object({
streams: z.array(streamDefinitionSchema),
});
export type ListStreamsResponse = z.infer<typeof listStreamsResponseSchema>;

View file

@ -1,15 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { readStreamResponse } from '../fixtures/read_streams_response';
import { readStreamResponseSchema } from './read_streams_response';
describe('ReadStreamResponse', () => {
it('should successfully parse', () => {
expect(readStreamResponseSchema.parse(readStreamResponse)).toMatchSnapshot();
});
});

View file

@ -1,15 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { readStreamDefinitonSchema } from '../models';
export const readStreamResponseSchema = z.object({
streams: z.array(readStreamDefinitonSchema),
});
export type ReadStreamResponse = z.infer<typeof readStreamResponseSchema>;

View file

@ -5,6 +5,5 @@
* 2.0.
*/
export * from './processing';
export * from './type_guards';
export * from './hierarchy';

View file

@ -1,38 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Condition, ProcessingDefinition } from '../models';
import {
isGrokProcessor,
isDissectProcessor,
isFilterCondition,
isAndCondition,
isOrCondition,
} from './type_guards';
export function getProcessorType(processor: ProcessingDefinition) {
if (isGrokProcessor(processor.config)) {
return 'grok';
}
if (isDissectProcessor(processor.config)) {
return 'dissect';
}
throw new Error('Unknown processor type');
}
export function isCompleteCondition(condition: Condition): boolean {
if (isFilterCondition(condition)) {
return condition.field !== undefined && condition.field !== '';
}
if (isAndCondition(condition)) {
return condition.and.every(isCompleteCondition);
}
if (isOrCondition(condition)) {
return condition.or.every(isCompleteCondition);
}
return false;
}

View file

@ -5,117 +5,41 @@
* 2.0.
*/
import { ZodSchema, custom, output } from '@kbn/zod';
import {
AndCondition,
conditionSchema,
dissectProcessingDefinitionSchema,
DissectProcessingDefinition,
FilterCondition,
filterConditionSchema,
GrokProcessingDefinition,
grokProcessingDefinitionSchema,
IngestReadStreamDefinition,
ingestReadStreamDefinitonSchema,
IngestStreamDefinition,
ingestStreamDefinitonSchema,
OrCondition,
ReadStreamDefinition,
readStreamDefinitonSchema,
StreamDefinition,
streamDefinitionSchema,
WiredReadStreamDefinition,
wiredReadStreamDefinitonSchema,
WiredStreamDefinition,
wiredStreamDefinitonSchema,
} from '../models';
import {
IngestStreamConfigDefinition,
ingestStreamConfigDefinitonSchema,
StreamConfigDefinition,
streamConfigDefinitionSchema,
WiredStreamConfigDefinition,
wiredStreamConfigDefinitonSchema,
} from '../models/stream_config';
import { ZodSchema, z } from '@kbn/zod';
export function isSchema<T>(zodSchema: ZodSchema, subject: T) {
try {
zodSchema.parse(subject);
return true;
} catch (e) {
return false;
}
export function createIsNarrowSchema<TBaseSchema extends z.Schema, TNarrowSchema extends z.Schema>(
base: TBaseSchema,
narrow: TNarrowSchema
) {
return <TValue extends z.input<TBaseSchema>>(
value: TValue
): value is Extract<TValue, z.input<TNarrowSchema>> => {
return isSchema(narrow, value);
};
}
export function createAsSchemaOrThrow<TBaseSchema extends z.Schema, TNarrowSchema extends z.Schema>(
base: TBaseSchema,
narrow: TNarrowSchema
) {
return <TValue extends z.input<TBaseSchema>>(
value: TValue
): Extract<TValue, z.input<TNarrowSchema>> => {
narrow.parse(value);
return value;
};
}
export function isSchema<TSchema extends z.Schema>(
schema: TSchema,
value: unknown
): value is z.input<TSchema> {
return schema.safeParse(value).success;
}
export function assertsSchema<TSchema extends ZodSchema>(
schema: TSchema,
subject: any
): asserts subject is output<TSchema> {
): asserts subject is z.input<TSchema> {
schema.parse(subject);
}
export function isReadStream(subject: any): subject is ReadStreamDefinition {
return isSchema(readStreamDefinitonSchema, subject);
}
export function isWiredReadStream(subject: any): subject is WiredReadStreamDefinition {
return isSchema(wiredReadStreamDefinitonSchema, subject);
}
export function isIngestReadStream(subject: any): subject is IngestReadStreamDefinition {
return isSchema(ingestReadStreamDefinitonSchema, subject);
}
export function isStream(subject: any): subject is StreamDefinition {
return isSchema(streamDefinitionSchema, subject);
}
export function isIngestStream(subject: StreamDefinition): subject is IngestStreamDefinition {
return isSchema(ingestStreamDefinitonSchema, subject);
}
export function isWiredStream(subject: StreamDefinition): subject is WiredStreamDefinition {
return isSchema(wiredStreamDefinitonSchema, subject);
}
const rootStreamSchema = custom<'RootStreamSchema'>((val) => {
return val?.name?.split('.').length === 1;
});
export function isRootStream(subject: any): subject is WiredStreamDefinition {
return (
(isWiredStream(subject) || isWiredReadStream(subject)) && isSchema(rootStreamSchema, subject)
);
}
export function isWiredStreamConfig(subject: any): subject is WiredStreamConfigDefinition {
return isSchema(wiredStreamConfigDefinitonSchema, subject);
}
export function isIngestStreamConfig(subject: any): subject is IngestStreamConfigDefinition {
return isSchema(ingestStreamConfigDefinitonSchema, subject);
}
export function isStreamConfig(subject: any): subject is StreamConfigDefinition {
return isSchema(streamConfigDefinitionSchema, subject);
}
export function isGrokProcessor(subject: any): subject is GrokProcessingDefinition {
return isSchema(grokProcessingDefinitionSchema, subject);
}
export function isDissectProcessor(subject: any): subject is DissectProcessingDefinition {
return isSchema(dissectProcessingDefinitionSchema, subject);
}
export function isFilterCondition(subject: any): subject is FilterCondition {
return isSchema(filterConditionSchema, subject);
}
export function isAndCondition(subject: any): subject is AndCondition {
return isSchema(conditionSchema, subject) && subject.and != null;
}
export function isOrCondition(subject: any): subject is OrCondition {
return isSchema(conditionSchema, subject) && subject.or != null;
}

View file

@ -0,0 +1,19 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import {
ingestStreamUpsertRequestSchema,
type IngestStreamGetResponse,
type IngestStreamUpsertRequest,
} from './ingest';
export const streamUpsertRequestSchema: z.Schema<StreamUpsertRequest> =
ingestStreamUpsertRequestSchema;
export type StreamGetResponse = IngestStreamGetResponse;
export type StreamUpsertRequest = IngestStreamUpsertRequest;

View file

@ -0,0 +1,25 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { NonEmptyString } from '@kbn/zod-helpers';
export interface StreamGetResponseBase {
dashboards: string[];
}
export interface StreamUpsertRequestBase {
dashboards: string[];
}
export const streamUpsertRequestSchemaBase: z.Schema<StreamUpsertRequestBase> = z.object({
dashboards: z.array(NonEmptyString),
});
export const streamGetResponseSchemaBase: z.Schema<StreamGetResponseBase> = z.object({
dashboards: z.array(NonEmptyString),
});

View file

@ -5,5 +5,6 @@
* 2.0.
*/
export * from './read_streams_response';
export * from './list_streams_response';
export interface StreamDefinitionBase {
name: string;
}

View file

@ -1,146 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
const stringOrNumberOrBoolean = z.union([z.string(), z.number(), z.boolean()]);
export const binaryConditionSchema = z.object({
field: z.string(),
operator: z.enum(['eq', 'neq', 'lt', 'lte', 'gt', 'gte', 'contains', 'startsWith', 'endsWith']),
value: stringOrNumberOrBoolean,
});
export const unaryFilterConditionSchema = z.object({
field: z.string(),
operator: z.enum(['exists', 'notExists']),
});
export const filterConditionSchema = z.discriminatedUnion('operator', [
unaryFilterConditionSchema,
binaryConditionSchema,
]);
export type FilterCondition = z.infer<typeof filterConditionSchema>;
export type BinaryFilterCondition = z.infer<typeof binaryConditionSchema>;
export type UnaryFilterCondition = z.infer<typeof unaryFilterConditionSchema>;
export interface AndCondition {
and: Condition[];
}
export interface OrCondition {
or: Condition[];
}
export type Condition = FilterCondition | AndCondition | OrCondition | undefined;
export const conditionSchema: z.ZodType<Condition> = z.lazy(() =>
z.union([
filterConditionSchema,
z.object({ and: z.array(conditionSchema) }),
z.object({ or: z.array(conditionSchema) }),
])
);
export const grokProcessingDefinitionSchema = z.object({
grok: z.object({
field: z.string(),
patterns: z.array(z.string()),
pattern_definitions: z.optional(z.record(z.string())),
ignore_failure: z.optional(z.boolean()),
ignore_missing: z.optional(z.boolean()),
}),
});
export type GrokProcessingDefinition = z.infer<typeof grokProcessingDefinitionSchema>;
export const dissectProcessingDefinitionSchema = z.object({
dissect: z.object({
field: z.string(),
pattern: z.string(),
append_separator: z.optional(z.string()),
ignore_failure: z.optional(z.boolean()),
ignore_missing: z.optional(z.boolean()),
}),
});
export type DissectProcessingDefinition = z.infer<typeof dissectProcessingDefinitionSchema>;
export const processingConfigSchema = z.union([
grokProcessingDefinitionSchema,
dissectProcessingDefinitionSchema,
]);
export const processingDefinitionSchema = z.object({
condition: z.optional(conditionSchema),
config: processingConfigSchema,
});
export type ProcessingDefinition = z.infer<typeof processingDefinitionSchema>;
export type ProcessorType = ProcessingDefinition['config'] extends infer U
? U extends { [key: string]: any }
? keyof U
: never
: never;
export const FIELD_DEFINITION_TYPES = [
'keyword',
'match_only_text',
'long',
'double',
'date',
'boolean',
'ip',
] as const;
export const fieldDefinitionConfigSchema = z.object({
type: z.enum(FIELD_DEFINITION_TYPES),
format: z.optional(z.string()),
});
export type FieldDefinitionConfig = z.infer<typeof fieldDefinitionConfigSchema>;
export const fieldDefinitionSchema = z.record(z.string(), fieldDefinitionConfigSchema);
export type FieldDefinition = z.infer<typeof fieldDefinitionSchema>;
export const inheritedFieldDefinitionSchema = z.record(
z.string(),
fieldDefinitionConfigSchema.extend({ from: z.string() })
);
export type InheritedFieldDefinition = z.infer<typeof inheritedFieldDefinitionSchema>;
export const fieldDefinitionConfigWithNameSchema = fieldDefinitionConfigSchema.extend({
name: z.string(),
});
export type FieldDefinitionConfigWithName = z.infer<typeof fieldDefinitionConfigWithNameSchema>;
export const streamChildSchema = z.object({
name: z.string(),
condition: z.optional(conditionSchema),
});
export type StreamChild = z.infer<typeof streamChildSchema>;
export const elasticsearchAssetSchema = z.array(
z.object({
type: z.enum(['ingest_pipeline', 'component_template', 'index_template', 'data_stream']),
id: z.string(),
})
);
export type ElasticsearchAsset = z.infer<typeof elasticsearchAssetSchema>;
export const lifecycleSchema = z.discriminatedUnion('type', [
z.object({ type: z.literal('dlm'), data_retention: z.optional(z.string()) }),
z.object({ type: z.literal('ilm'), policy: z.string() }),
]);
export type StreamLifecycle = z.infer<typeof lifecycleSchema>;

View file

@ -0,0 +1,16 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { createIsNarrowSchema } from '../helpers';
import { IngestStreamDefinition, ingestStreamDefinitionSchema } from './ingest';
export type StreamDefinition = IngestStreamDefinition;
export const streamDefinitionSchema: z.Schema<StreamDefinition> = ingestStreamDefinitionSchema;
export const isStreamDefinition = createIsNarrowSchema(z.unknown(), streamDefinitionSchema);

View file

@ -0,0 +1,36 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { createIsNarrowSchema } from '../helpers';
import { streamDefinitionSchema } from './core';
import {
ingestStreamDefinitionSchema,
unwiredStreamDefinitionSchema,
wiredStreamDefinitionSchema,
} from './ingest';
export const isIngestStreamDefinition = createIsNarrowSchema(
streamDefinitionSchema,
ingestStreamDefinitionSchema
);
export const isWiredStreamDefinition = createIsNarrowSchema(
streamDefinitionSchema,
wiredStreamDefinitionSchema
);
export const isUnwiredStreamDefinition = createIsNarrowSchema(
streamDefinitionSchema,
unwiredStreamDefinitionSchema
);
export const isRootStreamDefinition = createIsNarrowSchema(
streamDefinitionSchema,
wiredStreamDefinitionSchema.refine((stream) => {
return stream.name.split('.').length === 1;
})
);

View file

@ -5,7 +5,9 @@
* 2.0.
*/
export * from './common';
export * from './read_streams';
export * from './streams';
export * from './stream_config';
export * from './ingest';
export * from './legacy';
export * from './api';
export * from './core';
export * from './helpers';

View file

@ -0,0 +1,186 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { InheritedFieldDefinition, inheritedFieldDefinitionSchema } from './fields';
import {
StreamGetResponseBase,
StreamUpsertRequestBase,
streamGetResponseSchemaBase,
streamUpsertRequestSchemaBase,
} from '../base/api';
import {
UnwiredIngest,
UnwiredStreamDefinition,
WiredIngest,
WiredStreamDefinition,
unwiredIngestSchema,
unwiredStreamDefinitionSchemaBase,
wiredIngestSchema,
wiredStreamDefinitionSchemaBase,
} from './base';
import {
ElasticsearchAsset,
IngestStreamLifecycle,
elasticsearchAssetSchema,
ingestStreamLifecycleSchema,
} from './common';
import { createIsNarrowSchema, createAsSchemaOrThrow } from '../../helpers';
/**
* Ingest get response
*/
interface WiredIngestResponse {
ingest: WiredIngest;
}
interface UnwiredIngestResponse {
ingest: UnwiredIngest;
}
type IngestGetResponse = WiredIngestResponse | UnwiredIngestResponse;
interface WiredIngestUpsertRequest {
ingest: WiredIngest;
}
interface UnwiredIngestUpsertRequest {
ingest: UnwiredIngest;
}
type IngestUpsertRequest = WiredIngestUpsertRequest | UnwiredIngestUpsertRequest;
const wiredIngestUpsertRequestSchema: z.Schema<WiredIngestUpsertRequest> = z.object({
ingest: wiredIngestSchema,
});
const unwiredIngestUpsertRequestSchema: z.Schema<UnwiredIngestUpsertRequest> = z.object({
ingest: unwiredIngestSchema,
});
const ingestUpsertRequestSchema: z.Schema<IngestUpsertRequest> = z.union([
wiredIngestUpsertRequestSchema,
unwiredIngestUpsertRequestSchema,
]);
/**
* Stream get response
*/
interface IngestStreamGetResponseBase extends StreamGetResponseBase {
lifecycle: IngestStreamLifecycle;
}
interface WiredStreamGetResponse extends IngestStreamGetResponseBase {
stream: Omit<WiredStreamDefinition, 'name'>;
inherited_fields: InheritedFieldDefinition;
}
interface UnwiredStreamGetResponse extends IngestStreamGetResponseBase {
stream: Omit<UnwiredStreamDefinition, 'name'>;
elasticsearch_assets: ElasticsearchAsset[];
}
type IngestStreamGetResponse = WiredStreamGetResponse | UnwiredStreamGetResponse;
/**
* Ingest stream upsert request
*/
interface UnwiredStreamUpsertRequest extends StreamUpsertRequestBase {
stream: UnwiredIngestUpsertRequest;
}
interface WiredStreamUpsertRequest extends StreamUpsertRequestBase {
stream: WiredIngestUpsertRequest;
}
type IngestStreamUpsertRequest = WiredStreamUpsertRequest | UnwiredStreamUpsertRequest;
const unwiredStreamUpsertRequestSchema: z.Schema<UnwiredStreamUpsertRequest> = z.intersection(
streamUpsertRequestSchemaBase,
z.object({
stream: unwiredStreamDefinitionSchemaBase,
})
);
const wiredStreamUpsertRequestSchema: z.Schema<WiredStreamUpsertRequest> = z.intersection(
streamUpsertRequestSchemaBase,
z.object({
stream: wiredStreamDefinitionSchemaBase,
})
);
const ingestStreamUpsertRequestSchema: z.Schema<IngestStreamUpsertRequest> = z.union([
wiredStreamUpsertRequestSchema,
unwiredStreamUpsertRequestSchema,
]);
const ingestStreamGetResponseSchemaBase: z.Schema<IngestStreamGetResponseBase> = z.intersection(
streamGetResponseSchemaBase,
z.object({
lifecycle: ingestStreamLifecycleSchema,
})
);
const wiredStreamGetResponseSchema: z.Schema<WiredStreamGetResponse> = z.intersection(
ingestStreamGetResponseSchemaBase,
z.object({
stream: wiredStreamDefinitionSchemaBase,
inherited_fields: inheritedFieldDefinitionSchema,
})
);
const unwiredStreamGetResponseSchema: z.Schema<UnwiredStreamGetResponse> = z.intersection(
ingestStreamGetResponseSchemaBase,
z.object({
stream: unwiredStreamDefinitionSchemaBase,
elasticsearch_assets: z.array(elasticsearchAssetSchema),
})
);
const ingestStreamGetResponseSchema: z.Schema<IngestStreamGetResponse> = z.union([
wiredStreamGetResponseSchema,
unwiredStreamGetResponseSchema,
]);
const isWiredStreamGetResponse = createIsNarrowSchema(
ingestStreamGetResponseSchema,
wiredStreamGetResponseSchema
);
const isUnWiredStreamGetResponse = createIsNarrowSchema(
ingestStreamGetResponseSchema,
wiredStreamGetResponseSchema
);
const asWiredStreamGetResponse = createAsSchemaOrThrow(
ingestStreamGetResponseSchema,
wiredStreamGetResponseSchema
);
const asUnwiredStreamGetResponse = createAsSchemaOrThrow(
ingestStreamGetResponseSchema,
unwiredStreamGetResponseSchema
);
export {
ingestStreamUpsertRequestSchema,
ingestUpsertRequestSchema,
isWiredStreamGetResponse,
isUnWiredStreamGetResponse,
asWiredStreamGetResponse,
asUnwiredStreamGetResponse,
type IngestGetResponse,
type IngestStreamGetResponse,
type IngestStreamUpsertRequest,
type IngestUpsertRequest,
type UnwiredStreamGetResponse,
type WiredStreamGetResponse,
type UnwiredIngestUpsertRequest,
type WiredIngestUpsertRequest,
};

View file

@ -0,0 +1,115 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { NonEmptyString } from '@kbn/zod-helpers';
import { StreamDefinitionBase } from '../base';
import { FieldDefinition, fieldDefinitionSchema } from './fields';
import { ProcessorDefinition, processorDefinitionSchema } from './processors';
import { RoutingDefinition, routingDefinitionSchema } from './routing';
interface IngestBase {
processing: ProcessorDefinition[];
routing: RoutingDefinition[];
}
interface WiredIngest extends IngestBase {
wired: {
fields: FieldDefinition;
};
}
interface UnwiredIngest extends IngestBase {
unwired: {};
}
interface WiredStreamDefinitionBase {
ingest: WiredIngest;
}
interface UnwiredStreamDefinitionBase {
ingest: UnwiredIngest;
}
interface WiredStreamDefinition extends StreamDefinitionBase {
ingest: WiredIngest;
}
interface UnwiredStreamDefinition extends StreamDefinitionBase {
ingest: UnwiredIngest;
}
type IngestStreamDefinition = WiredStreamDefinition | UnwiredStreamDefinition;
const ingestBaseSchema: z.Schema<IngestBase> = z.object({
processing: z.array(processorDefinitionSchema),
routing: z.array(routingDefinitionSchema),
});
const unwiredIngestSchema: z.Schema<UnwiredIngest> = z.intersection(
ingestBaseSchema,
z.object({
unwired: z.object({}),
})
);
const wiredIngestSchema: z.Schema<WiredIngest> = z.intersection(
ingestBaseSchema,
z.object({
wired: z.object({
fields: fieldDefinitionSchema,
}),
})
);
const unwiredStreamDefinitionSchemaBase: z.Schema<UnwiredStreamDefinitionBase> = z.object({
ingest: unwiredIngestSchema,
});
const wiredStreamDefinitionSchemaBase: z.Schema<WiredStreamDefinitionBase> = z.object({
ingest: wiredIngestSchema,
});
const wiredStreamDefinitionSchema: z.Schema<WiredStreamDefinition> = z.intersection(
z.object({
name: NonEmptyString,
}),
wiredStreamDefinitionSchemaBase
);
const unwiredStreamDefinitionSchema: z.Schema<UnwiredStreamDefinition> = z.intersection(
z.object({
name: NonEmptyString,
}),
unwiredStreamDefinitionSchemaBase
);
const ingestStreamDefinitionSchema: z.Schema<IngestStreamDefinition> = z.union([
wiredStreamDefinitionSchema,
unwiredStreamDefinitionSchema,
]);
const ingestStreamDefinitionSchemaBase: z.Schema<Omit<IngestStreamDefinition, 'name'>> = z.union([
wiredStreamDefinitionSchemaBase,
unwiredStreamDefinitionSchemaBase,
]);
export {
type WiredStreamDefinition,
wiredStreamDefinitionSchema,
wiredStreamDefinitionSchemaBase,
type UnwiredStreamDefinition,
unwiredStreamDefinitionSchema,
unwiredStreamDefinitionSchemaBase,
type IngestStreamDefinition,
ingestStreamDefinitionSchema,
ingestStreamDefinitionSchemaBase,
type WiredIngest,
wiredIngestSchema,
type UnwiredIngest,
unwiredIngestSchema,
};

View file

@ -0,0 +1,47 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { NonEmptyString } from '@kbn/zod-helpers';
const ELASTICSEARCH_ASSET_TYPES = [
'ingest_pipeline',
'component_template',
'index_template',
'data_stream',
] as const;
type ElasticsearchAssetType = (typeof ELASTICSEARCH_ASSET_TYPES)[number];
export interface ElasticsearchAsset {
type: ElasticsearchAssetType;
id: string;
}
export const elasticsearchAssetSchema: z.Schema<ElasticsearchAsset> = z.object({
type: z.enum(['ingest_pipeline', 'component_template', 'index_template', 'data_stream']),
id: NonEmptyString,
});
export interface IngestStreamLifecycleDLM {
type: 'dlm';
data_retention?: string;
}
export interface IngestStreamLifecycleILM {
type: 'ilm';
policy: string;
}
export type IngestStreamLifecycle = IngestStreamLifecycleDLM | IngestStreamLifecycleILM;
export const ingestStreamLifecycleSchema: z.Schema<IngestStreamLifecycle> = z.discriminatedUnion(
'type',
[
z.object({ type: z.literal('dlm'), data_retention: z.optional(NonEmptyString) }),
z.object({ type: z.literal('ilm'), policy: NonEmptyString }),
]
);

View file

@ -0,0 +1,109 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { NonEmptyString } from '@kbn/zod-helpers';
import { createIsNarrowSchema } from '../../../helpers';
const stringOrNumberOrBoolean = z.union([z.string(), z.number(), z.boolean()]);
export type BinaryOperator =
| 'eq'
| 'neq'
| 'lt'
| 'lte'
| 'gt'
| 'gte'
| 'contains'
| 'startsWith'
| 'endsWith';
export type UnaryOperator = 'exists' | 'notExists';
export interface BinaryFilterCondition {
field: string;
operator: BinaryOperator;
value: string | number | boolean;
}
export interface UnaryFilterCondition {
field: string;
operator: UnaryOperator;
}
export const binaryFilterConditionSchema: z.Schema<BinaryFilterCondition> = z.object({
field: NonEmptyString,
operator: z.enum(['eq', 'neq', 'lt', 'lte', 'gt', 'gte', 'contains', 'startsWith', 'endsWith']),
value: stringOrNumberOrBoolean,
});
export const unaryFilterConditionSchema: z.Schema<UnaryFilterCondition> = z.object({
field: NonEmptyString,
operator: z.enum(['exists', 'notExists']),
});
export const filterConditionSchema = z.union([
unaryFilterConditionSchema,
binaryFilterConditionSchema,
]);
export type FilterCondition = BinaryFilterCondition | UnaryFilterCondition;
export interface AndCondition {
and: Condition[];
}
export interface OrCondition {
or: Condition[];
}
export interface AlwaysCondition {
always: {};
}
export interface NeverCondition {
never: {};
}
export type Condition =
| FilterCondition
| AndCondition
| OrCondition
| NeverCondition
| AlwaysCondition;
export const conditionSchema: z.Schema<Condition> = z.lazy(() =>
z.union([
filterConditionSchema,
andConditionSchema,
orConditionSchema,
neverConditionSchema,
alwaysConditionSchema,
])
);
export const andConditionSchema = z.object({ and: z.array(conditionSchema) });
export const orConditionSchema = z.object({ or: z.array(conditionSchema) });
export const neverConditionSchema = z.object({ never: z.strictObject({}) });
export const alwaysConditionSchema = z.object({ always: z.strictObject({}) });
export const isBinaryFilterCondition = createIsNarrowSchema(
conditionSchema,
binaryFilterConditionSchema
);
export const isUnaryFilterCondition = createIsNarrowSchema(
conditionSchema,
unaryFilterConditionSchema
);
export const isFilterCondition = createIsNarrowSchema(conditionSchema, filterConditionSchema);
export const isAndCondition = createIsNarrowSchema(conditionSchema, andConditionSchema);
export const isOrCondition = createIsNarrowSchema(conditionSchema, orConditionSchema);
export const isNeverCondition = createIsNarrowSchema(conditionSchema, neverConditionSchema);
export const isAlwaysCondition = createIsNarrowSchema(conditionSchema, alwaysConditionSchema);
export const isCondition = createIsNarrowSchema(z.unknown(), conditionSchema);

View file

@ -0,0 +1,65 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { NonEmptyString } from '@kbn/zod-helpers';
export const FIELD_DEFINITION_TYPES = [
'keyword',
'match_only_text',
'long',
'double',
'date',
'boolean',
'ip',
] as const;
export type FieldDefinitionType = (typeof FIELD_DEFINITION_TYPES)[number];
export interface FieldDefinitionConfig {
type: FieldDefinitionType;
format?: string;
}
export const fieldDefinitionConfigSchema: z.Schema<FieldDefinitionConfig> = z.object({
type: z.enum(FIELD_DEFINITION_TYPES),
format: z.optional(NonEmptyString),
});
export interface FieldDefinition {
[x: string]: FieldDefinitionConfig;
}
export const fieldDefinitionSchema: z.Schema<FieldDefinition> = z.record(
z.string(),
fieldDefinitionConfigSchema
);
export interface InheritedFieldDefinitionConfig extends FieldDefinitionConfig {
from: string;
}
export interface InheritedFieldDefinition {
[x: string]: InheritedFieldDefinitionConfig;
}
export const inheritedFieldDefinitionSchema: z.Schema<InheritedFieldDefinition> = z.record(
z.string(),
z.intersection(fieldDefinitionConfigSchema, z.object({ from: NonEmptyString }))
);
export interface NamedFieldDefinitionConfig extends FieldDefinitionConfig {
name: string;
}
export const namedFieldDefinitionConfigSchema: z.Schema<NamedFieldDefinitionConfig> =
z.intersection(
fieldDefinitionConfigSchema,
z.object({
name: NonEmptyString,
})
);

View file

@ -5,6 +5,10 @@
* 2.0.
*/
export * from './wired_stream_config';
export * from './ingest_stream_config';
export * from './stream_config';
export * from './base';
export * from './api';
export * from './fields';
export * from './processors';
export * from './conditions';
export * from './routing';
export * from './common';

View file

@ -0,0 +1,110 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { NonEmptyString } from '@kbn/zod-helpers';
import { Condition, conditionSchema } from '../conditions';
import { createIsNarrowSchema } from '../../../helpers';
export interface ProcessorBase {
if: Condition;
}
export interface GrokProcessorConfig extends ProcessorBase {
field: string;
patterns: string[];
pattern_definitions?: Record<string, string>;
ignore_failure?: boolean;
ignore_missing?: boolean;
}
export interface GrokProcessorDefinition {
grok: GrokProcessorConfig;
}
const processorBaseSchema = z.object({
if: conditionSchema,
});
export const grokProcessorDefinitionSchema: z.Schema<GrokProcessorDefinition> = z.strictObject({
grok: z.intersection(
processorBaseSchema,
z.object({
field: NonEmptyString,
patterns: z.array(NonEmptyString),
pattern_definitions: z.optional(z.record(z.string())),
ignore_failure: z.optional(z.boolean()),
ignore_missing: z.optional(z.boolean()),
})
),
});
export interface DissectProcessorConfig extends ProcessorBase {
field: string;
pattern: string;
append_separator?: string;
ignore_failure?: boolean;
ignore_missing?: boolean;
}
export interface DissectProcessorDefinition {
dissect: DissectProcessorConfig;
}
export const dissectProcessorDefinitionSchema: z.Schema<DissectProcessorDefinition> =
z.strictObject({
dissect: z.intersection(
processorBaseSchema,
z.object({
field: NonEmptyString,
pattern: NonEmptyString,
append_separator: z.optional(NonEmptyString),
ignore_failure: z.optional(z.boolean()),
ignore_missing: z.optional(z.boolean()),
})
),
});
export type ProcessorDefinition = DissectProcessorDefinition | GrokProcessorDefinition;
type UnionKeysOf<T extends Record<string, any>> = T extends T ? keyof T : never;
type BodyOf<T extends Record<string, any>> = T extends T ? T[keyof T] : never;
export type ProcessorConfig = BodyOf<ProcessorDefinition>;
export type ProcessorType = UnionKeysOf<ProcessorDefinition>;
type ProcessorTypeOf<TProcessorDefinition extends ProcessorDefinition> =
UnionKeysOf<TProcessorDefinition> & ProcessorType;
export const processorDefinitionSchema: z.ZodType<ProcessorDefinition> = z.union([
grokProcessorDefinitionSchema,
dissectProcessorDefinitionSchema,
]);
export const isGrokProcessorDefinition = createIsNarrowSchema(
processorDefinitionSchema,
grokProcessorDefinitionSchema
);
export const isDissectProcessorDefinition = createIsNarrowSchema(
processorDefinitionSchema,
dissectProcessorDefinitionSchema
);
export function getProcessorType<TProcessorDefinition extends ProcessorDefinition>(
processor: TProcessorDefinition
): ProcessorTypeOf<TProcessorDefinition> {
return Object.keys(processor)[0] as ProcessorTypeOf<TProcessorDefinition>;
}
export function getProcessorConfig(processor: ProcessorDefinition): ProcessorConfig {
if ('grok' in processor) {
return processor.grok;
}
return processor.dissect;
}

View file

@ -0,0 +1,20 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { NonEmptyString } from '@kbn/zod-helpers';
import { Condition, conditionSchema } from '../conditions';
export interface RoutingDefinition {
destination: string;
if: Condition;
}
export const routingDefinitionSchema: z.Schema<RoutingDefinition> = z.object({
destination: NonEmptyString,
if: conditionSchema,
});

View file

@ -0,0 +1,91 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { NonEmptyString } from '@kbn/zod-helpers';
import {
InheritedFieldDefinition,
UnwiredStreamDefinition,
WiredStreamDefinition,
inheritedFieldDefinitionSchema,
unwiredStreamDefinitionSchema,
wiredStreamDefinitionSchema,
} from './ingest';
import {
ElasticsearchAsset,
IngestStreamLifecycle,
elasticsearchAssetSchema,
ingestStreamLifecycleSchema,
} from './ingest/common';
import { createIsNarrowSchema } from '../helpers';
/**
* These are deprecated types, they should be migrated to the updated types
*/
interface ReadStreamDefinitionBase {
name: string;
dashboards: string[];
elasticsearch_assets: ElasticsearchAsset[];
lifecycle: IngestStreamLifecycle;
inherited_fields: InheritedFieldDefinition;
}
interface WiredReadStreamDefinition extends ReadStreamDefinitionBase {
stream: WiredStreamDefinition;
}
interface UnwiredReadStreamDefinition extends ReadStreamDefinitionBase {
stream: UnwiredStreamDefinition;
}
type ReadStreamDefinition = WiredReadStreamDefinition | UnwiredReadStreamDefinition;
const readStreamDefinitionSchemaBase: z.Schema<ReadStreamDefinitionBase> = z.object({
name: z.string(),
dashboards: z.array(NonEmptyString),
elasticsearch_assets: z.array(elasticsearchAssetSchema),
inherited_fields: inheritedFieldDefinitionSchema,
lifecycle: ingestStreamLifecycleSchema,
});
const wiredReadStreamDefinitionSchema: z.Schema<WiredReadStreamDefinition> = z.intersection(
readStreamDefinitionSchemaBase,
z.object({
stream: wiredStreamDefinitionSchema,
})
);
const unwiredReadStreamDefinitionSchema: z.Schema<UnwiredReadStreamDefinition> = z.intersection(
readStreamDefinitionSchemaBase,
z.object({
stream: unwiredStreamDefinitionSchema,
})
);
const readStreamSchema: z.Schema<ReadStreamDefinition> = z.union([
wiredReadStreamDefinitionSchema,
unwiredReadStreamDefinitionSchema,
]);
const isReadStream = createIsNarrowSchema(z.unknown(), readStreamSchema);
const isWiredReadStream = createIsNarrowSchema(readStreamSchema, wiredReadStreamDefinitionSchema);
const isUnwiredReadStream = createIsNarrowSchema(
readStreamSchema,
unwiredReadStreamDefinitionSchema
);
export {
readStreamSchema,
type ReadStreamDefinition,
type WiredReadStreamDefinition,
type UnwiredReadStreamDefinition,
isReadStream,
isWiredReadStream,
isUnwiredReadStream,
wiredReadStreamDefinitionSchema,
};

View file

@ -1,110 +0,0 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP
exports[`ReadStream should successfully parse ingestReadStream 1`] = `
Object {
"elasticsearch_assets": Array [],
"inherited_fields": Object {
"@timestamp": Object {
"from": "logs",
"type": "date",
},
"message": Object {
"from": "logs",
"type": "match_only_text",
},
},
"lifecycle": Object {
"type": "dlm",
},
"name": "logs.nginx",
"stream": Object {
"ingest": Object {
"processing": Array [
Object {
"condition": Object {
"field": "log.level",
"operator": "eq",
"value": "error",
},
"config": Object {
"grok": Object {
"field": "message",
"patterns": Array [
"%{TIMESTAMP_ISO8601:event.timestamp} %{GREEDY:rest}",
],
},
},
},
],
"routing": Array [
Object {
"condition": Object {
"field": "log.level",
"operator": "eq",
"value": "error",
},
"name": "logs.errors",
},
],
},
},
}
`;
exports[`ReadStream should successfully parse wiredReadStream 1`] = `
Object {
"elasticsearch_assets": Array [],
"inherited_fields": Object {
"@timestamp": Object {
"from": "logs",
"type": "date",
},
"message": Object {
"from": "logs",
"type": "match_only_text",
},
},
"lifecycle": Object {
"type": "dlm",
},
"name": "logs.nginx",
"stream": Object {
"ingest": Object {
"processing": Array [
Object {
"condition": Object {
"field": "log.level",
"operator": "eq",
"value": "error",
},
"config": Object {
"grok": Object {
"field": "message",
"patterns": Array [
"%{TIMESTAMP_ISO8601:event.timestamp} %{GREEDY:rest}",
],
},
},
},
],
"routing": Array [
Object {
"condition": Object {
"field": "log.level",
"operator": "eq",
"value": "error",
},
"name": "logs.errors",
},
],
"wired": Object {
"fields": Object {
"new_field": Object {
"type": "long",
},
},
},
},
},
}
`;

View file

@ -1,10 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export * from './ingest_read_stream';
export * from './wired_read_stream';
export * from './read_stream';

View file

@ -1,19 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { ingestStreamDefinitonSchema } from '../streams';
import { inheritedFieldDefinitionSchema, lifecycleSchema } from '../common';
export const ingestReadStreamDefinitonSchema = ingestStreamDefinitonSchema
.extend({
inherited_fields: inheritedFieldDefinitionSchema,
lifecycle: lifecycleSchema,
})
.strict();
export type IngestReadStreamDefinition = z.infer<typeof ingestReadStreamDefinitonSchema>;

View file

@ -1,19 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { ingestReadStream } from '../../fixtures/ingest_read_stream';
import { wiredReadStream } from '../../fixtures/wired_read_stream';
import { readStreamDefinitonSchema } from './read_stream';
describe('ReadStream', () => {
it('should successfully parse wiredReadStream', () => {
expect(readStreamDefinitonSchema.parse(wiredReadStream)).toMatchSnapshot();
});
it('should successfully parse ingestReadStream', () => {
expect(readStreamDefinitonSchema.parse(ingestReadStream)).toMatchSnapshot();
});
});

View file

@ -1,17 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { ingestReadStreamDefinitonSchema } from './ingest_read_stream';
import { wiredReadStreamDefinitonSchema } from './wired_read_stream';
export const readStreamDefinitonSchema = z.union([
wiredReadStreamDefinitonSchema,
ingestReadStreamDefinitonSchema,
]);
export type ReadStreamDefinition = z.infer<typeof readStreamDefinitonSchema>;

View file

@ -1,19 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { wiredStreamDefinitonSchema } from '../streams';
import { inheritedFieldDefinitionSchema, lifecycleSchema } from '../common';
export const wiredReadStreamDefinitonSchema = wiredStreamDefinitonSchema
.extend({
inherited_fields: inheritedFieldDefinitionSchema,
lifecycle: lifecycleSchema,
})
.strict();
export type WiredReadStreamDefinition = z.infer<typeof wiredReadStreamDefinitonSchema>;

View file

@ -1,20 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { processingDefinitionSchema, streamChildSchema } from '../common';
export const ingestStreamConfigDefinitonSchema = z
.object({
ingest: z.object({
processing: z.array(processingDefinitionSchema),
routing: z.array(streamChildSchema),
}),
})
.strict();
export type IngestStreamConfigDefinition = z.infer<typeof ingestStreamConfigDefinitonSchema>;

View file

@ -1,17 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { ingestStreamConfigDefinitonSchema } from './ingest_stream_config';
import { wiredStreamConfigDefinitonSchema } from './wired_stream_config';
export const streamConfigDefinitionSchema = z.union([
wiredStreamConfigDefinitonSchema,
ingestStreamConfigDefinitonSchema,
]);
export type StreamConfigDefinition = z.infer<typeof streamConfigDefinitionSchema>;

View file

@ -1,23 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { fieldDefinitionSchema, processingDefinitionSchema, streamChildSchema } from '../common';
export const wiredStreamConfigDefinitonSchema = z
.object({
ingest: z.object({
processing: z.array(processingDefinitionSchema),
wired: z.object({
fields: fieldDefinitionSchema,
}),
routing: z.array(streamChildSchema),
}),
})
.strict();
export type WiredStreamConfigDefinition = z.infer<typeof wiredStreamConfigDefinitonSchema>;

View file

@ -1,10 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export * from './ingest_stream';
export * from './wired_stream';
export * from './stream';

View file

@ -1,21 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { ingestStreamConfigDefinitonSchema } from '../stream_config';
import { elasticsearchAssetSchema } from '../common';
export const ingestStreamDefinitonSchema = z
.object({
name: z.string(),
elasticsearch_assets: z.optional(elasticsearchAssetSchema),
stream: ingestStreamConfigDefinitonSchema,
dashboards: z.optional(z.array(z.string())),
})
.strict();
export type IngestStreamDefinition = z.infer<typeof ingestStreamDefinitonSchema>;

View file

@ -1,17 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { wiredStreamDefinitonSchema } from './wired_stream';
import { ingestStreamDefinitonSchema } from './ingest_stream';
export const streamDefinitionSchema = z.union([
wiredStreamDefinitonSchema,
ingestStreamDefinitonSchema,
]);
export type StreamDefinition = z.infer<typeof streamDefinitionSchema>;

View file

@ -1,21 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { wiredStreamConfigDefinitonSchema } from '../stream_config';
import { elasticsearchAssetSchema } from '../common';
export const wiredStreamDefinitonSchema = z
.object({
name: z.string(),
elasticsearch_assets: z.optional(elasticsearchAssetSchema),
stream: wiredStreamConfigDefinitonSchema,
dashboards: z.optional(z.array(z.string())),
})
.strict();
export type WiredStreamDefinition = z.infer<typeof wiredStreamDefinitonSchema>;

View file

@ -11,7 +11,8 @@
"**/*.ts"
],
"kbn_references": [
"@kbn/zod"
"@kbn/zod",
"@kbn/zod-helpers"
],
"exclude": [
"target/**/*"

View file

@ -15,16 +15,17 @@ import type { IScopedClusterClient, Logger } from '@kbn/core/server';
import { isResponseError } from '@kbn/es-errors';
import {
Condition,
IngestStreamDefinition,
StreamDefinition,
StreamUpsertRequest,
UnwiredStreamDefinition,
WiredStreamDefinition,
assertsSchema,
getAncestors,
getParentId,
isChildOf,
isIngestStream,
isRootStream,
isWiredStream,
isRootStreamDefinition,
isUnwiredStreamDefinition,
isWiredStreamDefinition,
streamDefinitionSchema,
} from '@kbn/streams-schema';
import { cloneDeep, keyBy, omit, orderBy } from 'lodash';
@ -32,7 +33,7 @@ import { AssetClient } from './assets/asset_client';
import { DefinitionNotFound, SecurityException } from './errors';
import { MalformedStreamId } from './errors/malformed_stream_id';
import {
syncIngestStreamDefinitionObjects,
syncUnwiredStreamDefinitionObjects,
syncWiredStreamDefinitionObjects,
} from './helpers/sync';
import { validateAncestorFields, validateDescendantFields } from './helpers/validate_fields';
@ -48,7 +49,6 @@ import {
checkAccessBulk,
deleteStreamObjects,
deleteUnmanagedStreamObjects,
getUnmanagedElasticsearchAssets,
} from './stream_crud';
interface AcknowledgeResponse<TResult extends Result> {
@ -92,7 +92,7 @@ export class StreamsClient {
*/
async isStreamsEnabled(): Promise<boolean> {
const rootLogsStreamExists = await this.getStream(LOGS_ROOT_STREAM_NAME)
.then((definition) => isWiredStream(definition))
.then((definition) => isWiredStreamDefinition(definition))
.catch((error) => {
if (isDefinitionNotFoundError(error)) {
return false;
@ -115,7 +115,11 @@ export class StreamsClient {
}
await this.upsertStream({
definition: rootStreamDefinition,
request: {
dashboards: [],
stream: omit(rootStreamDefinition, 'name'),
},
name: rootStreamDefinition.name,
});
return { acknowledged: true, result: 'created' };
@ -172,30 +176,23 @@ export class StreamsClient {
}
private async syncStreamObjects({ definition }: { definition: StreamDefinition }) {
const { assetClient, logger, scopedClusterClient } = this.dependencies;
const { logger, scopedClusterClient } = this.dependencies;
if (isWiredStream(definition)) {
if (isWiredStreamDefinition(definition)) {
await syncWiredStreamDefinitionObjects({
definition,
logger,
scopedClusterClient,
isServerless: this.dependencies.isServerless,
});
} else if (isIngestStream(definition)) {
await syncIngestStreamDefinitionObjects({
} else if (isUnwiredStreamDefinition(definition)) {
await syncUnwiredStreamDefinitionObjects({
definition,
scopedClusterClient,
logger,
dataStream: await this.getDataStream(definition.name),
});
}
await assetClient.syncAssetList({
entityId: definition.name,
entityType: 'stream',
assetType: 'dashboard',
assetIds: definition.dashboards ?? [],
});
}
/**
@ -203,17 +200,21 @@ export class StreamsClient {
* also updated (including syncing to Elasticsearch).
*/
async upsertStream({
definition,
name,
request,
}: {
definition: StreamDefinition;
name: string;
request: StreamUpsertRequest;
}): Promise<UpsertStreamResponse> {
const stream: StreamDefinition = { ...request.stream, name };
const { dashboards } = request;
const { result, parentDefinition } = await this.validateAndUpsertStream({
definition,
definition: stream,
});
if (parentDefinition) {
const isRoutingToChild = parentDefinition.stream.ingest.routing.find(
(item) => item.name === definition.name
const isRoutingToChild = parentDefinition.ingest.routing.find(
(item) => item.destination === name
);
if (!isRoutingToChild) {
@ -222,26 +223,29 @@ export class StreamsClient {
// The user can set the condition later on the parent
await this.updateStreamRouting({
definition: parentDefinition,
routing: parentDefinition.stream.ingest.routing.concat({
name: definition.name,
routing: parentDefinition.ingest.routing.concat({
destination: name,
if: { never: {} },
}),
});
}
} else if (isWiredStream(definition)) {
} else if (isWiredStreamDefinition(stream)) {
// if there is no parent, this is either the root stream, or
// there are intermediate streams missing in the tree.
// In the latter case, we need to create the intermediate streams first.
const parentId = getParentId(definition.name);
const parentId = getParentId(stream.name);
if (parentId) {
await this.upsertStream({
definition: {
name: parentId,
name: parentId,
request: {
dashboards: [],
stream: {
ingest: {
processing: [],
routing: [
{
name: definition.name,
destination: stream.name,
if: { never: {} },
},
],
wired: {
@ -254,9 +258,15 @@ export class StreamsClient {
}
}
await this.dependencies.assetClient.syncAssetList({
entityId: stream.name,
entityType: 'stream',
assetIds: dashboards,
assetType: 'dashboard',
});
return { acknowledged: true, result };
}
/**
* `validateAndUpsertStream` does the following things:
* - determining whether the given definition is valid
@ -283,7 +293,7 @@ export class StreamsClient {
validateStreamTypeChanges(existingDefinition, definition);
}
if (isRootStream(definition)) {
if (isRootStreamDefinition(definition)) {
// only allow selective updates to a root stream
validateRootStreamChanges(
(existingDefinition as undefined | WiredStreamDefinition) || rootStreamDefinition,
@ -291,7 +301,7 @@ export class StreamsClient {
);
}
if (isWiredStream(definition)) {
if (isWiredStreamDefinition(definition)) {
const validateWiredStreamResult = await this.validateWiredStreamAndCreateChildrenIfNeeded({
existingDefinition: existingDefinition as WiredStreamDefinition,
definition,
@ -343,43 +353,46 @@ export class StreamsClient {
// If no existing definition exists, this is a fork via upsert,
// and we need to validate whether the parent is a wired stream
if (!existingDefinition && parentId && parentDefinition && !isWiredStream(parentDefinition)) {
if (
!existingDefinition &&
parentId &&
parentDefinition &&
!isWiredStreamDefinition(parentDefinition)
) {
throw new MalformedStreamId('Cannot fork a stream that is not managed');
}
validateAncestorFields({
ancestors,
fields: definition.stream.ingest.wired.fields,
fields: definition.ingest.wired.fields,
});
validateDescendantFields({
descendants,
fields: definition.stream.ingest.wired.fields,
fields: definition.ingest.wired.fields,
});
if (existingDefinition) {
validateStreamChildrenChanges(existingDefinition, definition);
}
for (const child of definition.stream.ingest.routing) {
if (descendantsById[child.name]) {
for (const item of definition.ingest.routing) {
if (descendantsById[item.destination]) {
continue;
}
if (!isChildOf(definition.name, child.name)) {
if (!isChildOf(definition.name, item.destination)) {
throw new MalformedStreamId(
`The ID (${child.name}) from the child stream must start with the parent's id (${definition.name}), followed by a dot and a name`
`The ID (${item.destination}) from the child stream must start with the parent's id (${definition.name}), followed by a dot and a name`
);
}
await this.validateAndUpsertStream({
definition: {
name: child.name,
stream: {
ingest: {
processing: [],
routing: [],
wired: {
fields: {},
},
name: item.destination,
ingest: {
processing: [],
routing: [],
wired: {
fields: {},
},
},
},
@ -404,23 +417,21 @@ export class StreamsClient {
async forkStream({
parent,
name,
condition,
if: condition,
}: {
parent: string;
name: string;
condition: Condition;
if: Condition;
}): Promise<ForkStreamResponse> {
const parentDefinition = await this.getStream(parent);
const childDefinition: WiredStreamDefinition = {
name,
stream: { ingest: { processing: [], routing: [], wired: { fields: {} } } },
ingest: { processing: [], routing: [], wired: { fields: {} } },
};
// check whether root stream has a child of the given name already
if (
parentDefinition.stream.ingest.routing.some((child) => child.name === childDefinition.name)
) {
if (parentDefinition.ingest.routing.some((item) => item.destination === childDefinition.name)) {
throw new MalformedStreamId(
`The stream with ID (${name}) already exists as a child of the parent stream`
);
@ -439,9 +450,9 @@ export class StreamsClient {
await this.updateStreamRouting({
definition: updatedParentDefinition!,
routing: parentDefinition.stream.ingest.routing.concat({
name,
condition,
routing: parentDefinition.ingest.routing.concat({
destination: name,
if: condition,
}),
});
@ -509,22 +520,16 @@ export class StreamsClient {
*/
private async getDataStreamAsIngestStream(
dataStream: IndicesDataStream
): Promise<IngestStreamDefinition> {
const definition: IngestStreamDefinition = {
): Promise<UnwiredStreamDefinition> {
const definition: UnwiredStreamDefinition = {
name: dataStream.name,
stream: {
ingest: {
routing: [],
processing: [],
},
ingest: {
routing: [],
processing: [],
unwired: {},
},
};
definition.elasticsearch_assets = await getUnmanagedElasticsearchAssets({
dataStream,
scopedClusterClient: this.dependencies.scopedClusterClient,
});
return definition;
}
@ -568,20 +573,19 @@ export class StreamsClient {
}
/**
* Lists all unmanaged streams (ingest streams without a
* Lists all unmanaged streams (unwired streams without a
* stored definition).
*/
private async getUnmanagedDataStreams(): Promise<IngestStreamDefinition[]> {
private async getUnmanagedDataStreams(): Promise<UnwiredStreamDefinition[]> {
const response =
await this.dependencies.scopedClusterClient.asCurrentUser.indices.getDataStream();
return response.data_streams.map((dataStream) => ({
name: dataStream.name,
stream: {
ingest: {
processing: [],
routing: [],
},
ingest: {
processing: [],
routing: [],
unwired: {},
},
}));
}
@ -625,7 +629,7 @@ export class StreamsClient {
private async deleteStreamFromDefinition(definition: StreamDefinition): Promise<void> {
const { assetClient, logger, scopedClusterClient } = this.dependencies;
if (!isWiredStream(definition)) {
if (!isWiredStreamDefinition(definition)) {
await deleteUnmanagedStreamObjects({
scopedClusterClient,
id: definition.name,
@ -640,16 +644,16 @@ export class StreamsClient {
await this.updateStreamRouting({
definition: parentDefinition,
routing: parentDefinition.stream.ingest.routing.filter(
(child) => child.name !== definition.name
routing: parentDefinition.ingest.routing.filter(
(item) => item.destination !== definition.name
),
});
}
// delete the children first, as this will update
// the parent as well
for (const child of definition.stream.ingest.routing) {
await this.deleteStream(child.name);
for (const item of definition.ingest.routing) {
await this.deleteStream(item.destination);
}
await deleteStreamObjects({ scopedClusterClient, id: definition.name, logger });
@ -676,10 +680,10 @@ export class StreamsClient {
routing,
}: {
definition: WiredStreamDefinition;
routing: WiredStreamDefinition['stream']['ingest']['routing'];
routing: WiredStreamDefinition['ingest']['routing'];
}) {
const update = cloneDeep(definition);
update.stream.ingest.routing = routing;
update.ingest.routing = routing;
await this.updateStoredStream(update);
@ -712,7 +716,7 @@ export class StreamsClient {
}
const parentId = getParentId(name);
if (isWiredStream(definition) && !parentId) {
if (isWiredStreamDefinition(definition) && !parentId) {
throw new MalformedStreamId('Cannot delete root stream');
}
@ -721,12 +725,7 @@ export class StreamsClient {
return { acknowledged: true, result: 'deleted' };
}
private async updateStoredStream(
definition: Omit<
StreamDefinition,
'dashboards' | 'elasticsearch_assets' | 'inherited_fields' | 'lifecycle'
>
) {
private async updateStoredStream(definition: StreamDefinition) {
return this.dependencies.storageClient.index({
id: definition.name,
document: omit(
@ -748,7 +747,7 @@ export class StreamsClient {
filter: [{ terms: { name: ancestorIds } }],
},
},
}).then((streams) => streams.filter(isWiredStream));
}).then((streams) => streams.filter(isWiredStreamDefinition));
}
async getDescendants(name: string): Promise<WiredStreamDefinition[]> {
@ -771,6 +770,6 @@ export class StreamsClient {
],
},
},
}).then((streams) => streams.filter(isWiredStream));
}).then((streams) => streams.filter(isWiredStreamDefinition));
}
}

View file

@ -20,7 +20,7 @@ export function generateLayer(
definition: WiredStreamDefinition
): ClusterPutComponentTemplateRequest {
const properties: Record<string, MappingProperty> = {};
Object.entries(definition.stream.ingest.wired.fields).forEach(([field, props]) => {
Object.entries(definition.ingest.wired.fields).forEach(([field, props]) => {
const property: MappingProperty = {
type: props.type,
};

View file

@ -66,12 +66,3 @@ function getFieldTypeForFilterCondition(condition: FilterCondition): 'number' |
return 'string';
}
}
export function validateCondition(condition: Condition) {
if (isFilterCondition(condition)) {
// check whether a field is specified
if (!condition.field.trim()) {
throw new Error('Field is required in conditions');
}
}
}

View file

@ -11,16 +11,20 @@ import {
Condition,
FilterCondition,
UnaryFilterCondition,
isAlwaysCondition,
isAndCondition,
isFilterCondition,
isNeverCondition,
isOrCondition,
isUnaryFilterCondition,
} from '@kbn/streams-schema';
function safePainlessField(conditionOrField: FilterCondition | string) {
if (isFilterCondition(conditionOrField)) {
return `ctx.${conditionOrField.field.split('.').join('?.')}`;
if (typeof conditionOrField === 'string') {
return `ctx.${conditionOrField.split('.').join('?.')}`;
}
return `ctx.${conditionOrField.split('.').join('?.')}`;
return `ctx.${conditionOrField.field.split('.').join('?.')}`;
}
function encodeValue(value: string | number | boolean) {
@ -101,10 +105,6 @@ function unaryToPainless(condition: UnaryFilterCondition) {
}
}
function isUnaryFilterCondition(subject: FilterCondition): subject is UnaryFilterCondition {
return !('value' in subject);
}
function extractAllFields(condition: Condition, fields: string[] = []): string[] {
if (isFilterCondition(condition) && !isUnaryFilterCondition(condition)) {
return uniq([...fields, condition.field]);
@ -116,7 +116,7 @@ function extractAllFields(condition: Condition, fields: string[] = []): string[]
return uniq(fields);
}
export function conditionToStatement(condition?: Condition, nested = false): string {
export function conditionToStatement(condition: Condition, nested = false): string {
if (isFilterCondition(condition)) {
if (isUnaryFilterCondition(condition)) {
return unaryToPainless(condition);
@ -131,13 +131,26 @@ export function conditionToStatement(condition?: Condition, nested = false): str
const or = condition.or.map((filter) => conditionToStatement(filter, true)).join(' || ');
return nested ? `(${or})` : or;
}
return 'false';
if (isAlwaysCondition(condition)) {
return `true;`;
}
if (isNeverCondition(condition)) {
return `false;`;
}
throw new Error('Unsupported condition');
}
export function conditionToPainless(condition?: Condition): string {
if (!condition) {
return 'false';
export function conditionToPainless(condition: Condition): string {
if (isNeverCondition(condition)) {
return `return false`;
}
if (isAlwaysCondition(condition)) {
return `return true`;
}
const fields = extractAllFields(condition);
let fieldCheck = '';
if (fields.length !== 0) {

View file

@ -5,21 +5,20 @@
* 2.0.
*/
import { ProcessingDefinition, getProcessorType } from '@kbn/streams-schema';
import { get } from 'lodash';
import { ProcessorDefinition, getProcessorConfig, getProcessorType } from '@kbn/streams-schema';
import { IngestProcessorContainer } from '@elastic/elasticsearch/lib/api/types';
import { conditionToPainless } from './condition_to_painless';
export function formatToIngestProcessors(
processing: ProcessingDefinition[]
processing: ProcessorDefinition[]
): IngestProcessorContainer[] {
return processing.map((processor) => {
const config = getProcessorConfig(processor);
const type = getProcessorType(processor);
const config = get(processor.config, type);
return {
[type]: {
...config,
if: processor.condition ? conditionToPainless(processor.condition) : undefined,
if: conditionToPainless(config.if),
},
};
});

View file

@ -7,8 +7,8 @@
import { IScopedClusterClient, Logger } from '@kbn/core/server';
import {
IngestStreamDefinition,
StreamDefinition,
UnwiredStreamDefinition,
WiredStreamDefinition,
} from '@kbn/streams-schema';
import { isResponseError } from '@kbn/es-errors';
@ -187,15 +187,15 @@ async function ensureStreamManagedPipelineReference(
}
}
export async function syncIngestStreamDefinitionObjects({
export async function syncUnwiredStreamDefinitionObjects({
definition,
dataStream,
scopedClusterClient,
}: SyncStreamParamsBase & {
dataStream: IndicesDataStream;
definition: IngestStreamDefinition;
definition: UnwiredStreamDefinition;
}) {
if (definition.stream.ingest.routing.length) {
if (definition.ingest.routing.length) {
throw new Error('Unmanaged streams cannot have managed children, coming soon');
}
const unmanagedAssets = await getUnmanagedElasticsearchAssets({
@ -218,7 +218,7 @@ export async function syncIngestStreamDefinitionObjects({
executionPlan
);
if (definition.stream.ingest.processing.length) {
if (definition.ingest.processing.length) {
// if the stream has processing, we need to create or update the stream managed pipeline
executionPlan.push({
method: 'PUT',

View file

@ -5,7 +5,7 @@
* 2.0.
*/
import { FieldDefinition, WiredStreamDefinition, isWiredStream } from '@kbn/streams-schema';
import { FieldDefinition, WiredStreamDefinition } from '@kbn/streams-schema';
import { MalformedFields } from '../errors/malformed_fields';
export function validateAncestorFields({
@ -19,8 +19,7 @@ export function validateAncestorFields({
for (const fieldName in fields) {
if (
Object.hasOwn(fields, fieldName) &&
isWiredStream(ancestor) &&
Object.entries(ancestor.stream.ingest.wired.fields).some(
Object.entries(ancestor.ingest.wired.fields).some(
([ancestorFieldName, attr]) =>
attr.type !== fields[fieldName].type && ancestorFieldName === fieldName
)
@ -44,8 +43,7 @@ export function validateDescendantFields({
for (const fieldName in fields) {
if (
Object.hasOwn(fields, fieldName) &&
isWiredStream(descendant) &&
Object.entries(descendant.stream.ingest.wired.fields).some(
Object.entries(descendant.ingest.wired.fields).some(
([descendantFieldName, attr]) =>
attr.type !== fields[fieldName].type && descendantFieldName === fieldName
)

View file

@ -5,7 +5,12 @@
* 2.0.
*/
import { StreamDefinition, WiredStreamDefinition, isWiredStream } from '@kbn/streams-schema';
import {
StreamDefinition,
WiredStreamDefinition,
isUnwiredStreamDefinition,
isWiredStreamDefinition,
} from '@kbn/streams-schema';
import { difference, isEqual } from 'lodash';
import { RootStreamImmutabilityException } from '../errors';
import { MalformedStream } from '../errors/malformed_stream';
@ -20,8 +25,8 @@ export function validateRootStreamChanges(
nextStreamDefinition: WiredStreamDefinition
) {
const hasFieldChanges = !isEqual(
currentStreamDefinition.stream.ingest.wired.fields,
nextStreamDefinition.stream.ingest.wired.fields
currentStreamDefinition.ingest.wired.fields,
nextStreamDefinition.ingest.wired.fields
);
if (hasFieldChanges) {
@ -29,8 +34,8 @@ export function validateRootStreamChanges(
}
const hasProcessingChanges = !isEqual(
currentStreamDefinition.stream.ingest.processing,
nextStreamDefinition.stream.ingest.processing
currentStreamDefinition.ingest.processing,
nextStreamDefinition.ingest.processing
);
if (hasProcessingChanges) {
@ -45,18 +50,20 @@ export function validateStreamTypeChanges(
currentStreamDefinition: StreamDefinition,
nextStreamDefinition: StreamDefinition
) {
const fromIngestToWired =
!isWiredStream(currentStreamDefinition) && isWiredStream(nextStreamDefinition);
const fromUnwiredToWired =
isUnwiredStreamDefinition(currentStreamDefinition) &&
isWiredStreamDefinition(nextStreamDefinition);
if (fromIngestToWired) {
throw new MalformedStream('Cannot change ingest stream to wired stream');
if (fromUnwiredToWired) {
throw new MalformedStream('Cannot change unwired stream to wired stream');
}
const fromWiredToIngest =
isWiredStream(currentStreamDefinition) && !isWiredStream(nextStreamDefinition);
const fromWiredToUnwired =
isWiredStreamDefinition(currentStreamDefinition) &&
isUnwiredStreamDefinition(nextStreamDefinition);
if (fromWiredToIngest) {
throw new MalformedStream('Cannot change wired stream to ingest stream');
if (fromWiredToUnwired) {
throw new MalformedStream('Cannot change wired stream to unwired stream');
}
}
@ -67,9 +74,13 @@ export function validateStreamChildrenChanges(
currentStreamDefinition: WiredStreamDefinition,
nextStreamDefinition: WiredStreamDefinition
) {
const existingChildren = currentStreamDefinition.stream.ingest.routing.map((child) => child.name);
const existingChildren = currentStreamDefinition.ingest.routing.map(
(routingDefinition) => routingDefinition.destination
);
const nextChildren = nextStreamDefinition.stream.ingest.routing.map((child) => child.name);
const nextChildren = nextStreamDefinition.ingest.routing.map(
(routingDefinition) => routingDefinition.destination
);
const removedChildren = difference(existingChildren, nextChildren);

View file

@ -16,7 +16,7 @@ export function generateIngestPipeline(id: string, definition: StreamDefinition)
id: getProcessingPipelineName(id),
processors: [
...(isRoot(definition.name) ? logsDefaultPipelineProcessors : []),
...formatToIngestProcessors(definition.stream.ingest.processing),
...formatToIngestProcessors(definition.ingest.processing),
{
pipeline: {
name: `${id}@stream.reroutes`,
@ -34,7 +34,7 @@ export function generateIngestPipeline(id: string, definition: StreamDefinition)
export function generateClassicIngestPipelineBody(definition: StreamDefinition) {
return {
processors: formatToIngestProcessors(definition.stream.ingest.processing),
processors: formatToIngestProcessors(definition.ingest.processing),
_meta: {
description: `Stream-managed pipeline for the ${definition.name} stream`,
managed: true,

View file

@ -17,11 +17,11 @@ interface GenerateReroutePipelineParams {
export function generateReroutePipeline({ definition }: GenerateReroutePipelineParams) {
return {
id: getReroutePipelineName(definition.name),
processors: definition.stream.ingest.routing.map((child) => {
processors: definition.ingest.routing.map((child) => {
return {
reroute: {
destination: child.name,
if: conditionToPainless(child.condition),
destination: child.destination,
if: conditionToPainless(child.if),
},
};
}),

View file

@ -9,24 +9,22 @@ import { WiredStreamDefinition } from '@kbn/streams-schema';
export const rootStreamDefinition: WiredStreamDefinition = {
name: 'logs',
stream: {
ingest: {
processing: [],
routing: [],
wired: {
fields: {
'@timestamp': {
type: 'date',
},
message: {
type: 'match_only_text',
},
'host.name': {
type: 'keyword',
},
'log.level': {
type: 'keyword',
},
ingest: {
processing: [],
routing: [],
wired: {
fields: {
'@timestamp': {
type: 'date',
},
message: {
type: 'match_only_text',
},
'host.name': {
type: 'keyword',
},
'log.level': {
type: 'keyword',
},
},
},

View file

@ -21,11 +21,7 @@ export const streamsStorageSettings = {
schema: {
properties: {
name: types.keyword(),
stream: types.object({
properties: {
ingest: types.object({ enabled: false }),
},
}),
ingest: types.object({ enabled: false }),
},
},
} satisfies StorageSettings;

View file

@ -8,7 +8,7 @@
import { IndicesDataStream, IngestPipeline } from '@elastic/elasticsearch/lib/api/types';
import { IScopedClusterClient } from '@kbn/core-elasticsearch-server';
import { Logger } from '@kbn/logging';
import { StreamLifecycle } from '@kbn/streams-schema';
import { IngestStreamLifecycle } from '@kbn/streams-schema/src/models/ingest/common';
import { deleteComponent } from './component_templates/manage_component_templates';
import { getComponentTemplateName } from './component_templates/name';
import { deleteDataStream } from './data_streams/manage_data_streams';
@ -27,7 +27,7 @@ interface DeleteStreamParams extends BaseParams {
logger: Logger;
}
export function getDataStreamLifecycle(dataStream: IndicesDataStream): StreamLifecycle {
export function getDataStreamLifecycle(dataStream: IndicesDataStream): IngestStreamLifecycle {
if (
dataStream.ilm_policy &&
(!dataStream.lifecycle || typeof dataStream.prefer_ilm === 'undefined' || dataStream.prefer_ilm)

View file

@ -12,6 +12,7 @@ import { enablementRoutes } from './streams/enablement/route';
import { managementRoutes } from './streams/management/route';
import { schemaRoutes } from './streams/schema/route';
import { processingRoutes } from './streams/processing/route';
import { ingestRoutes } from './streams/ingest/route';
export const streamsRouteRepository = {
...esqlRoutes,
@ -21,6 +22,7 @@ export const streamsRouteRepository = {
...managementRoutes,
...schemaRoutes,
...processingRoutes,
...ingestRoutes,
};
export type StreamsRouteRepository = typeof streamsRouteRepository;

View file

@ -0,0 +1,73 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { omit } from 'lodash';
import {
IngestStreamGetResponse,
InheritedFieldDefinition,
WiredStreamGetResponse,
isUnwiredStreamDefinition,
} from '@kbn/streams-schema';
import { IScopedClusterClient } from '@kbn/core/server';
import { AssetClient } from '../../../lib/streams/assets/asset_client';
import { StreamsClient } from '../../../lib/streams/client';
import {
getDataStreamLifecycle,
getUnmanagedElasticsearchAssets,
} from '../../../lib/streams/stream_crud';
export async function readStream({
name,
assetClient,
streamsClient,
scopedClusterClient,
}: {
name: string;
assetClient: AssetClient;
streamsClient: StreamsClient;
scopedClusterClient: IScopedClusterClient;
}): Promise<IngestStreamGetResponse> {
const [streamDefinition, dashboards, ancestors, dataStream] = await Promise.all([
streamsClient.getStream(name),
assetClient.getAssetIds({
entityId: name,
entityType: 'stream',
assetType: 'dashboard',
}),
streamsClient.getAncestors(name),
streamsClient.getDataStream(name),
]);
const lifecycle = getDataStreamLifecycle(dataStream);
if (isUnwiredStreamDefinition(streamDefinition)) {
return {
stream: omit(streamDefinition, 'name'),
elasticsearch_assets: await getUnmanagedElasticsearchAssets({
dataStream,
scopedClusterClient,
}),
lifecycle,
dashboards,
inherited_fields: {},
};
}
const body: WiredStreamGetResponse = {
stream: omit(streamDefinition, 'name'),
dashboards,
lifecycle,
inherited_fields: ancestors.reduce((acc, def) => {
Object.entries(def.ingest.wired.fields).forEach(([key, fieldDef]) => {
acc[key] = { ...fieldDef, from: def.name };
});
return acc;
}, {} as InheritedFieldDefinition),
};
return body;
}

View file

@ -5,19 +5,15 @@
* 2.0.
*/
import { z } from '@kbn/zod';
import { badRequest, internal, notFound } from '@hapi/boom';
import { SearchTotalHits } from '@elastic/elasticsearch/lib/api/types';
import {
streamConfigDefinitionSchema,
ListStreamsResponse,
FieldDefinitionConfig,
ReadStreamDefinition,
WiredReadStreamDefinition,
isWiredStream,
} from '@kbn/streams-schema';
import { badRequest, internal, notFound } from '@hapi/boom';
import { isResponseError } from '@kbn/es-errors';
import { MalformedStreamId } from '../../../lib/streams/errors/malformed_stream_id';
import {
StreamDefinition,
StreamGetResponse,
streamUpsertRequestSchema,
} from '@kbn/streams-schema';
import { z } from '@kbn/zod';
import {
DefinitionNotFound,
ForkConditionMissing,
@ -25,8 +21,9 @@ import {
RootStreamImmutabilityException,
SecurityException,
} from '../../../lib/streams/errors';
import { MalformedStreamId } from '../../../lib/streams/errors/malformed_stream_id';
import { createServerRoute } from '../../create_server_route';
import { getDataStreamLifecycle } from '../../../lib/streams/stream_crud';
import { readStream } from './read_stream';
export const readStreamRoute = createServerRoute({
endpoint: 'GET /api/streams/{id}',
@ -43,48 +40,18 @@ export const readStreamRoute = createServerRoute({
params: z.object({
path: z.object({ id: z.string() }),
}),
handler: async ({ params, request, getScopedClients }): Promise<ReadStreamDefinition> => {
handler: async ({ params, request, getScopedClients }): Promise<StreamGetResponse> => {
try {
const { assetClient, streamsClient } = await getScopedClients({
const { assetClient, streamsClient, scopedClusterClient } = await getScopedClients({
request,
});
const name = params.path.id;
const [streamDefinition, dashboards, ancestors, dataStream] = await Promise.all([
streamsClient.getStream(name),
assetClient.getAssetIds({
entityId: name,
entityType: 'stream',
assetType: 'dashboard',
}),
streamsClient.getAncestors(name),
streamsClient.getDataStream(name),
]);
const lifecycle = getDataStreamLifecycle(dataStream);
if (!isWiredStream(streamDefinition)) {
return {
...streamDefinition,
lifecycle,
dashboards,
inherited_fields: {},
};
}
const body: WiredReadStreamDefinition = {
...streamDefinition,
dashboards,
lifecycle,
inherited_fields: ancestors.reduce((acc, def) => {
Object.entries(def.stream.ingest.wired.fields).forEach(([key, fieldDef]) => {
acc[key] = { ...fieldDef, from: def.name };
});
return acc;
// TODO: replace this with a proper type
}, {} as Record<string, FieldDefinitionConfig & { from: string }>),
};
const body = await readStream({
name: params.path.id,
assetClient,
scopedClusterClient,
streamsClient,
});
return body;
} catch (e) {
@ -174,7 +141,7 @@ export const listStreamsRoute = createServerRoute({
},
},
params: z.object({}),
handler: async ({ request, getScopedClients }): Promise<ListStreamsResponse> => {
handler: async ({ request, getScopedClients }): Promise<{ streams: StreamDefinition[] }> => {
try {
const { streamsClient } = await getScopedClients({ request });
return {
@ -206,14 +173,16 @@ export const editStreamRoute = createServerRoute({
path: z.object({
id: z.string(),
}),
body: streamConfigDefinitionSchema,
body: streamUpsertRequestSchema,
}),
handler: async ({ params, request, getScopedClients }) => {
try {
const { streamsClient } = await getScopedClients({ request });
const streamDefinition = { stream: params.body, name: params.path.id };
return await streamsClient.upsertStream({ definition: streamDefinition });
return await streamsClient.upsertStream({
request: params.body,
name: params.path.id,
});
} catch (e) {
if (e instanceof IndexTemplateNotFound || e instanceof DefinitionNotFound) {
throw notFound(e);

View file

@ -0,0 +1,140 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { badRequest, internal, notFound } from '@hapi/boom';
import { isResponseError } from '@kbn/es-errors';
import {
IngestGetResponse,
StreamUpsertRequest,
ingestUpsertRequestSchema,
isUnwiredStreamDefinition,
isWiredStreamDefinition,
} from '@kbn/streams-schema';
import { z } from '@kbn/zod';
import {
DefinitionNotFound,
ForkConditionMissing,
IndexTemplateNotFound,
RootStreamImmutabilityException,
SecurityException,
} from '../../../lib/streams/errors';
import { MalformedStreamId } from '../../../lib/streams/errors/malformed_stream_id';
import { createServerRoute } from '../../create_server_route';
const readIngestRoute = createServerRoute({
endpoint: 'GET /api/streams/{id}/_ingest',
options: {
access: 'internal',
},
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
params: z.object({
path: z.object({ id: z.string() }),
}),
handler: async ({ params, request, getScopedClients }): Promise<IngestGetResponse> => {
try {
const { streamsClient } = await getScopedClients({
request,
});
const name = params.path.id;
const definition = await streamsClient.getStream(name);
if (isWiredStreamDefinition(definition)) {
return { ingest: definition.ingest };
}
if (isUnwiredStreamDefinition(definition)) {
return { ingest: definition.ingest };
}
throw badRequest(`Stream is not an ingest stream`);
} catch (e) {
if (e instanceof DefinitionNotFound || (isResponseError(e) && e.statusCode === 404)) {
throw notFound(e);
}
throw internal(e);
}
},
});
const upsertIngestRoute = createServerRoute({
endpoint: 'PUT /api/streams/{id}/_ingest',
options: {
access: 'internal',
},
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
params: z.object({
path: z.object({
id: z.string(),
}),
body: ingestUpsertRequestSchema,
}),
handler: async ({ params, request, getScopedClients }) => {
try {
const { streamsClient, assetClient } = await getScopedClients({
request,
});
const name = params.path.id;
const assets = await assetClient.getAssets({
entityId: name,
entityType: 'stream',
});
const ingestUpsertRequest = params.body;
const dashboards = assets
.filter((asset) => asset.assetType === 'dashboard')
.map((asset) => asset.assetId);
const upsertRequest = {
dashboards,
stream: ingestUpsertRequest,
} as StreamUpsertRequest;
return await streamsClient.upsertStream({
request: upsertRequest,
name: params.path.id,
});
} catch (e) {
if (e instanceof IndexTemplateNotFound || e instanceof DefinitionNotFound) {
throw notFound(e);
}
if (
e instanceof SecurityException ||
e instanceof ForkConditionMissing ||
e instanceof MalformedStreamId ||
e instanceof RootStreamImmutabilityException
) {
throw badRequest(e);
}
throw internal(e);
}
},
});
export const ingestRoutes = {
...readIngestRoute,
...upsertIngestRoute,
};

View file

@ -7,7 +7,7 @@
import { z } from '@kbn/zod';
import { badRequest, internal, notFound } from '@hapi/boom';
import { conditionSchema, isCompleteCondition } from '@kbn/streams-schema';
import { conditionSchema } from '@kbn/streams-schema';
import { errors } from '@elastic/elasticsearch';
import {
DefinitionNotFound,
@ -19,7 +19,6 @@ import {
import { createServerRoute } from '../../create_server_route';
import { checkAccess } from '../../../lib/streams/stream_crud';
import { MalformedStreamId } from '../../../lib/streams/errors/malformed_stream_id';
import { validateCondition } from '../../../lib/streams/helpers/condition_fields';
import { conditionToQueryDsl } from '../../../lib/streams/helpers/condition_to_query_dsl';
import { getFields } from '../../../lib/streams/helpers/condition_fields';
import { ResyncStreamsResponse } from '../../../lib/streams/client';
@ -40,23 +39,21 @@ export const forkStreamsRoute = createServerRoute({
path: z.object({
id: z.string(),
}),
body: z.object({ stream: z.object({ name: z.string() }), condition: conditionSchema }),
body: z.object({ stream: z.object({ name: z.string() }), if: conditionSchema }),
}),
handler: async ({ params, request, getScopedClients }): Promise<{ acknowledged: true }> => {
try {
if (!params.body.condition) {
if (!params.body.if) {
throw new ForkConditionMissing('You must provide a condition to fork a stream');
}
validateCondition(params.body.condition);
const { streamsClient } = await getScopedClients({
request,
});
return await streamsClient.forkStream({
parent: params.path.id,
condition: params.body.condition,
if: params.body.if,
name: params.body.stream.name,
});
} catch (e) {
@ -132,7 +129,7 @@ export const sampleStreamRoute = createServerRoute({
params: z.object({
path: z.object({ id: z.string() }),
body: z.object({
condition: z.optional(conditionSchema),
if: z.optional(conditionSchema),
start: z.optional(z.number()),
end: z.optional(z.number()),
size: z.optional(z.number()),
@ -147,14 +144,12 @@ export const sampleStreamRoute = createServerRoute({
throw new DefinitionNotFound(`Stream definition for ${params.path.id} not found.`);
}
const { condition, start, end, size } = params.body;
const { if: condition, start, end, size } = params.body;
const searchBody = {
query: {
bool: {
must: [
Boolean(condition && isCompleteCondition(condition))
? conditionToQueryDsl(condition)
: { match_all: {} },
condition ? conditionToQueryDsl(condition) : { match_all: {} },
{
range: {
'@timestamp': {

View file

@ -7,30 +7,30 @@
/* eslint-disable @typescript-eslint/naming-convention */
import { z } from '@kbn/zod';
import { notFound, internal, badRequest } from '@hapi/boom';
import { badRequest, internal, notFound } from '@hapi/boom';
import { IScopedClusterClient } from '@kbn/core/server';
import { calculateObjectDiff, flattenObject } from '@kbn/object-utils';
import {
FieldDefinitionConfig,
fieldDefinitionConfigSchema,
processingDefinitionSchema,
namedFieldDefinitionConfigSchema,
processorDefinitionSchema,
} from '@kbn/streams-schema';
import { calculateObjectDiff, flattenObject } from '@kbn/object-utils';
import { z } from '@kbn/zod';
import { isEmpty } from 'lodash';
import { IScopedClusterClient } from '@kbn/core/server';
import { DefinitionNotFound } from '../../../lib/streams/errors';
import { DetectedMappingFailure } from '../../../lib/streams/errors/detected_mapping_failure';
import { NonAdditiveProcessor } from '../../../lib/streams/errors/non_additive_processor';
import { SimulationFailed } from '../../../lib/streams/errors/simulation_failed';
import { formatToIngestProcessors } from '../../../lib/streams/helpers/processing';
import { createServerRoute } from '../../create_server_route';
import { DefinitionNotFound } from '../../../lib/streams/errors';
import { checkAccess } from '../../../lib/streams/stream_crud';
import { createServerRoute } from '../../create_server_route';
const paramsSchema = z.object({
path: z.object({ id: z.string() }),
body: z.object({
processing: z.array(processingDefinitionSchema),
processing: z.array(processorDefinitionSchema),
documents: z.array(z.record(z.unknown())),
detected_fields: z.array(fieldDefinitionConfigSchema.extend({ name: z.string() })).optional(),
detected_fields: z.array(namedFieldDefinitionConfigSchema).optional(),
}),
});

View file

@ -7,7 +7,7 @@
import { z } from '@kbn/zod';
import { internal, notFound } from '@hapi/boom';
import { getFlattenedObject } from '@kbn/std';
import { fieldDefinitionConfigSchema, isWiredStream } from '@kbn/streams-schema';
import { fieldDefinitionConfigSchema, isWiredStreamDefinition } from '@kbn/streams-schema';
import { DefinitionNotFound } from '../../../lib/streams/errors';
import { checkAccess } from '../../../lib/streams/stream_crud';
import { createServerRoute } from '../../create_server_route';
@ -64,14 +64,12 @@ export const unmappedFieldsRoute = createServerRoute({
// Mapped fields from the stream's definition and inherited from ancestors
const mappedFields = new Set<string>();
if (isWiredStream(streamDefinition)) {
Object.keys(streamDefinition.stream.ingest.wired.fields).forEach((name) =>
mappedFields.add(name)
);
if (isWiredStreamDefinition(streamDefinition)) {
Object.keys(streamDefinition.ingest.wired.fields).forEach((name) => mappedFields.add(name));
}
for (const ancestor of ancestors) {
Object.keys(ancestor.stream.ingest.wired.fields).forEach((name) => mappedFields.add(name));
Object.keys(ancestor.ingest.wired.fields).forEach((name) => mappedFields.add(name));
}
const unmappedFields = Array.from(sourceFields)
@ -106,7 +104,9 @@ export const schemaFieldsSimulationRoute = createServerRoute({
params: z.object({
path: z.object({ id: z.string() }),
body: z.object({
field_definitions: z.array(fieldDefinitionConfigSchema.extend({ name: z.string() })),
field_definitions: z.array(
z.intersection(fieldDefinitionConfigSchema, z.object({ name: z.string() }))
),
}),
}),
handler: async ({

View file

@ -28,6 +28,7 @@ import React, { useEffect } from 'react';
import { i18n } from '@kbn/i18n';
import { css } from '@emotion/css';
import { CodeEditor } from '@kbn/code-editor';
import { EMPTY_EQUALS_CONDITION } from '../../util/condition';
export function ConditionEditor(props: {
condition: Condition;
@ -84,7 +85,7 @@ export function ConditionForm(props: {
>
<EuiButton
size={'xs' as 's'} // TODO: remove this cast when EUI is updated - EuiButton takes xs, but the type is wrong
onClick={() => props.onConditionChange(undefined)}
onClick={() => props.onConditionChange({ never: {} })}
disabled={props.condition === undefined}
>
{i18n.translate('xpack.streams.conditionEditor.disable', {
@ -119,9 +120,7 @@ export function ConditionForm(props: {
/>
) : !props.condition || 'operator' in props.condition ? (
<FilterForm
condition={
(props.condition as FilterCondition) || { field: '', operator: 'eq', value: '' }
}
condition={(props.condition as FilterCondition) || EMPTY_EQUALS_CONDITION}
onConditionChange={props.onConditionChange}
/>
) : (

View file

@ -9,7 +9,11 @@ import { i18n } from '@kbn/i18n';
import React from 'react';
import { css } from '@emotion/css';
import { ILM_LOCATOR_ID, IlmLocatorParams } from '@kbn/index-lifecycle-management-common-shared';
import { ReadStreamDefinition, StreamLifecycle, isIngestStream } from '@kbn/streams-schema';
import {
IngestStreamLifecycle,
ReadStreamDefinition,
isUnwiredStreamDefinition,
} from '@kbn/streams-schema';
import { useStreamsAppBreadcrumbs } from '../../hooks/use_streams_app_breadcrumbs';
import { useStreamsAppRouter } from '../../hooks/use_streams_app_router';
import { EntityOverviewTabList } from '../entity_overview_tab_list';
@ -103,7 +107,7 @@ export function EntityDetailViewWithoutParams({
title={
<EuiFlexGroup gutterSize="s" alignItems="center">
{entity.displayName}
{definition && isIngestStream(definition) ? (
{definition && isUnwiredStreamDefinition(definition.stream) ? (
<>
{' '}
<EuiBadge>
@ -137,7 +141,7 @@ export function EntityDetailViewWithoutParams({
);
}
function LifecycleBadge({ lifecycle }: { lifecycle: StreamLifecycle }) {
function LifecycleBadge({ lifecycle }: { lifecycle: IngestStreamLifecycle }) {
const {
dependencies: {
start: { share },

View file

@ -6,7 +6,7 @@
*/
import { EuiButton, EuiFlexGroup, EuiFlexItem, EuiSearchBar } from '@elastic/eui';
import { i18n } from '@kbn/i18n';
import { StreamDefinition } from '@kbn/streams-schema';
import { ReadStreamDefinition } from '@kbn/streams-schema';
import React, { useMemo, useState } from 'react';
import type { SanitizedDashboardAsset } from '@kbn/streams-plugin/server/routes/dashboards/route';
import { AddDashboardFlyout } from './add_dashboard_flyout';
@ -14,7 +14,7 @@ import { DashboardsTable } from './dashboard_table';
import { useDashboardsApi } from '../../hooks/use_dashboards_api';
import { useDashboardsFetch } from '../../hooks/use_dashboards_fetch';
export function StreamDetailDashboardsView({ definition }: { definition?: StreamDefinition }) {
export function StreamDetailDashboardsView({ definition }: { definition?: ReadStreamDefinition }) {
const [query, setQuery] = useState('');
const [isAddDashboardFlyoutOpen, setIsAddDashboardFlyoutOpen] = useState(false);

View file

@ -9,12 +9,17 @@ import React, { useMemo } from 'react';
import { FormProvider, SubmitHandler, useForm } from 'react-hook-form';
import { EuiCallOut, EuiForm, EuiButton, EuiSpacer, EuiHorizontalRule } from '@elastic/eui';
import { i18n } from '@kbn/i18n';
import { ProcessingDefinition, ReadStreamDefinition, getProcessorType } from '@kbn/streams-schema';
import { ReadStreamDefinition } from '@kbn/streams-schema';
import { isEqual } from 'lodash';
import { dynamic } from '@kbn/shared-ux-utility';
import { ProcessorTypeSelector } from './processor_type_selector';
import { ProcessorFlyoutTemplate } from './processor_flyout_template';
import { DetectedField, ProcessorDefinition, ProcessorFormState } from '../types';
import {
DetectedField,
EnrichmentUIProcessorDefinition,
ProcessingDefinition,
ProcessorFormState,
} from '../types';
import { DangerZone } from './danger_zone';
import { DissectProcessorForm } from './dissect';
import { GrokProcessorForm } from './grok';
@ -38,9 +43,9 @@ export interface AddProcessorFlyoutProps extends ProcessorFlyoutProps {
onAddProcessor: (newProcessing: ProcessingDefinition, newFields?: DetectedField[]) => void;
}
export interface EditProcessorFlyoutProps extends ProcessorFlyoutProps {
processor: ProcessorDefinition;
processor: EnrichmentUIProcessorDefinition;
onDeleteProcessor: (id: string) => void;
onUpdateProcessor: (id: string, processor: ProcessorDefinition) => void;
onUpdateProcessor: (id: string, processor: EnrichmentUIProcessorDefinition) => void;
}
export function AddProcessorFlyout({
@ -125,9 +130,11 @@ export function EditProcessorFlyout({
onUpdateProcessor,
processor,
}: EditProcessorFlyoutProps) {
const processorType = 'grok' in processor.config ? 'grok' : 'dissect';
const defaultValues = useMemo(
() => getDefaultFormState(getProcessorType(processor), processor),
[processor]
() => getDefaultFormState(processorType, processor),
[processor, processorType]
);
const methods = useForm<ProcessorFormState>({ defaultValues, mode: 'onChange' });

View file

@ -11,32 +11,40 @@ import { useAbortController } from '@kbn/observability-utils-browser/hooks/use_a
import { useBoolean } from '@kbn/react-hooks';
import {
ReadStreamDefinition,
ProcessingDefinition,
isWiredReadStream,
FieldDefinition,
WiredReadStreamDefinition,
ProcessorDefinition,
getProcessorConfig,
IngestUpsertRequest,
} from '@kbn/streams-schema';
import { htmlIdGenerator } from '@elastic/eui';
import { isEqual } from 'lodash';
import { DetectedField, ProcessorDefinition } from '../types';
import { isEqual, omit } from 'lodash';
import { DetectedField, EnrichmentUIProcessorDefinition, ProcessingDefinition } from '../types';
import { useKibana } from '../../../hooks/use_kibana';
import { alwaysToEmptyEquals, emptyEqualsToAlways } from '../../../util/condition';
export const useDefinition = (definition: ReadStreamDefinition, refreshDefinition: () => void) => {
const { core, dependencies } = useKibana();
const { toasts } = core.notifications;
const { processing } = definition.stream.ingest;
const { processing: existingProcessorDefinitions } = definition.stream.ingest;
const { streamsRepositoryClient } = dependencies.start.streams;
const abortController = useAbortController();
const [isSavingChanges, { on: startsSaving, off: endsSaving }] = useBoolean();
const [processors, setProcessors] = useState(() => createProcessorsList(processing));
const [processors, setProcessors] = useState(() =>
createProcessorsList(existingProcessorDefinitions)
);
const [fields, setFields] = useState(() =>
isWiredReadStream(definition) ? definition.stream.ingest.wired.fields : {}
);
const httpProcessing = useMemo(() => processors.map(removeIdFromProcessor), [processors]);
const nextProcessorDefinitions = useMemo(
() => processors.map(convertUiDefinitionIntoApiDefinition),
[processors]
);
useEffect(() => {
// Reset processors when definition refreshes
@ -44,19 +52,19 @@ export const useDefinition = (definition: ReadStreamDefinition, refreshDefinitio
}, [definition]);
const hasChanges = useMemo(
() => !isEqual(processing, httpProcessing),
[processing, httpProcessing]
() => !isEqual(existingProcessorDefinitions, nextProcessorDefinitions),
[existingProcessorDefinitions, nextProcessorDefinitions]
);
const addProcessor = (newProcessing: ProcessingDefinition, newFields?: DetectedField[]) => {
setProcessors((prevProcs) => prevProcs.concat(createProcessorWithId(newProcessing)));
setProcessors((prevProcs) => prevProcs.concat({ ...newProcessing, id: createId() }));
if (isWiredReadStream(definition) && newFields) {
setFields((currentFields) => mergeFields(definition, currentFields, newFields));
}
};
const updateProcessor = (id: string, processorUpdate: ProcessorDefinition) => {
const updateProcessor = (id: string, processorUpdate: EnrichmentUIProcessorDefinition) => {
setProcessors((prevProcs) =>
prevProcs.map((proc) => (proc.id === id ? processorUpdate : proc))
);
@ -67,14 +75,14 @@ export const useDefinition = (definition: ReadStreamDefinition, refreshDefinitio
};
const resetChanges = () => {
setProcessors(createProcessorsList(processing));
setProcessors(createProcessorsList(existingProcessorDefinitions));
setFields(isWiredReadStream(definition) ? definition.stream.ingest.wired.fields : {});
};
const saveChanges = async () => {
startsSaving();
try {
await streamsRepositoryClient.fetch(`PUT /api/streams/{id}`, {
await streamsRepositoryClient.fetch(`PUT /api/streams/{id}/_ingest`, {
signal: abortController.signal,
params: {
path: {
@ -83,10 +91,10 @@ export const useDefinition = (definition: ReadStreamDefinition, refreshDefinitio
body: {
ingest: {
...definition.stream.ingest,
processing: httpProcessing,
processing: nextProcessorDefinitions,
...(isWiredReadStream(definition) && { wired: { fields } }),
},
},
} as IngestUpsertRequest,
},
});
@ -127,17 +135,41 @@ export const useDefinition = (definition: ReadStreamDefinition, refreshDefinitio
};
const createId = htmlIdGenerator();
const createProcessorsList = (processors: ProcessingDefinition[]): ProcessorDefinition[] =>
processors.map(createProcessorWithId);
const createProcessorsList = (
processors: ProcessorDefinition[]
): EnrichmentUIProcessorDefinition[] => processors.map(createProcessorWithId);
const createProcessorWithId = (processor: ProcessingDefinition): ProcessorDefinition => ({
...processor,
const createProcessorWithId = (
processor: ProcessorDefinition
): EnrichmentUIProcessorDefinition => ({
condition: alwaysToEmptyEquals(getProcessorConfig(processor).if),
config: {
...('grok' in processor
? { grok: omit(processor.grok, 'if') }
: { dissect: omit(processor.dissect, 'if') }),
},
id: createId(),
});
const removeIdFromProcessor = (processor: ProcessorDefinition): ProcessingDefinition => {
const { id, ...rest } = processor;
return rest;
const convertUiDefinitionIntoApiDefinition = (
processor: EnrichmentUIProcessorDefinition
): ProcessorDefinition => {
const { id: _id, config, condition } = processor;
if ('grok' in config) {
return {
grok: {
...config.grok,
if: emptyEqualsToAlways(condition),
},
};
}
return {
dissect: {
...config.dissect,
if: emptyEqualsToAlways(condition),
},
};
};
const mergeFields = (

View file

@ -8,14 +8,22 @@
/* eslint-disable @typescript-eslint/naming-convention */
import { useAbortController } from '@kbn/observability-utils-browser/hooks/use_abort_controller';
import { ReadStreamDefinition, ProcessingDefinition, Condition } from '@kbn/streams-schema';
import {
DissectProcessorDefinition,
ReadStreamDefinition,
Condition,
ProcessorDefinition,
GrokProcessorDefinition,
} from '@kbn/streams-schema';
import useAsyncFn from 'react-use/lib/useAsyncFn';
import { IHttpFetchError, ResponseErrorBody } from '@kbn/core/public';
import { useDateRange } from '@kbn/observability-utils-browser/hooks/use_date_range';
import { APIReturnType, StreamsAPIClientRequestParamsOf } from '@kbn/streams-plugin/public/api';
import { useStreamsAppFetch } from '../../../hooks/use_streams_app_fetch';
import { useKibana } from '../../../hooks/use_kibana';
import { ProcessingDefinition } from '../types';
import { DetectedField } from '../types';
import { emptyEqualsToAlways } from '../../../util/condition';
type Simulation = APIReturnType<'POST /api/streams/{id}/processing/_simulate'>;
type SimulationRequestBody =
@ -69,7 +77,7 @@ export const useProcessingSimulator = ({
params: {
path: { id: definition.name },
body: {
condition,
if: condition ? emptyEqualsToAlways(condition) : { always: {} },
start: start?.valueOf(),
end: end?.valueOf(),
size: 100,
@ -89,6 +97,29 @@ export const useProcessingSimulator = ({
return Promise.resolve(null);
}
const processorDefinition: ProcessorDefinition =
'grok' in processingDefinition.config
? ({
grok: {
field: processingDefinition.config.grok.field,
ignore_failure: processingDefinition.config.grok.ignore_failure,
ignore_missing: processingDefinition.config.grok.ignore_missing,
if: emptyEqualsToAlways(processingDefinition.condition),
patterns: processingDefinition.config.grok.patterns,
pattern_definitions: processingDefinition.config.grok.pattern_definitions,
},
} satisfies GrokProcessorDefinition)
: ({
dissect: {
field: processingDefinition.config.dissect.field,
ignore_failure: processingDefinition.config.dissect.ignore_failure,
ignore_missing: processingDefinition.config.dissect.ignore_missing,
if: emptyEqualsToAlways(processingDefinition.condition),
pattern: processingDefinition.config.dissect.pattern,
append_separator: processingDefinition.config.dissect.append_separator,
},
} satisfies DissectProcessorDefinition);
const detected_fields = detectedFields
? (detectedFields.filter(
(field) => field.type !== 'unmapped'
@ -101,7 +132,7 @@ export const useProcessingSimulator = ({
path: { id: definition.name },
body: {
documents: sampleDocs,
processing: [processingDefinition],
processing: [processorDefinition],
detected_fields,
},
},

View file

@ -15,7 +15,7 @@ import {
euiDragDropReorder,
} from '@elastic/eui';
import { i18n } from '@kbn/i18n';
import { ReadStreamDefinition, isRootStream } from '@kbn/streams-schema';
import { ReadStreamDefinition, isRootStreamDefinition } from '@kbn/streams-schema';
import { useBoolean } from '@kbn/react-hooks';
import { useUnsavedChangesPrompt } from '@kbn/unsaved-changes-prompt';
import { EnrichmentEmptyPrompt } from './enrichment_empty_prompt';
@ -103,7 +103,7 @@ export function StreamDetailEnrichmentContent({
const hasProcessors = processors.length > 0;
if (isRootStream(definition)) {
if (isRootStreamDefinition(definition.stream)) {
return <RootStreamEmptyPrompt />;
}

View file

@ -17,16 +17,11 @@ import {
EuiButtonIcon,
} from '@elastic/eui';
import { i18n } from '@kbn/i18n';
import {
ReadStreamDefinition,
getProcessorType,
isDissectProcessor,
isGrokProcessor,
} from '@kbn/streams-schema';
import { ReadStreamDefinition } from '@kbn/streams-schema';
import { useBoolean } from '@kbn/react-hooks';
import { css } from '@emotion/react';
import { EditProcessorFlyout, EditProcessorFlyoutProps } from './flyout';
import { ProcessorDefinition } from './types';
import { EnrichmentUIProcessorDefinition, isDissectProcessor, isGrokProcessor } from './types';
export const DraggableProcessorListItem = ({
processor,
@ -51,7 +46,7 @@ export const DraggableProcessorListItem = ({
interface ProcessorListItemProps {
definition: ReadStreamDefinition;
processor: ProcessorDefinition;
processor: EnrichmentUIProcessorDefinition;
hasShadow: EuiPanelProps['hasShadow'];
onUpdateProcessor: EditProcessorFlyoutProps['onUpdateProcessor'];
onDeleteProcessor: EditProcessorFlyoutProps['onDeleteProcessor'];
@ -66,7 +61,7 @@ const ProcessorListItem = ({
}: ProcessorListItemProps) => {
const [isEditProcessorOpen, { on: openEditProcessor, off: closeEditProcessor }] = useBoolean();
const type = getProcessorType(processor);
const type = 'grok' in processor.config ? 'grok' : 'dissect';
const description = getProcessorDescription(processor);
return (
@ -111,7 +106,7 @@ const ProcessorListItem = ({
);
};
const getProcessorDescription = (processor: ProcessorDefinition) => {
const getProcessorDescription = (processor: EnrichmentUIProcessorDefinition) => {
if (isGrokProcessor(processor.config)) {
return processor.config.grok.patterns.join(' • ');
} else if (isDissectProcessor(processor.config)) {

View file

@ -6,13 +6,26 @@
*/
import {
DissectProcessingDefinition,
Condition,
DissectProcessorConfig,
FieldDefinitionConfig,
GrokProcessingDefinition,
ProcessingDefinition,
GrokProcessorConfig,
} from '@kbn/streams-schema';
export interface ProcessorDefinition extends ProcessingDefinition {
export interface DissectProcessingDefinition {
dissect: Omit<DissectProcessorConfig, 'if'>;
}
export interface GrokProcessingDefinition {
grok: Omit<GrokProcessorConfig, 'if'>;
}
export interface ProcessingDefinition {
condition: Condition;
config: DissectProcessingDefinition | GrokProcessingDefinition;
}
export interface EnrichmentUIProcessorDefinition extends ProcessingDefinition {
id: string;
}
@ -43,3 +56,15 @@ export interface DetectedField {
name: string;
type: FieldDefinitionConfig['type'] | 'unmapped';
}
export function isGrokProcessor(
config: ProcessingDefinition['config']
): config is GrokProcessingDefinition {
return 'grok' in config;
}
export function isDissectProcessor(
config: ProcessingDefinition['config']
): config is DissectProcessingDefinition {
return 'dissect' in config;
}

View file

@ -7,23 +7,21 @@
/* eslint-disable @typescript-eslint/naming-convention */
import { ProcessorType, conditionSchema, createIsNarrowSchema } from '@kbn/streams-schema';
import { isEmpty } from 'lodash';
import { z } from '@kbn/zod';
import {
DissectFormState,
DissectProcessingDefinition,
EnrichmentUIProcessorDefinition,
GrokFormState,
GrokProcessingDefinition,
ProcessingDefinition,
ProcessorType,
isCompleteCondition,
ProcessorFormState,
isDissectProcessor,
isGrokProcessor,
} from '@kbn/streams-schema';
import { isEmpty } from 'lodash';
import { DissectFormState, GrokFormState, ProcessorDefinition, ProcessorFormState } from './types';
const defaultCondition: ProcessingDefinition['condition'] = {
field: '',
operator: 'eq',
value: '',
};
} from './types';
import { EMPTY_EQUALS_CONDITION } from '../../util/condition';
const defaultGrokProcessorFormState: GrokFormState = {
type: 'grok',
@ -32,7 +30,7 @@ const defaultGrokProcessorFormState: GrokFormState = {
pattern_definitions: {},
ignore_failure: true,
ignore_missing: true,
condition: defaultCondition,
condition: EMPTY_EQUALS_CONDITION,
};
const defaultDissectProcessorFormState: DissectFormState = {
@ -41,7 +39,7 @@ const defaultDissectProcessorFormState: DissectFormState = {
pattern: '',
ignore_failure: true,
ignore_missing: true,
condition: defaultCondition,
condition: EMPTY_EQUALS_CONDITION,
};
const defaultProcessorFormStateByType: Record<ProcessorType, ProcessorFormState> = {
@ -51,35 +49,28 @@ const defaultProcessorFormStateByType: Record<ProcessorType, ProcessorFormState>
export const getDefaultFormState = (
type: ProcessorType,
processor?: ProcessorDefinition
processor?: EnrichmentUIProcessorDefinition
): ProcessorFormState => {
if (!processor) return defaultProcessorFormStateByType[type];
let configValues: ProcessorFormState = defaultProcessorFormStateByType[type];
if (isGrokProcessor(processor.config)) {
const { grok } = processor.config;
configValues = structuredClone({
return structuredClone({
...grok,
condition: processor.condition,
type: 'grok',
patterns: grok.patterns.map((pattern) => ({ value: pattern })),
});
}
if (isDissectProcessor(processor.config)) {
const { dissect } = processor.config;
const { dissect } = processor.config;
configValues = structuredClone({
...dissect,
type: 'dissect',
});
}
return {
condition: processor.condition || defaultCondition,
...configValues,
};
return structuredClone({
...dissect,
condition: processor.condition,
type: 'dissect',
});
};
export const convertFormStateToProcessing = (
@ -90,7 +81,7 @@ export const convertFormStateToProcessing = (
formState;
return {
condition: isCompleteCondition(condition) ? condition : undefined,
condition,
config: {
grok: {
patterns: patterns
@ -110,7 +101,7 @@ export const convertFormStateToProcessing = (
formState;
return {
condition: isCompleteCondition(condition) ? condition : undefined,
condition,
config: {
dissect: {
field,
@ -126,6 +117,8 @@ export const convertFormStateToProcessing = (
throw new Error('Cannot convert form state to processing: unknown type.');
};
export const isCompleteCondition = createIsNarrowSchema(z.unknown(), conditionSchema);
export const isCompleteGrokDefinition = (processing: GrokProcessingDefinition) => {
const { patterns } = processing.grok;

View file

@ -19,7 +19,11 @@ import { i18n } from '@kbn/i18n';
import moment from 'moment';
import React, { useMemo } from 'react';
import { css } from '@emotion/css';
import { ReadStreamDefinition, isWiredReadStream, isWiredStream } from '@kbn/streams-schema';
import {
ReadStreamDefinition,
isWiredReadStream,
isWiredStreamDefinition,
} from '@kbn/streams-schema';
import { useDateRange } from '@kbn/observability-utils-browser/hooks/use_date_range';
import type { SanitizedDashboardAsset } from '@kbn/streams-plugin/server/routes/dashboards/route';
import { useKibana } from '../../hooks/use_kibana';
@ -58,7 +62,7 @@ export function StreamDetailOverview({ definition }: { definition?: ReadStreamDe
} = useDateRange({ data });
const indexPatterns = useMemo(() => {
return getIndexPatterns(definition);
return getIndexPatterns(definition?.stream);
}, [definition]);
const discoverLocator = useMemo(
@ -303,7 +307,7 @@ function ChildStreamList({ stream }: { stream?: ReadStreamDefinition }) {
return [];
}
return streamsListFetch.value?.streams.filter(
(d) => isWiredStream(d) && d.name.startsWith(stream.name as string)
(d) => isWiredStreamDefinition(d) && d.name.startsWith(stream.name as string)
);
}, [stream, streamsListFetch.value?.streams]);

View file

@ -36,17 +36,18 @@ import { useAbortController } from '@kbn/observability-utils-browser/hooks/use_a
import { useDateRange } from '@kbn/observability-utils-browser/hooks/use_date_range';
import React, { useCallback, useEffect } from 'react';
import {
StreamChild,
ReadStreamDefinition,
WiredStreamConfigDefinition,
isRoot,
isDescendantOf,
RoutingDefinition,
IngestUpsertRequest,
} from '@kbn/streams-schema';
import { useUnsavedChangesPrompt } from '@kbn/unsaved-changes-prompt';
import { AbortableAsyncState } from '@kbn/observability-utils-browser/hooks/use_abortable_async';
import { DraggableProvided } from '@hello-pangea/dnd';
import { IToasts, Toast } from '@kbn/core/public';
import { toMountPoint } from '@kbn/react-kibana-mount';
import { cloneDeep } from 'lodash';
import { useKibana } from '../../hooks/use_kibana';
import { useStreamsAppFetch } from '../../hooks/use_streams_app_fetch';
import { StreamsAppSearchBar } from '../streams_app_search_bar';
@ -57,10 +58,15 @@ import { NestedView } from '../nested_view';
import { PreviewTable } from '../preview_table';
import { StreamDeleteModal } from '../stream_delete_modal';
import { AssetImage } from '../asset_image';
import {
EMPTY_EQUALS_CONDITION,
alwaysToEmptyEquals,
emptyEqualsToAlways,
} from '../../util/condition';
interface ChildUnderEdit {
isNew: boolean;
child: StreamChild;
child: RoutingDefinition;
}
function useRoutingState({
@ -202,7 +208,7 @@ export function StreamDetailRouting({
closeModal={closeModal}
clearChildUnderEdit={() => routingAppState.selectChildUnderEdit(undefined)}
refreshDefinition={refreshDefinition}
id={routingAppState.childUnderEdit.child.name}
id={routingAppState.childUnderEdit.child.destination}
availableStreams={availableStreams}
/>
)}
@ -314,6 +320,7 @@ function ControlBar({
if (!routingAppState.childUnderEdit) {
return;
}
return streamsRepositoryClient.fetch('POST /api/streams/{id}/_fork', {
signal,
params: {
@ -321,9 +328,9 @@ function ControlBar({
id: definition.name,
},
body: {
condition: routingAppState.childUnderEdit.child.condition,
if: emptyEqualsToAlways(routingAppState.childUnderEdit.child.if),
stream: {
name: routingAppState.childUnderEdit.child.name,
name: routingAppState.childUnderEdit.child.destination,
},
},
},
@ -338,21 +345,25 @@ function ControlBar({
const childUnderEdit = routingAppState.childUnderEdit?.child;
const { name, stream } = definition;
return streamsRepositoryClient.fetch('PUT /api/streams/{id}', {
const routing = routingAppState.childStreams.map((child) =>
child.destination === childUnderEdit?.destination ? childUnderEdit : child
);
const request = {
ingest: {
...stream.ingest,
routing,
},
} as IngestUpsertRequest;
return streamsRepositoryClient.fetch('PUT /api/streams/{id}/_ingest', {
signal,
params: {
path: {
id: name,
},
body: {
...stream,
ingest: {
...stream.ingest,
routing: routingAppState.childStreams.map((child) =>
child.name === childUnderEdit?.name ? childUnderEdit : child
),
},
} as WiredStreamConfigDefinition,
body: request,
},
});
}
@ -387,7 +398,7 @@ function ControlBar({
target="_blank"
href={router.link('/{key}/{tab}/{subtab}', {
path: {
key: routingAppState.childUnderEdit?.child.name!,
key: routingAppState.childUnderEdit?.child.destination!,
tab: 'management',
subtab: 'route',
},
@ -490,13 +501,12 @@ function PreviewPanel({
const previewSampleFetch = useStreamsAppFetch(
({ signal }) => {
if (
!definition ||
!routingAppState.debouncedChildUnderEdit ||
!routingAppState.debouncedChildUnderEdit.isNew
) {
const { debouncedChildUnderEdit } = routingAppState;
if (!definition || !debouncedChildUnderEdit || !debouncedChildUnderEdit.isNew) {
return Promise.resolve({ documents: [] });
}
return streamsRepositoryClient.fetch('POST /api/streams/{id}/_sample', {
signal,
params: {
@ -504,7 +514,7 @@ function PreviewPanel({
id: definition.name,
},
body: {
condition: routingAppState.debouncedChildUnderEdit.child.condition,
if: emptyEqualsToAlways(debouncedChildUnderEdit.child.if),
start: start?.valueOf(),
end: end?.valueOf(),
size: 100,
@ -713,27 +723,34 @@ function ChildStreamList({
<EuiDroppable droppableId="routing_children_reordering" spacing="none">
<EuiFlexGroup direction="column" gutterSize="xs">
{childStreams.map((child, i) => (
<EuiFlexItem key={`${child.name}-${i}-flex-item`} grow={false}>
<EuiFlexItem key={`${child.destination}-${i}-flex-item`} grow={false}>
<EuiDraggable
key={child.name}
key={child.destination}
index={i}
draggableId={child.name}
draggableId={child.destination}
hasInteractiveChildren={true}
customDragHandle={true}
spacing="none"
>
{(provided) => (
<NestedView key={i} isBeingDragged={draggingChildStream === child.name}>
<NestedView
key={i}
isBeingDragged={draggingChildStream === child.destination}
>
<RoutingStreamEntry
draggableProvided={provided}
child={
!childUnderEdit?.isNew && child.name === childUnderEdit?.child.name
!childUnderEdit?.isNew &&
child.destination === childUnderEdit?.child.destination
? childUnderEdit.child
: child
}
edit={!childUnderEdit?.isNew && child.name === childUnderEdit?.child.name}
edit={
!childUnderEdit?.isNew &&
child.destination === childUnderEdit?.child.destination
}
onEditStateChange={() => {
if (child.name === childUnderEdit?.child.name) {
if (child.destination === childUnderEdit?.child.destination) {
selectChildUnderEdit(undefined);
} else {
selectChildUnderEdit({ isNew: false, child });
@ -781,12 +798,8 @@ function ChildStreamList({
selectChildUnderEdit({
isNew: true,
child: {
name: `${definition.name}.child`,
condition: {
field: '',
operator: 'eq',
value: '',
},
destination: `${definition.name}.child`,
if: cloneDeep(EMPTY_EQUALS_CONDITION),
},
});
}}
@ -849,13 +862,15 @@ function RoutingStreamEntry({
availableStreams,
}: {
draggableProvided: DraggableProvided;
child: StreamChild;
onChildChange: (child: StreamChild) => void;
child: RoutingDefinition;
onChildChange: (child: RoutingDefinition) => void;
onEditStateChange: () => void;
edit?: boolean;
availableStreams: string[];
}) {
const children = availableStreams.filter((stream) => isDescendantOf(child.name, stream)).length;
const children = availableStreams.filter((stream) =>
isDescendantOf(child.destination, stream)
).length;
const router = useStreamsAppRouter();
return (
<EuiPanel hasShadow={false} hasBorder paddingSize="s">
@ -878,11 +893,11 @@ function RoutingStreamEntry({
<EuiFlexGroup gutterSize="xs" alignItems="center">
<EuiLink
href={router.link('/{key}/{tab}/{subtab}', {
path: { key: child.name, tab: 'management', subtab: 'route' },
path: { key: child.destination, tab: 'management', subtab: 'route' },
})}
data-test-subj="streamsAppRoutingStreamEntryButton"
>
<EuiText size="s">{child.name}</EuiText>
<EuiText size="s">{child.destination}</EuiText>
</EuiLink>
{children > 0 && (
<EuiBadge color="hollow">
@ -908,11 +923,11 @@ function RoutingStreamEntry({
</EuiFlexGroup>
<ConditionEditor
readonly={!edit}
condition={child.condition}
condition={alwaysToEmptyEquals(child.if)}
onConditionChange={(condition) => {
onChildChange({
...child,
condition,
if: condition,
});
}}
/>
@ -924,8 +939,8 @@ function NewRoutingStreamEntry({
child,
onChildChange,
}: {
child: StreamChild;
onChildChange: (child?: StreamChild) => void;
child: RoutingDefinition;
onChildChange: (child?: RoutingDefinition) => void;
}) {
return (
<EuiPanel hasShadow={false} hasBorder paddingSize="s">
@ -938,24 +953,24 @@ function NewRoutingStreamEntry({
>
<EuiFieldText
data-test-subj="streamsAppRoutingStreamEntryNameField"
value={child.name}
value={child.destination}
fullWidth
compressed
onChange={(e) => {
onChildChange({
...child,
name: e.target.value,
destination: e.target.value,
});
}}
/>
</EuiFormRow>
<ConditionEditor
readonly={false}
condition={child.condition}
condition={child.if}
onConditionChange={(condition) => {
onChildChange({
...child,
condition,
if: condition,
});
}}
/>

View file

@ -22,7 +22,11 @@ import type {
} from '@elastic/eui';
import { i18n } from '@kbn/i18n';
import useToggle from 'react-use/lib/useToggle';
import { isRootStream, isWiredReadStream, ReadStreamDefinition } from '@kbn/streams-schema';
import {
isRootStreamDefinition,
isWiredReadStream,
ReadStreamDefinition,
} from '@kbn/streams-schema';
import { FieldType } from './field_type';
import { FieldStatus } from './field_status';
import { FieldEntry, SchemaEditorEditingState } from './hooks/use_editing_state';
@ -155,7 +159,7 @@ const FieldsTable = ({ definition, fields, editingState, unpromotingState }: Fie
const [visibleColumns, setVisibleColumns] = useState(Object.keys(COLUMNS));
const trailingColumns = useMemo(() => {
return !isRootStream(definition)
return !isRootStreamDefinition(definition.stream)
? ([
{
id: 'actions',

View file

@ -8,12 +8,12 @@
import React from 'react';
import { EuiCallOut } from '@elastic/eui';
import { i18n } from '@kbn/i18n';
import { StreamConfigDefinition } from '@kbn/streams-schema';
import { RoutingDefinition } from '@kbn/streams-schema';
export const ChildrenAffectedCallout = ({
childStreams,
}: {
childStreams: StreamConfigDefinition['ingest']['routing'];
childStreams: RoutingDefinition[];
}) => {
return (
<EuiCallOut
@ -25,7 +25,7 @@ export const ChildrenAffectedCallout = ({
{i18n.translate('xpack.streams.childStreamsWarning.text', {
defaultMessage: "Editing this field will affect it's dependant streams: {affectedStreams} ",
values: {
affectedStreams: childStreams.map((stream) => stream.name).join(', '),
affectedStreams: childStreams.map((stream) => stream.destination).join(', '),
},
})}
</EuiCallOut>

View file

@ -10,7 +10,7 @@ import { StreamsRepositoryClient } from '@kbn/streams-plugin/public/api';
import { css } from '@emotion/react';
import { i18n } from '@kbn/i18n';
import { EuiCallOut } from '@elastic/eui';
import { FieldDefinitionConfigWithName, ReadStreamDefinition } from '@kbn/streams-schema';
import { NamedFieldDefinitionConfig, ReadStreamDefinition } from '@kbn/streams-schema';
import { getFormattedError } from '../../../util/errors';
import { useStreamsAppFetch } from '../../../hooks/use_streams_app_fetch';
import { PreviewTable } from '../../preview_table';
@ -19,7 +19,7 @@ import { LoadingPanel } from '../../loading_panel';
interface SamplePreviewTableProps {
definition: ReadStreamDefinition;
nextFieldDefinition?: Partial<FieldDefinitionConfigWithName>;
nextFieldDefinition?: Partial<NamedFieldDefinitionConfig>;
streamsRepositoryClient: StreamsRepositoryClient;
}
@ -38,7 +38,7 @@ const SamplePreviewTableContent = ({
definition,
nextFieldDefinition,
streamsRepositoryClient,
}: SamplePreviewTableProps & { nextFieldDefinition: FieldDefinitionConfigWithName }) => {
}: SamplePreviewTableProps & { nextFieldDefinition: NamedFieldDefinitionConfig }) => {
const { value, loading, error } = useStreamsAppFetch(
({ signal }) => {
return streamsRepositoryClient.fetch('POST /api/streams/{id}/schema/fields_simulation', {

View file

@ -7,7 +7,7 @@
import {
ReadStreamDefinition,
FieldDefinitionConfigWithName,
NamedFieldDefinitionConfig,
isWiredReadStream,
} from '@kbn/streams-schema';
import { StreamsRepositoryClient } from '@kbn/streams-plugin/public/api';
@ -22,9 +22,9 @@ import { FieldStatus } from '../field_status';
export type SchemaEditorEditingState = ReturnType<typeof useEditingState>;
export interface FieldEntry {
name: FieldDefinitionConfigWithName['name'];
type?: FieldDefinitionConfigWithName['type'];
format?: FieldDefinitionConfigWithName['format'];
name: NamedFieldDefinitionConfig['name'];
type?: NamedFieldDefinitionConfig['type'];
format?: NamedFieldDefinitionConfig['format'];
parent: string;
status: FieldStatus;
}
@ -100,7 +100,7 @@ export const useEditingState = ({
? async () => {
toggleIsSaving(true);
try {
await streamsRepositoryClient.fetch(`PUT /api/streams/{id}`, {
await streamsRepositoryClient.fetch(`PUT /api/streams/{id}/_ingest`, {
signal: abortController.signal,
params: {
path: {
@ -175,14 +175,14 @@ export const useEditingState = ({
};
export const isFullFieldDefinition = (
value?: Partial<FieldDefinitionConfigWithName>
): value is FieldDefinitionConfigWithName => {
value?: Partial<NamedFieldDefinitionConfig>
): value is NamedFieldDefinitionConfig => {
return !!value && !!value.name && !!value.type;
};
const hasChanges = (
selectedField: Partial<FieldDefinitionConfigWithName>,
nextFieldEntry: Partial<FieldDefinitionConfigWithName>
selectedField: Partial<NamedFieldDefinitionConfig>,
nextFieldEntry: Partial<NamedFieldDefinitionConfig>
) => {
return (
selectedField.type !== nextFieldEntry.type || selectedField.format !== nextFieldEntry.format

View file

@ -43,7 +43,7 @@ export const useUnpromotingState = ({
}
toggleIsUnpromotingField(true);
try {
await streamsRepositoryClient.fetch(`PUT /api/streams/{id}`, {
await streamsRepositoryClient.fetch(`PUT /api/streams/{id}/_ingest`, {
signal: abortController.signal,
params: {
path: {

View file

@ -4,15 +4,16 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import React from 'react';
import { i18n } from '@kbn/i18n';
import { EntityDetailViewWithoutParams, EntityViewTab } from '../entity_detail_view';
import { useStreamsAppParams } from '../../hooks/use_streams_app_params';
import { useStreamsAppFetch } from '../../hooks/use_streams_app_fetch';
import { isWiredStreamGetResponse } from '@kbn/streams-schema';
import React from 'react';
import { useKibana } from '../../hooks/use_kibana';
import { StreamDetailOverview } from '../stream_detail_overview';
import { useStreamsAppFetch } from '../../hooks/use_streams_app_fetch';
import { useStreamsAppParams } from '../../hooks/use_streams_app_params';
import { EntityDetailViewWithoutParams, EntityViewTab } from '../entity_detail_view';
import { StreamDetailDashboardsView } from '../stream_detail_dashboards_view';
import { StreamDetailManagement } from '../stream_detail_management';
import { StreamDetailOverview } from '../stream_detail_overview';
export function StreamDetailView() {
const params1 = useStreamsAppParams('/{key}/{tab}', true);
@ -36,14 +37,42 @@ export function StreamDetailView() {
loading,
} = useStreamsAppFetch(
({ signal }) => {
return streamsRepositoryClient.fetch('GET /api/streams/{id}', {
signal,
params: {
path: {
id: key,
return streamsRepositoryClient
.fetch('GET /api/streams/{id}', {
signal,
params: {
path: {
id: key,
},
},
},
});
})
.then((response) => {
if (isWiredStreamGetResponse(response)) {
return {
dashboards: response.dashboards,
inherited_fields: response.inherited_fields,
elasticsearch_assets: [],
lifecycle: response.lifecycle,
name: key,
stream: {
name: key,
...response.stream,
},
};
}
return {
dashboards: response.dashboards,
elasticsearch_assets: response.elasticsearch_assets,
inherited_fields: {},
lifecycle: response.lifecycle,
name: key,
stream: {
name: key,
...response.stream,
},
};
});
},
[streamsRepositoryClient, key]
);

View file

@ -20,7 +20,12 @@ import { i18n } from '@kbn/i18n';
import React, { useMemo } from 'react';
import { euiThemeVars } from '@kbn/ui-theme';
import { css } from '@emotion/css';
import { StreamDefinition, isDescendantOf, isWiredStream } from '@kbn/streams-schema';
import {
StreamDefinition,
isDescendantOf,
isUnwiredStreamDefinition,
isWiredStreamDefinition,
} from '@kbn/streams-schema';
import { useStreamsAppRouter } from '../../hooks/use_streams_app_router';
import { NestedView } from '../nested_view';
import { useKibana } from '../../hooks/use_kibana';
@ -35,7 +40,7 @@ export interface StreamTree {
function asTrees(definitions: StreamDefinition[]) {
const trees: StreamTree[] = [];
const wiredDefinitions = definitions.filter((definition) => isWiredStream(definition));
const wiredDefinitions = definitions.filter((definition) => isWiredStreamDefinition(definition));
wiredDefinitions.sort((a, b) => a.name.split('.').length - b.name.split('.').length);
wiredDefinitions.forEach((definition) => {
@ -80,12 +85,12 @@ export function StreamsList({
const filteredItems = useMemo(() => {
return items
.filter((item) => showClassic || isWiredStream(item))
.filter((item) => showClassic || isWiredStreamDefinition(item))
.filter((item) => !query || item.name.toLowerCase().includes(query.toLowerCase()));
}, [query, items, showClassic]);
const classicStreams = useMemo(() => {
return filteredItems.filter((item) => !isWiredStream(item));
return filteredItems.filter((item) => isUnwiredStreamDefinition(item));
}, [filteredItems]);
const treeView = useMemo(() => {

View file

@ -55,6 +55,10 @@ export const useStreamsAppFetch: UseAbortableAsync<{}, { disableToastOnError?: b
},
}),
});
// log to console to get the actual stack trace
// eslint-disable-next-line no-console
console.log(error);
}
};

View file

@ -0,0 +1,36 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import {
isAlwaysCondition,
type AlwaysCondition,
type BinaryFilterCondition,
type Condition,
} from '@kbn/streams-schema';
import { cloneDeep, isEqual } from 'lodash';
export const EMPTY_EQUALS_CONDITION: BinaryFilterCondition = Object.freeze({
field: '',
operator: 'eq',
value: '',
});
export function alwaysToEmptyEquals<T extends Condition>(condition: T): Exclude<T, AlwaysCondition>;
export function alwaysToEmptyEquals(condition: Condition) {
if (isAlwaysCondition(condition)) {
return cloneDeep(EMPTY_EQUALS_CONDITION);
}
return condition;
}
export function emptyEqualsToAlways(condition: Condition) {
if (isEqual(condition, EMPTY_EQUALS_CONDITION)) {
return { always: {} };
}
return condition;
}

View file

@ -5,14 +5,14 @@
* 2.0.
*/
import { StreamDefinition, isIngestStream, isWiredStream } from '@kbn/streams-schema';
import { StreamDefinition, isUnwiredStreamDefinition } from '@kbn/streams-schema';
export function getIndexPatterns(definition: StreamDefinition | undefined) {
if (!definition) {
return undefined;
}
if (!isWiredStream(definition) && isIngestStream(definition)) {
return [definition.name as string];
if (!isUnwiredStreamDefinition(definition)) {
return [definition.name];
}
const isRoot = definition.name.indexOf('.') === -1;
const dataStreamOfDefinition = definition.name;

View file

@ -56,6 +56,7 @@
"@kbn/object-utils",
"@kbn/deeplinks-analytics",
"@kbn/dashboard-plugin",
"@kbn/react-kibana-mount"
"@kbn/react-kibana-mount",
"@kbn/zod"
]
}

View file

@ -6,6 +6,7 @@
*/
import expect from '@kbn/expect';
import { asUnwiredStreamGetResponse } from '@kbn/streams-schema';
import { DeploymentAgnosticFtrProviderContext } from '../../../ftr_provider_context';
import {
StreamsSupertestRepositoryClient,
@ -33,7 +34,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
await disableStreams(apiClient);
});
it('Shows non-wired data streams', async () => {
it('non-wired data streams', async () => {
const doc = {
message: '2023-01-01T00:00:10.000Z error test',
};
@ -51,11 +52,10 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
expect(classicStream).to.eql({
name: TEST_STREAM_NAME,
stream: {
ingest: {
processing: [],
routing: [],
},
ingest: {
processing: [],
routing: [],
unwired: {},
},
});
});
@ -67,20 +67,23 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
id: TEST_STREAM_NAME,
},
body: {
ingest: {
routing: [],
processing: [
{
config: {
dashboards: [],
stream: {
ingest: {
routing: [],
processing: [
{
grok: {
if: { always: {} },
field: 'message',
patterns: [
'%{TIMESTAMP_ISO8601:inner_timestamp} %{LOGLEVEL:log.level} %{GREEDYDATA:message2}',
],
},
},
},
],
],
unwired: {},
},
},
},
},
@ -96,34 +99,66 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
expect(getResponse.status).to.eql(200);
expect(getResponse.body).to.eql({
name: TEST_STREAM_NAME,
dashboards: [],
inherited_fields: {},
lifecycle: isServerless
const body = asUnwiredStreamGetResponse(getResponse.body);
const { dashboards, stream, lifecycle, elasticsearch_assets: elasticsearchAssets } = body;
expect(dashboards).to.eql([]);
expect(stream).to.eql({
ingest: {
processing: [
{
grok: {
field: 'message',
patterns: [
'%{TIMESTAMP_ISO8601:inner_timestamp} %{LOGLEVEL:log.level} %{GREEDYDATA:message2}',
],
if: { always: {} },
},
},
],
routing: [],
unwired: {},
},
});
expect(lifecycle).to.eql(
isServerless
? { type: 'dlm' }
: {
policy: 'logs',
type: 'ilm',
},
stream: {
ingest: {
processing: [
{
config: {
grok: {
field: 'message',
patterns: [
'%{TIMESTAMP_ISO8601:inner_timestamp} %{LOGLEVEL:log.level} %{GREEDYDATA:message2}',
],
},
},
},
],
routing: [],
},
}
);
expect(elasticsearchAssets).to.eql([
{
type: 'ingest_pipeline',
id: 'logs@default-pipeline',
},
});
{
type: 'component_template',
id: 'logs@mappings',
},
{
type: 'component_template',
id: 'logs@settings',
},
{
type: 'component_template',
id: 'logs@custom',
},
{ type: 'component_template', id: 'ecs@mappings' },
{
type: 'index_template',
id: 'logs',
},
{
type: 'data_stream',
id: 'logs-test-default',
},
]);
});
it('Executes processing on classic streams', async () => {
@ -151,9 +186,13 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
params: {
path: { id: TEST_STREAM_NAME },
body: {
ingest: {
processing: [],
routing: [],
dashboards: [],
stream: {
ingest: {
processing: [],
routing: [],
unwired: {},
},
},
},
},

View file

@ -7,7 +7,7 @@
import expect from '@kbn/expect';
import { SearchTotalHits } from '@elastic/elasticsearch/lib/api/types';
import { WiredStreamConfigDefinition } from '@kbn/streams-schema';
import { IngestStreamUpsertRequest } from '@kbn/streams-schema';
import {
disableStreams,
enableStreams,
@ -35,7 +35,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
stream: {
name: 'logs.nginx',
},
condition: {
if: {
field: 'host.name',
operator: 'eq' as const,
value: 'routeme',
@ -50,50 +50,50 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
});
it('Place processing steps', async () => {
const body: WiredStreamConfigDefinition = {
ingest: {
processing: [
{
config: {
const body: IngestStreamUpsertRequest = {
dashboards: [],
stream: {
ingest: {
processing: [
{
grok: {
field: 'message',
patterns: [
'%{TIMESTAMP_ISO8601:inner_timestamp} %{LOGLEVEL:log.level} %{GREEDYDATA:message2}',
],
if: { always: {} },
},
},
},
{
config: {
{
dissect: {
field: 'message2',
pattern: '%{log.logger} %{message3}',
if: {
field: 'log.level',
operator: 'eq',
value: 'info',
},
},
},
condition: {
field: 'log.level',
operator: 'eq',
value: 'info',
},
},
],
routing: [],
wired: {
fields: {
'@timestamp': {
type: 'date',
},
message: {
type: 'match_only_text',
},
message2: {
type: 'match_only_text',
},
'host.name': {
type: 'keyword',
},
'log.level': {
type: 'keyword',
],
routing: [],
wired: {
fields: {
'@timestamp': {
type: 'date',
},
message: {
type: 'match_only_text',
},
message2: {
type: 'match_only_text',
},
'host.name': {
type: 'keyword',
},
'log.level': {
type: 'keyword',
},
},
},
},

View file

@ -6,9 +6,11 @@
*/
import expect from '@kbn/expect';
import { ClientRequestParamsOf } from '@kbn/server-route-repository-utils';
import type { StreamsRouteRepository } from '@kbn/streams-plugin/server';
import { ReadStreamDefinition, WiredReadStreamDefinition } from '@kbn/streams-schema';
import {
StreamUpsertRequest,
StreamGetResponse,
WiredReadStreamDefinition,
} from '@kbn/streams-schema';
import { DeploymentAgnosticFtrProviderContext } from '../../../ftr_provider_context';
import {
StreamsSupertestRepositoryClient,
@ -16,69 +18,70 @@ import {
} from './helpers/repository_client';
import { disableStreams, enableStreams, indexDocument } from './helpers/requests';
type StreamPutItem = ClientRequestParamsOf<
StreamsRouteRepository,
'PUT /api/streams/{id}'
>['params']['body'] & { name: string };
type StreamPutItem = Omit<StreamUpsertRequest, 'dashboards'> & { name: string };
const streams: StreamPutItem[] = [
{
name: 'logs',
ingest: {
processing: [],
wired: {
fields: {
'@timestamp': {
type: 'date',
},
message: {
type: 'match_only_text',
},
'host.name': {
type: 'keyword',
},
'log.level': {
type: 'keyword',
stream: {
ingest: {
processing: [],
wired: {
fields: {
'@timestamp': {
type: 'date',
},
message: {
type: 'match_only_text',
},
'host.name': {
type: 'keyword',
},
'log.level': {
type: 'keyword',
},
},
},
routing: [
{
destination: 'logs.test',
if: {
and: [
{
field: 'numberfield',
operator: 'gt',
value: 15,
},
],
},
},
{
destination: 'logs.test2',
if: {
and: [
{
field: 'field2',
operator: 'eq',
value: 'abc',
},
],
},
},
],
},
routing: [
{
name: 'logs.test',
condition: {
and: [
{
field: 'numberfield',
operator: 'gt',
value: 15,
},
],
},
},
{
name: 'logs.test2',
condition: {
and: [
{
field: 'field2',
operator: 'eq',
value: 'abc',
},
],
},
},
],
},
},
{
name: 'logs.test',
ingest: {
routing: [],
processing: [],
wired: {
fields: {
numberfield: {
type: 'long',
stream: {
ingest: {
routing: [],
processing: [],
wired: {
fields: {
numberfield: {
type: 'long',
},
},
},
},
@ -86,39 +89,42 @@ const streams: StreamPutItem[] = [
},
{
name: 'logs.test2',
ingest: {
processing: [
{
config: {
stream: {
ingest: {
processing: [
{
grok: {
field: 'message',
patterns: ['%{NUMBER:numberfield}'],
if: { always: {} },
},
},
],
wired: {
fields: {
field2: {
type: 'keyword',
},
},
},
],
wired: {
fields: {
field2: {
type: 'keyword',
},
},
routing: [],
},
routing: [],
},
},
{
name: 'logs.deeply.nested.streamname',
ingest: {
processing: [],
wired: {
fields: {
field2: {
type: 'keyword',
stream: {
ingest: {
processing: [],
wired: {
fields: {
field2: {
type: 'keyword',
},
},
},
routing: [],
},
routing: [],
},
},
];
@ -143,8 +149,8 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
});
it('checks whether deeply nested stream is created correctly', async () => {
function getChildNames(stream: ReadStreamDefinition['stream']) {
return stream.ingest.routing.map((r) => r.name);
function getChildNames(stream: StreamGetResponse['stream']) {
return stream.ingest.routing.map((r) => r.destination);
}
const logs = await apiClient.fetch('GET /api/streams/{id}', {
params: {
@ -216,7 +222,10 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
await apiClient
.fetch('PUT /api/streams/{id}', {
params: {
body: stream,
body: {
...stream,
dashboards: [],
} as StreamUpsertRequest,
path: { id: streamId },
},
})

View file

@ -147,7 +147,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
stream: {
name: 'logs.nginx',
},
condition: {
if: {
field: 'log.logger',
operator: 'eq' as const,
value: 'nginx',
@ -184,7 +184,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
stream: {
name: 'logs.nginx.access',
},
condition: { field: 'log.level', operator: 'eq' as const, value: 'info' },
if: { field: 'log.level', operator: 'eq' as const, value: 'info' },
};
const response = await forkStream(apiClient, 'logs.nginx', body);
expect(response).to.have.property('acknowledged', true);
@ -217,7 +217,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
stream: {
name: 'logs.nginx.error',
},
condition: { field: 'log', operator: 'eq' as const, value: 'error' },
if: { field: 'log', operator: 'eq' as const, value: 'error' },
};
const response = await forkStream(apiClient, 'logs.nginx', body);
expect(response).to.have.property('acknowledged', true);
@ -250,7 +250,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
stream: {
name: 'logs.number-test',
},
condition: { field: 'code', operator: 'gte' as const, value: '500' },
if: { field: 'code', operator: 'gte' as const, value: '500' },
};
const response = await forkStream(apiClient, 'logs', body);
expect(response).to.have.property('acknowledged', true);
@ -282,7 +282,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
stream: {
name: 'logs.string-test',
},
condition: {
if: {
or: [
{ field: 'message', operator: 'contains' as const, value: '500' },
{ field: 'message', operator: 'contains' as const, value: 400 },

View file

@ -8,7 +8,7 @@ import { Client } from '@elastic/elasticsearch';
import { JsonObject } from '@kbn/utility-types';
import expect from '@kbn/expect';
import { SearchTotalHits } from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { StreamConfigDefinition } from '@kbn/streams-schema';
import { StreamUpsertRequest } from '@kbn/streams-schema';
import { ClientRequestParamsOf } from '@kbn/server-route-repository-utils';
import { StreamsRouteRepository } from '@kbn/streams-plugin/server';
import { StreamsSupertestRepositoryClient } from './repository_client';
@ -59,7 +59,7 @@ export async function forkStream(
export async function putStream(
apiClient: StreamsSupertestRepositoryClient,
name: string,
body: StreamConfigDefinition,
body: StreamUpsertRequest,
expectStatusCode: number = 200
) {
return await apiClient

View file

@ -54,13 +54,12 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
};
const basicGrokProcessor = {
config: {
grok: {
field: 'message',
patterns: [
'%{TIMESTAMP_ISO8601:parsed_timestamp} %{LOGLEVEL:parsed_level} %{GREEDYDATA:parsed_message}',
],
},
grok: {
field: 'message',
patterns: [
'%{TIMESTAMP_ISO8601:parsed_timestamp} %{LOGLEVEL:parsed_level} %{GREEDYDATA:parsed_message}',
],
if: { always: {} },
},
};
@ -82,7 +81,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
stream: {
name: 'logs.test',
},
condition: {
if: {
field: 'host.name',
operator: 'eq' as const,
value: TEST_HOST,
@ -160,11 +159,10 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
{
processing: [
{
config: {
grok: {
field: 'message',
patterns: ['%{INVALID_PATTERN:field}'],
},
grok: {
field: 'message',
patterns: ['%{INVALID_PATTERN:field}'],
if: { always: {} },
},
},
],
@ -181,11 +179,10 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
{
processing: [
{
config: {
grok: {
field: 'message',
patterns: ['%{TIMESTAMP_ISO8601:parsed_timestamp} %{GREEDYDATA:message}'], // Overwrites existing message field
},
grok: {
field: 'message',
patterns: ['%{TIMESTAMP_ISO8601:parsed_timestamp} %{GREEDYDATA:message}'], // Overwrites existing message field
if: { always: {} },
},
},
],

View file

@ -6,7 +6,7 @@
*/
import expect from '@kbn/expect';
import { WiredStreamConfigDefinition, WiredStreamDefinition } from '@kbn/streams-schema';
import { IngestStreamUpsertRequest, WiredStreamDefinition } from '@kbn/streams-schema';
import { DeploymentAgnosticFtrProviderContext } from '../../../ftr_provider_context';
import { disableStreams, enableStreams, putStream } from './helpers/requests';
import {
@ -16,24 +16,22 @@ import {
const rootStreamDefinition: WiredStreamDefinition = {
name: 'logs',
stream: {
ingest: {
processing: [],
routing: [],
wired: {
fields: {
'@timestamp': {
type: 'date',
},
message: {
type: 'match_only_text',
},
'host.name': {
type: 'keyword',
},
'log.level': {
type: 'keyword',
},
ingest: {
processing: [],
routing: [],
wired: {
fields: {
'@timestamp': {
type: 'date',
},
message: {
type: 'match_only_text',
},
'host.name': {
type: 'keyword',
},
'log.level': {
type: 'keyword',
},
},
},
@ -55,21 +53,23 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
});
it('Should not allow processing changes', async () => {
const body: WiredStreamConfigDefinition = {
ingest: {
...rootStreamDefinition.stream.ingest,
processing: [
{
config: {
const body: IngestStreamUpsertRequest = {
dashboards: [],
stream: {
ingest: {
...rootStreamDefinition.ingest,
processing: [
{
grok: {
field: 'message',
patterns: [
'%{TIMESTAMP_ISO8601:inner_timestamp} %{LOGLEVEL:log.level} %{GREEDYDATA:message2}',
],
if: { always: {} },
},
},
},
],
],
},
},
};
const response = await putStream(apiClient, 'logs', body, 400);
@ -80,14 +80,17 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
});
it('Should not allow fields changes', async () => {
const body: WiredStreamConfigDefinition = {
ingest: {
...rootStreamDefinition.stream.ingest,
wired: {
fields: {
...rootStreamDefinition.stream.ingest.wired.fields,
'log.level': {
type: 'boolean',
const body: IngestStreamUpsertRequest = {
dashboards: [],
stream: {
ingest: {
...rootStreamDefinition.ingest,
wired: {
fields: {
...rootStreamDefinition.ingest.wired.fields,
'log.level': {
type: 'boolean',
},
},
},
},
@ -98,19 +101,22 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
});
it('Should allow routing changes', async () => {
const body: WiredStreamConfigDefinition = {
ingest: {
...rootStreamDefinition.stream.ingest,
routing: [
{
name: 'logs.gcpcloud',
condition: {
field: 'cloud.provider',
operator: 'eq',
value: 'gcp',
const body: IngestStreamUpsertRequest = {
dashboards: [],
stream: {
ingest: {
...rootStreamDefinition.ingest,
routing: [
{
destination: 'logs.gcpcloud',
if: {
field: 'cloud.provider',
operator: 'eq',
value: 'gcp',
},
},
},
],
],
},
},
};
const response = await putStream(apiClient, 'logs', body);

View file

@ -93,7 +93,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
stream: {
name: 'logs.nginx',
},
condition: {
if: {
field: 'log.logger',
operator: 'eq' as const,
value: 'nginx',