[Streams 🌊] Introduce GroupStreams (#208126)

## 🍒  Summary
 
This PR adds support to `/api/streams` endpoints for the
`GroupStreamDefinition` type. Group streams are simply a list of member
streams along with dashboards. An example of the definition looks like:

```JSON
{
  "name": "nginx-logs",
  "stream": {
    "grouped": {
      "description": "A collection of streams for Nginx",
      "members": [
        "logs",
        "logs.nginx"
      ]
    }
  },
  "dashboards": []
}
```
The following APIs support `GroupStreamDefinition`:

- `GET /api/streams`
- `GET /api/streams/{id}`
- `PUT /api/streams/{id}`
- `DELETE /api/streams/{id}`
- `GET /api/streams/{id}/_details`

This PR only includes the support to the APIs, I will submit a follow PR
for the UI.

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
Co-authored-by: Joe Reuter <johannes.reuter@elastic.co>
This commit is contained in:
Chris Cowan 2025-02-04 03:12:35 -07:00 committed by GitHub
parent bdf496b2e1
commit 2092c3d4ca
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
31 changed files with 764 additions and 273 deletions

View file

@ -1593,10 +1593,10 @@
},
{
"parentPluginId": "@kbn/streams-schema",
"id": "def-common.isUnWiredStreamGetResponse",
"id": "def-common.isUnwiredStreamGetResponse",
"type": "Function",
"tags": [],
"label": "isUnWiredStreamGetResponse",
"label": "isUnwiredStreamGetResponse",
"description": [],
"signature": [
"<TValue extends ",
@ -1624,7 +1624,7 @@
"children": [
{
"parentPluginId": "@kbn/streams-schema",
"id": "def-common.isUnWiredStreamGetResponse.$1",
"id": "def-common.isUnwiredStreamGetResponse.$1",
"type": "Uncategorized",
"tags": [],
"label": "value",
@ -5016,4 +5016,4 @@
}
]
}
}
}

View file

@ -7,13 +7,55 @@
import { z } from '@kbn/zod';
import {
ingestStreamGetResponseSchema,
ingestStreamUpsertRequestSchema,
unwiredStreamGetResponseSchema,
wiredStreamGetResponseSchema,
type IngestStreamGetResponse,
type IngestStreamUpsertRequest,
} from './ingest';
import {
GroupStreamGetResponse,
groupStreamGetResponseSchema,
GroupStreamUpsertRequest,
groupStreamUpsertRequestSchema,
} from './group';
import { createAsSchemaOrThrow, createIsNarrowSchema } from '../helpers';
export const streamUpsertRequestSchema: z.Schema<StreamUpsertRequest> =
ingestStreamUpsertRequestSchema;
export const streamGetResponseSchema: z.Schema<StreamGetResponse> = z.union([
ingestStreamGetResponseSchema,
groupStreamGetResponseSchema,
]);
export type StreamGetResponse = IngestStreamGetResponse;
export type StreamUpsertRequest = IngestStreamUpsertRequest;
export const streamUpsertRequestSchema: z.Schema<StreamUpsertRequest> = z.union([
ingestStreamUpsertRequestSchema,
groupStreamUpsertRequestSchema,
]);
export const isWiredStreamGetResponse = createIsNarrowSchema(
streamGetResponseSchema,
wiredStreamGetResponseSchema
);
export const isUnwiredStreamGetResponse = createIsNarrowSchema(
streamGetResponseSchema,
unwiredStreamGetResponseSchema
);
export const asWiredStreamGetResponse = createAsSchemaOrThrow(
streamGetResponseSchema,
wiredStreamGetResponseSchema
);
export const asUnwiredStreamGetResponse = createAsSchemaOrThrow(
streamGetResponseSchema,
unwiredStreamGetResponseSchema
);
export const asIngestStreamGetResponse = createAsSchemaOrThrow(
streamGetResponseSchema,
ingestStreamGetResponseSchema
);
export type StreamGetResponse = IngestStreamGetResponse | GroupStreamGetResponse;
export type StreamUpsertRequest = IngestStreamUpsertRequest | GroupStreamUpsertRequest;

View file

@ -8,9 +8,13 @@
import { z } from '@kbn/zod';
import { createIsNarrowSchema } from '../helpers';
import { IngestStreamDefinition, ingestStreamDefinitionSchema } from './ingest';
import { GroupStreamDefinition, groupStreamDefinitionSchema } from './group';
export type StreamDefinition = IngestStreamDefinition;
export type StreamDefinition = IngestStreamDefinition | GroupStreamDefinition;
export const streamDefinitionSchema: z.Schema<StreamDefinition> = ingestStreamDefinitionSchema;
export const streamDefinitionSchema: z.Schema<StreamDefinition> = z.union([
ingestStreamDefinitionSchema,
groupStreamDefinitionSchema,
]);
export const isStreamDefinition = createIsNarrowSchema(z.unknown(), streamDefinitionSchema);

View file

@ -0,0 +1,49 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import {
StreamGetResponseBase,
streamGetResponseSchemaBase,
StreamUpsertRequestBase,
streamUpsertRequestSchemaBase,
} from '../base/api';
import { GroupStreamDefinitionBase, groupStreamDefinitionBaseSchema } from './base';
/**
* Group get response
*/
interface GroupStreamGetResponse extends StreamGetResponseBase {
stream: GroupStreamDefinitionBase;
}
const groupStreamGetResponseSchema: z.Schema<GroupStreamGetResponse> = z.intersection(
streamGetResponseSchemaBase,
z.object({
stream: groupStreamDefinitionBaseSchema,
})
);
/**
* Group upsert request
*/
interface GroupStreamUpsertRequest extends StreamUpsertRequestBase {
stream: GroupStreamDefinitionBase;
}
const groupStreamUpsertRequestSchema: z.Schema<GroupStreamUpsertRequest> = z.intersection(
streamUpsertRequestSchemaBase,
z.object({
stream: groupStreamDefinitionBaseSchema,
})
);
export {
type GroupStreamGetResponse,
type GroupStreamUpsertRequest,
groupStreamGetResponseSchema,
groupStreamUpsertRequestSchema,
};

View file

@ -0,0 +1,44 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { NonEmptyString } from '@kbn/zod-helpers';
import { StreamDefinitionBase } from '../base';
interface GroupBase {
description?: string;
members: string[];
}
const groupBaseSchema: z.Schema<GroupBase> = z.object({
description: z.optional(z.string()),
members: z.array(NonEmptyString),
});
interface GroupStreamDefinitionBase {
group: GroupBase;
}
const groupStreamDefinitionBaseSchema: z.Schema<GroupStreamDefinitionBase> = z.object({
group: groupBaseSchema,
});
type GroupStreamDefinition = StreamDefinitionBase & GroupStreamDefinitionBase;
const groupStreamDefinitionSchema: z.Schema<GroupStreamDefinition> = z.intersection(
z.object({ name: NonEmptyString }),
groupStreamDefinitionBaseSchema
);
export {
type GroupBase,
type GroupStreamDefinitionBase,
type GroupStreamDefinition,
groupBaseSchema,
groupStreamDefinitionBaseSchema,
groupStreamDefinitionSchema,
};

View file

@ -0,0 +1,9 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export * from './base';
export * from './api';

View file

@ -4,9 +4,10 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { createIsNarrowSchema } from '../helpers';
import { z } from '@kbn/zod';
import { createAsSchemaOrThrow, createIsNarrowSchema } from '../helpers';
import { streamDefinitionSchema } from './core';
import { groupStreamDefinitionBaseSchema, groupStreamDefinitionSchema } from './group';
import {
ingestStreamDefinitionSchema,
unwiredStreamDefinitionSchema,
@ -23,11 +24,31 @@ export const isWiredStreamDefinition = createIsNarrowSchema(
wiredStreamDefinitionSchema
);
export const asIngestStreamDefinition = createAsSchemaOrThrow(
streamDefinitionSchema,
ingestStreamDefinitionSchema
);
export const asWiredStreamDefinition = createAsSchemaOrThrow(
streamDefinitionSchema,
wiredStreamDefinitionSchema
);
export const isUnwiredStreamDefinition = createIsNarrowSchema(
streamDefinitionSchema,
unwiredStreamDefinitionSchema
);
export const isGroupStreamDefinition = createIsNarrowSchema(
streamDefinitionSchema,
groupStreamDefinitionSchema
);
export const isGroupStreamDefinitionBase = createIsNarrowSchema(
z.unknown(),
groupStreamDefinitionBaseSchema
);
export const isRootStreamDefinition = createIsNarrowSchema(
streamDefinitionSchema,
wiredStreamDefinitionSchema.refine((stream) => {

View file

@ -11,3 +11,4 @@ export * from './legacy';
export * from './api';
export * from './core';
export * from './helpers';
export * from './group';

View file

@ -26,7 +26,6 @@ import {
wiredStreamDefinitionSchemaBase,
} from './base';
import { ElasticsearchAsset, elasticsearchAssetSchema } from './common';
import { createIsNarrowSchema, createAsSchemaOrThrow } from '../../helpers';
import {
UnwiredIngestStreamEffectiveLifecycle,
WiredIngestStreamEffectiveLifecycle,
@ -146,33 +145,12 @@ const ingestStreamGetResponseSchema: z.Schema<IngestStreamGetResponse> = z.union
unwiredStreamGetResponseSchema,
]);
const isWiredStreamGetResponse = createIsNarrowSchema(
ingestStreamGetResponseSchema,
wiredStreamGetResponseSchema
);
const isUnWiredStreamGetResponse = createIsNarrowSchema(
ingestStreamGetResponseSchema,
unwiredStreamGetResponseSchema
);
const asWiredStreamGetResponse = createAsSchemaOrThrow(
ingestStreamGetResponseSchema,
wiredStreamGetResponseSchema
);
const asUnwiredStreamGetResponse = createAsSchemaOrThrow(
ingestStreamGetResponseSchema,
unwiredStreamGetResponseSchema
);
export {
ingestStreamUpsertRequestSchema,
ingestUpsertRequestSchema,
isWiredStreamGetResponse,
isUnWiredStreamGetResponse,
asWiredStreamGetResponse,
asUnwiredStreamGetResponse,
ingestStreamGetResponseSchema,
wiredStreamGetResponseSchema,
unwiredStreamGetResponseSchema,
type IngestGetResponse,
type IngestStreamGetResponse,
type IngestStreamUpsertRequest,

View file

@ -52,7 +52,7 @@ export type StorageClientBulkOperation<TDocument extends { _id?: string }> =
}
| { delete: { _id: string } };
export type StorageClientBulkRequest<TDocument extends Record<string, any>> = Omit<
export type StorageClientBulkRequest<TDocument extends { _id?: string }> = Omit<
BulkRequest,
'operations' | 'index'
> & {
@ -80,21 +80,20 @@ export type StorageClientIndexRequest<TDocument = unknown> = Omit<
export type StorageClientIndexResponse = IndexResponse;
export type StorageClientGetRequest = Omit<GetRequest & SearchRequest, 'index'>;
export type StorageClientGetResponse<TDocument extends Record<string, any>> =
GetResponse<TDocument>;
export type StorageClientGetResponse<TDocument extends { _id?: string }> = GetResponse<TDocument>;
export type StorageClientSearch<TStorageSettings extends StorageSettings = never> = <
export type StorageClientSearch<TDocumentType = never> = <
TSearchRequest extends StorageClientSearchRequest
>(
request: TSearchRequest
) => Promise<StorageClientSearchResponse<StorageDocumentOf<TStorageSettings>, TSearchRequest>>;
) => Promise<StorageClientSearchResponse<TDocumentType, TSearchRequest>>;
export type StorageClientBulk<TStorageSettings extends StorageSettings = never> = (
request: StorageClientBulkRequest<StorageDocumentOf<TStorageSettings>>
export type StorageClientBulk<TDocumentType extends { _id?: string } = never> = (
request: StorageClientBulkRequest<TDocumentType>
) => Promise<StorageClientBulkResponse>;
export type StorageClientIndex<TStorageSettings extends StorageSettings = never> = (
request: StorageClientIndexRequest<StorageDocumentOf<TStorageSettings>>
export type StorageClientIndex<TDocumentType = never> = (
request: StorageClientIndexRequest<TDocumentType>
) => Promise<StorageClientIndexResponse>;
export type StorageClientDelete = (
@ -103,22 +102,50 @@ export type StorageClientDelete = (
export type StorageClientClean = () => Promise<StorageClientCleanResponse>;
export type StorageClientGet<TStorageSettings extends StorageSettings = never> = (
export type StorageClientGet<TDocumentType extends { _id?: string } = never> = (
request: StorageClientGetRequest
) => Promise<StorageClientGetResponse<StorageDocumentOf<TStorageSettings>>>;
) => Promise<StorageClientGetResponse<TDocumentType>>;
export type StorageClientExistsIndex = () => Promise<boolean>;
export interface IStorageClient<TStorageSettings extends StorageSettings = never> {
search: StorageClientSearch<TStorageSettings>;
bulk: StorageClientBulk<TStorageSettings>;
index: StorageClientIndex<TStorageSettings>;
export interface InternalIStorageClient<TDocumentType extends { _id?: string } = never> {
search: StorageClientSearch<TDocumentType>;
bulk: StorageClientBulk<TDocumentType>;
index: StorageClientIndex<TDocumentType>;
delete: StorageClientDelete;
clean: StorageClientClean;
get: StorageClientGet<TStorageSettings>;
get: StorageClientGet<TDocumentType>;
existsIndex: StorageClientExistsIndex;
}
type UnionKeys<T> = T extends T ? keyof T : never;
type Exact<T, U> = T extends U
? Exclude<UnionKeys<T>, UnionKeys<U>> extends never
? true
: false
: false;
// The storage settings need to support the application payload type, but it's OK if the
// storage document can hold more fields than the application document.
// To keep the type safety of the application type in the consuming code, both the storage
// settings and the application type are passed to the IStorageClient type.
// The IStorageClient type then checks if the application type is a subset of the storage
// document type. If this is not the case, the IStorageClient type is set to never, which
// will cause a type error in the consuming code.
export type IStorageClient<TSchema extends IndexStorageSettings, TApplicationType> = Exact<
ApplicationDocument<TApplicationType>,
Partial<StorageDocumentOf<TSchema>>
> extends true
? InternalIStorageClient<ApplicationDocument<TApplicationType>>
: never;
export type SimpleIStorageClient<TStorageSettings extends IndexStorageSettings> = IStorageClient<
TStorageSettings,
Omit<StorageDocumentOf<TStorageSettings>, '_id'>
>;
export type ApplicationDocument<TApplicationType> = TApplicationType & { _id: string };
export type StorageDocumentOf<TStorageSettings extends StorageSettings> = StorageFieldTypeOf<{
type: 'object';
properties: TStorageSettings['schema']['properties'];

View file

@ -27,13 +27,14 @@ import {
StorageClientIndex,
StorageClientIndexResponse,
StorageClientSearch,
IStorageClient,
StorageClientGet,
StorageClientExistsIndex,
StorageDocumentOf,
StorageClientSearchResponse,
StorageClientClean,
StorageClientCleanResponse,
ApplicationDocument,
InternalIStorageClient,
} from '..';
import { getSchemaVersion } from '../get_schema_version';
import { StorageMappingProperty } from '../types';
@ -94,7 +95,7 @@ function wrapEsCall<T>(p: Promise<T>): Promise<T> {
* - Index Lifecycle Management
* - Schema upgrades w/ fallbacks
*/
export class StorageIndexAdapter<TStorageSettings extends IndexStorageSettings> {
export class StorageIndexAdapter<TStorageSettings extends IndexStorageSettings, TApplicationType> {
private readonly logger: Logger;
constructor(
private readonly esClient: ElasticsearchClient,
@ -316,7 +317,7 @@ export class StorageIndexAdapter<TStorageSettings extends IndexStorageSettings>
return [];
}
private search: StorageClientSearch<TStorageSettings> = async (request) => {
private search: StorageClientSearch<ApplicationDocument<TApplicationType>> = async (request) => {
return (await wrapEsCall(
this.esClient
.search({
@ -345,10 +346,10 @@ export class StorageIndexAdapter<TStorageSettings extends IndexStorageSettings>
}
throw error;
})
)) as unknown as ReturnType<StorageClientSearch<TStorageSettings>>;
)) as unknown as ReturnType<StorageClientSearch<ApplicationDocument<TApplicationType>>>;
};
private index: StorageClientIndex<TStorageSettings> = async ({
private index: StorageClientIndex<ApplicationDocument<TApplicationType>> = async ({
id,
refresh = 'wait_for',
...request
@ -387,7 +388,7 @@ export class StorageIndexAdapter<TStorageSettings extends IndexStorageSettings>
});
};
private bulk: StorageClientBulk<TStorageSettings> = ({
private bulk: StorageClientBulk<ApplicationDocument<TApplicationType>> = ({
operations,
refresh = 'wait_for',
...request
@ -402,7 +403,7 @@ export class StorageIndexAdapter<TStorageSettings extends IndexStorageSettings>
_id: operation.index._id,
},
},
operation.index.document,
operation.index.document as {},
];
}
@ -518,7 +519,10 @@ export class StorageIndexAdapter<TStorageSettings extends IndexStorageSettings>
return { acknowledged: true, result: 'not_found' };
};
private get: StorageClientGet = async ({ id, ...request }) => {
private get: StorageClientGet<ApplicationDocument<TApplicationType>> = async ({
id,
...request
}) => {
const response = await this.search({
track_total_hits: false,
size: 1,
@ -558,7 +562,7 @@ export class StorageIndexAdapter<TStorageSettings extends IndexStorageSettings>
_id: hit._id!,
_index: hit._index,
found: true,
_source: hit._source as StorageDocumentOf<TStorageSettings>,
_source: hit._source as ApplicationDocument<TApplicationType>,
_ignored: hit._ignored,
_primary_term: hit._primary_term,
_routing: hit._routing,
@ -574,7 +578,7 @@ export class StorageIndexAdapter<TStorageSettings extends IndexStorageSettings>
});
};
getClient(): IStorageClient<TStorageSettings> {
getClient(): InternalIStorageClient<ApplicationDocument<TApplicationType>> {
return {
bulk: this.bulk,
delete: this.delete,
@ -586,3 +590,6 @@ export class StorageIndexAdapter<TStorageSettings extends IndexStorageSettings>
};
}
}
export type SimpleStorageIndexAdapter<TStorageSettings extends IndexStorageSettings> =
StorageIndexAdapter<TStorageSettings, Omit<StorageDocumentOf<TStorageSettings>, '_id'>>;

View file

@ -11,7 +11,7 @@ import {
type TestKibanaUtils,
} from '@kbn/core-test-helpers-kbn-server';
import {
IStorageClient,
SimpleIStorageClient,
StorageClientBulkResponse,
StorageClientIndexResponse,
StorageIndexAdapter,
@ -22,6 +22,7 @@ import { httpServerMock } from '@kbn/core/server/mocks';
import * as getSchemaVersionModule from '../../get_schema_version';
import { isResponseError } from '@kbn/es-errors';
import { IndicesGetResponse } from '@elastic/elasticsearch/lib/api/types';
import { SimpleStorageIndexAdapter } from '..';
const TEST_INDEX_NAME = 'test_index';
@ -56,8 +57,8 @@ describe('StorageIndexAdapter', () => {
},
} satisfies StorageSettings;
let adapter: StorageIndexAdapter<typeof storageSettings>;
let client: IStorageClient<typeof storageSettings>;
let adapter: SimpleStorageIndexAdapter<typeof storageSettings>;
let client: SimpleIStorageClient<typeof storageSettings>;
describe('with a clean Elasticsearch instance', () => {
beforeAll(async () => {
@ -406,7 +407,7 @@ describe('StorageIndexAdapter', () => {
function createStorageIndexAdapter<TStorageSettings extends StorageSettings>(
settings: TStorageSettings
): StorageIndexAdapter<TStorageSettings> {
): SimpleStorageIndexAdapter<TStorageSettings> {
return new StorageIndexAdapter(esClient, loggerMock, settings);
}

View file

@ -91,7 +91,7 @@ type PrimitiveOf<TProperty extends StorageMappingProperty> = {
float: number;
object: TProperty extends { properties: Record<string, StorageMappingProperty> }
? {
[key in keyof TProperty['properties']]: StorageFieldTypeOf<TProperty['properties'][key]>;
[key in keyof TProperty['properties']]?: StorageFieldTypeOf<TProperty['properties'][key]>;
}
: object;
}[TProperty['type']];

View file

@ -8,7 +8,7 @@ import { SanitizedRule } from '@kbn/alerting-plugin/common';
import { RulesClient } from '@kbn/alerting-plugin/server';
import { SavedObject, SavedObjectsClientContract } from '@kbn/core/server';
import { termQuery } from '@kbn/observability-utils-server/es/queries/term_query';
import { IStorageClient, StorageDocumentOf } from '@kbn/observability-utils-server/es/storage';
import { IStorageClient } from '@kbn/observability-utils-server/es/storage';
import { keyBy } from 'lodash';
import objectHash from 'object-hash';
import pLimit from 'p-limit';
@ -64,7 +64,7 @@ function getAssetDocument({
entityId,
entityType,
assetType,
}: AssetLink & { entityId: string; entityType: string }): StorageDocumentOf<AssetStorageSettings> {
}: AssetLink & { entityId: string; entityType: string }) {
const doc = {
'asset.id': assetId,
'asset.type': assetType,
@ -87,10 +87,17 @@ interface AssetBulkDeleteOperation {
export type AssetBulkOperation = AssetBulkIndexOperation | AssetBulkDeleteOperation;
export interface StoredAssetLink {
'asset.id': string;
'asset.type': AssetType;
'entity.id': string;
'entity.type': string;
}
export class AssetClient {
constructor(
private readonly clients: {
storageClient: IStorageClient<AssetStorageSettings>;
storageClient: IStorageClient<AssetStorageSettings, StoredAssetLink>;
soClient: SavedObjectsClientContract;
rulesClient: RulesClient;
}

View file

@ -8,8 +8,8 @@
import { CoreSetup, KibanaRequest, Logger } from '@kbn/core/server';
import { StorageIndexAdapter } from '@kbn/observability-utils-server/es/storage';
import { StreamsPluginStartDependencies } from '../../../types';
import { AssetClient } from './asset_client';
import { assetStorageSettings } from './storage_settings';
import { AssetClient, StoredAssetLink } from './asset_client';
import { AssetStorageSettings, assetStorageSettings } from './storage_settings';
export class AssetService {
constructor(
@ -20,7 +20,7 @@ export class AssetService {
async getClientWithRequest({ request }: { request: KibanaRequest }): Promise<AssetClient> {
const [coreStart, pluginsStart] = await this.coreSetup.getStartServices();
const adapter = new StorageIndexAdapter(
const adapter = new StorageIndexAdapter<AssetStorageSettings, StoredAssetLink>(
coreStart.elasticsearch.client.asInternalUser,
this.logger.get('assets'),
assetStorageSettings

View file

@ -15,15 +15,19 @@ import type { IScopedClusterClient, Logger } from '@kbn/core/server';
import { isResponseError } from '@kbn/es-errors';
import {
Condition,
GroupStreamDefinition,
IngestStreamLifecycle,
StreamDefinition,
StreamUpsertRequest,
UnwiredStreamDefinition,
WiredStreamDefinition,
asIngestStreamDefinition,
assertsSchema,
getAncestors,
getParentId,
isChildOf,
isGroupStreamDefinition,
isIngestStreamDefinition,
isDslLifecycle,
isIlmLifecycle,
isInheritLifecycle,
@ -34,6 +38,7 @@ import {
} from '@kbn/streams-schema';
import { cloneDeep, keyBy, omit, orderBy } from 'lodash';
import { AssetClient } from './assets/asset_client';
import { ForbiddenMemberTypeError } from './errors/forbidden_member_type_error';
import {
syncUnwiredStreamDefinitionObjects,
syncWiredStreamDefinitionObjects,
@ -321,6 +326,10 @@ export class StreamsClient {
validateStreamTypeChanges(existingDefinition, definition);
}
if (isGroupStreamDefinition(definition)) {
await this.assertValidGroupMembers({ definition });
}
if (isRootStreamDefinition(definition)) {
// only allow selective updates to a root stream
validateRootStreamChanges(
@ -444,6 +453,29 @@ export class StreamsClient {
return { parentDefinition };
}
/**
* Validates the members of the group streams to ensure they are NOT
* GroupStreamDefinitions
*/
async assertValidGroupMembers({ definition }: { definition: GroupStreamDefinition }) {
const { members } = definition.group;
if (members.includes(definition.name)) {
throw new ForbiddenMemberTypeError('Group streams can not include themselves as a member');
}
await Promise.all(
members.map(async (name) => {
const memberStream = await this.getStream(name);
if (isGroupStreamDefinition(memberStream)) {
throw new ForbiddenMemberTypeError(
`Group streams can not be a member of a group, please remove [${name}]`
);
}
})
);
}
/**
* Forks a stream into a child with a specific condition.
* It mostly defers to `upsertStream` for its validations,
@ -465,7 +497,7 @@ export class StreamsClient {
name: string;
if: Condition;
}): Promise<ForkStreamResponse> {
const parentDefinition = await this.getStream(parent);
const parentDefinition = asIngestStreamDefinition(await this.getStream(parent));
const childDefinition: WiredStreamDefinition = {
name,
@ -526,30 +558,44 @@ export class StreamsClient {
* Returns a stream definition for the given name:
* - if a wired stream definition exists
* - if an ingest stream definition exists
* - if a data stream exists (creates an ingest
* definition on the fly)
* - if a data stream exists (creates an ingest definition on the fly)
* - if a group stream definition exists
*
* Throws when:
* - no definition is found
* - the user does not have access to the stream
*/
async getStream(name: string): Promise<StreamDefinition> {
const definition = await this.getStoredStreamDefinition(name)
.catch(async (error) => {
try {
const response = await this.dependencies.storageClient.get({ id: name });
const streamDefinition = response._source;
assertsSchema(streamDefinitionSchema, streamDefinition);
if (isIngestStreamDefinition(streamDefinition)) {
const privileges = await checkAccess({
id: name,
scopedClusterClient: this.dependencies.scopedClusterClient,
});
if (!privileges.read) {
throw new DefinitionNotFoundError(`Stream definition for ${name} not found`);
}
}
return streamDefinition;
} catch (error) {
try {
if (isElasticsearch404(error)) {
const dataStream = await this.getDataStream(name);
return this.getDataStreamAsIngestStream(dataStream);
}
throw error;
})
.catch(async (error) => {
if (isElasticsearch404(error)) {
} catch (e) {
if (isElasticsearch404(e)) {
throw new DefinitionNotFoundError(`Cannot find stream ${name}`);
}
throw error;
});
return definition;
throw e;
}
}
}
private async getStoredStreamDefinition(name: string): Promise<StreamDefinition> {
@ -678,11 +724,14 @@ export class StreamsClient {
});
const privileges = await checkAccessBulk({
ids: streams.map((stream) => stream.name),
ids: streams
.filter((stream) => !isGroupStreamDefinition(stream))
.map((stream) => stream.name),
scopedClusterClient,
});
return streams.filter((stream) => {
if (isGroupStreamDefinition(stream)) return true;
return privileges[stream.name]?.read === true;
});
}
@ -695,13 +744,13 @@ export class StreamsClient {
private async deleteStreamFromDefinition(definition: StreamDefinition): Promise<void> {
const { assetClient, logger, scopedClusterClient } = this.dependencies;
if (!isWiredStreamDefinition(definition)) {
if (isUnwiredStreamDefinition(definition)) {
await deleteUnmanagedStreamObjects({
scopedClusterClient,
id: definition.name,
logger,
});
} else {
} else if (isWiredStreamDefinition(definition)) {
const parentId = getParentId(definition.name);
// need to update parent first to cut off documents streaming down
@ -763,15 +812,20 @@ export class StreamsClient {
* Also verifies whether the user has access to the stream.
*/
async deleteStream(name: string): Promise<DeleteStreamResponse> {
const [definition, access] = await Promise.all([
this.getStream(name).catch((error) => {
if (isDefinitionNotFoundError(error)) {
return undefined;
}
throw error;
}),
checkAccess({ id: name, scopedClusterClient: this.dependencies.scopedClusterClient }),
]);
const definition = await this.getStream(name).catch((error) => {
if (isDefinitionNotFoundError(error)) {
return undefined;
}
throw error;
});
const access =
definition && isGroupStreamDefinition(definition)
? { write: true, read: true }
: await checkAccess({
id: name,
scopedClusterClient: this.dependencies.scopedClusterClient,
});
if (!access.write) {
throw new SecurityError(`Cannot delete stream, insufficient privileges`);
@ -781,9 +835,11 @@ export class StreamsClient {
return { acknowledged: true, result: 'noop' };
}
const parentId = getParentId(name);
if (isWiredStreamDefinition(definition) && !parentId) {
throw new MalformedStreamIdError('Cannot delete root stream');
if (isWiredStreamDefinition(definition)) {
const parentId = getParentId(name);
if (!parentId) {
throw new MalformedStreamIdError('Cannot delete root stream');
}
}
await this.deleteStreamFromDefinition(definition);
@ -794,13 +850,7 @@ export class StreamsClient {
private async updateStoredStream(definition: StreamDefinition) {
return this.dependencies.storageClient.index({
id: definition.name,
document: omit(
definition,
'elasticsearch_assets',
'dashboards',
'inherited_fields',
'lifecycle'
),
document: definition,
});
}

View file

@ -0,0 +1,14 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { StatusError } from './status_error';
export class ForbiddenMemberTypeError extends StatusError {
constructor(message: string) {
super(message, 400);
this.name = 'ForbiddenMemberType';
}
}

View file

@ -9,6 +9,7 @@ import {
StreamDefinition,
WiredStreamDefinition,
isIlmLifecycle,
isIngestStreamDefinition,
isInheritLifecycle,
isUnwiredStreamDefinition,
isWiredStreamDefinition,
@ -97,6 +98,9 @@ export function validateStreamChildrenChanges(
}
export function validateStreamLifecycle(definition: StreamDefinition, isServerless: boolean) {
if (!isIngestStreamDefinition(definition)) {
return;
}
const lifecycle = definition.ingest.lifecycle;
if (isServerless && isIlmLifecycle(lifecycle)) {

View file

@ -6,6 +6,7 @@
*/
import {
IngestStreamDefinition,
StreamDefinition,
getParentId,
isRoot,
@ -49,7 +50,7 @@ export function generateIngestPipeline(
value: definition.name,
},
},
...formatToIngestProcessors(definition.ingest.processing),
...((isWiredStream && formatToIngestProcessors(definition.ingest.processing)) || []),
{
pipeline: {
name: `${id}@stream.reroutes`,
@ -65,7 +66,7 @@ export function generateIngestPipeline(
};
}
export function generateClassicIngestPipelineBody(definition: StreamDefinition) {
export function generateClassicIngestPipelineBody(definition: IngestStreamDefinition) {
return {
processors: formatToIngestProcessors(definition.ingest.processing),
_meta: {

View file

@ -5,13 +5,13 @@
* 2.0.
*/
import { StreamDefinition } from '@kbn/streams-schema';
import { IngestStreamDefinition } from '@kbn/streams-schema';
import { ASSET_VERSION } from '../../../../common/constants';
import { conditionToPainless } from '../helpers/condition_to_painless';
import { getReroutePipelineName } from './name';
interface GenerateReroutePipelineParams {
definition: StreamDefinition;
definition: IngestStreamDefinition;
}
export function generateReroutePipeline({ definition }: GenerateReroutePipelineParams) {

View file

@ -12,6 +12,7 @@ import {
StorageSettings,
types,
} from '@kbn/observability-utils-server/es/storage';
import { StreamDefinition } from '@kbn/streams-schema';
import type { StreamsPluginStartDependencies } from '../../types';
import { StreamsClient } from './client';
import { AssetClient } from './assets/asset_client';
@ -22,12 +23,13 @@ export const streamsStorageSettings = {
properties: {
name: types.keyword(),
ingest: types.object({ enabled: false }),
group: types.object({ enabled: false }),
},
},
} satisfies StorageSettings;
export type StreamsStorageSettings = typeof streamsStorageSettings;
export type StreamsStorageClient = IStorageClient<StreamsStorageSettings>;
export type StreamsStorageClient = IStorageClient<StreamsStorageSettings, StreamDefinition>;
export class StreamsService {
constructor(
@ -50,7 +52,7 @@ export class StreamsService {
const isServerless = coreStart.elasticsearch.getCapabilities().serverless;
const storageAdapter = new StorageIndexAdapter(
const storageAdapter = new StorageIndexAdapter<StreamsStorageSettings, StreamDefinition>(
scopedClusterClient.asInternalUser,
logger,
streamsStorageSettings

View file

@ -86,7 +86,12 @@ export class StreamsPlugin
const scopedClusterClient = coreStart.elasticsearch.client.asScoped(request);
const soClient = coreStart.savedObjects.getScopedClient(request);
return { scopedClusterClient, soClient, assetClient, streamsClient };
return {
scopedClusterClient,
soClient,
assetClient,
streamsClient,
};
},
},
core,

View file

@ -6,9 +6,10 @@
*/
import {
IngestStreamGetResponse,
InheritedFieldDefinition,
StreamGetResponse,
WiredStreamGetResponse,
isGroupStreamDefinition,
isUnwiredStreamDefinition,
} from '@kbn/streams-schema';
import { IScopedClusterClient } from '@kbn/core/server';
@ -30,14 +31,25 @@ export async function readStream({
assetClient: AssetClient;
streamsClient: StreamsClient;
scopedClusterClient: IScopedClusterClient;
}): Promise<IngestStreamGetResponse> {
const [streamDefinition, dashboards, ancestors, dataStream] = await Promise.all([
}): Promise<StreamGetResponse> {
const [streamDefinition, dashboards] = await Promise.all([
streamsClient.getStream(name),
assetClient.getAssetIds({
entityId: name,
entityType: 'stream',
assetType: 'dashboard',
}),
]);
if (isGroupStreamDefinition(streamDefinition)) {
return {
stream: streamDefinition,
dashboards,
};
}
// These queries are only relavant for IngestStreams
const [ancestors, dataStream] = await Promise.all([
streamsClient.getAncestors(name),
streamsClient.getDataStream(name).catch((e) => {
if (e.statusCode === 404) {

View file

@ -7,6 +7,7 @@
import { SearchTotalHits } from '@elastic/elasticsearch/lib/api/types';
import {
isGroupStreamDefinition,
StreamDefinition,
StreamGetResponse,
streamUpsertRequestSchema,
@ -76,9 +77,12 @@ export const streamDetailRoute = createServerRoute({
const { scopedClusterClient, streamsClient } = await getScopedClients({ request });
const streamEntity = await streamsClient.getStream(params.path.id);
const indexPattern = isGroupStreamDefinition(streamEntity)
? streamEntity.group.members.join(',')
: streamEntity.name;
// check doc count
const docCountResponse = await scopedClusterClient.asCurrentUser.search({
index: streamEntity.name,
index: indexPattern,
body: {
track_total_hits: true,
query: {
@ -144,7 +148,6 @@ export const editStreamRoute = createServerRoute({
}),
handler: async ({ params, request, getScopedClients }): Promise<UpsertStreamResponse> => {
const { streamsClient } = await getScopedClients({ request });
return await streamsClient.upsertStream({
request: params.body,
name: params.path.id,

View file

@ -21,7 +21,7 @@ import React, { useMemo } from 'react';
import { css } from '@emotion/css';
import {
IngestStreamGetResponse,
isUnWiredStreamGetResponse,
isUnwiredStreamGetResponse,
isWiredStreamDefinition,
} from '@kbn/streams-schema';
import { useDateRange } from '@kbn/observability-utils-browser/hooks/use_date_range';
@ -132,7 +132,7 @@ export function StreamDetailOverview({ definition }: { definition?: IngestStream
async ({ signal }) => {
if (
!definition ||
(isUnWiredStreamGetResponse(definition) && !definition.data_stream_exists)
(isUnwiredStreamGetResponse(definition) && !definition.data_stream_exists)
) {
return undefined;
}

View file

@ -5,6 +5,7 @@
* 2.0.
*/
import { i18n } from '@kbn/i18n';
import { isUnwiredStreamGetResponse, isWiredStreamGetResponse } from '@kbn/streams-schema';
import React from 'react';
import { useKibana } from '../../hooks/use_kibana';
import { useStreamsAppFetch } from '../../hooks/use_streams_app_fetch';
@ -35,15 +36,45 @@ export function StreamDetailView() {
refresh,
loading,
} = useStreamsAppFetch(
({ signal }) => {
return streamsRepositoryClient.fetch('GET /api/streams/{id}', {
signal,
params: {
path: {
id: key,
async ({ signal }) => {
return streamsRepositoryClient
.fetch('GET /api/streams/{id}', {
signal,
params: {
path: {
id: key,
},
},
},
});
})
.then((response) => {
if (isWiredStreamGetResponse(response)) {
return {
dashboards: response.dashboards,
inherited_fields: response.inherited_fields,
elasticsearch_assets: [],
effective_lifecycle: response.effective_lifecycle,
name: key,
stream: {
...response.stream,
},
};
}
if (isUnwiredStreamGetResponse(response)) {
return {
dashboards: response.dashboards,
elasticsearch_assets: response.elasticsearch_assets,
inherited_fields: {},
effective_lifecycle: response.effective_lifecycle,
name: key,
data_stream_exists: response.data_stream_exists,
stream: {
...response.stream,
},
};
}
throw new Error('Stream detail only supports IngestStreams.');
});
},
[streamsRepositoryClient, key]
);

View file

@ -7,7 +7,7 @@
import expect from '@kbn/expect';
import {
StreamUpsertRequest,
isGroupStreamDefinitionBase,
StreamGetResponse,
WiredStreamGetResponse,
} from '@kbn/streams-schema';
@ -17,124 +17,7 @@ import {
createStreamsRepositoryAdminClient,
} from './helpers/repository_client';
import { disableStreams, enableStreams, indexDocument } from './helpers/requests';
type StreamPutItem = Omit<StreamUpsertRequest, 'dashboards'> & { name: string };
const streams: StreamPutItem[] = [
{
name: 'logs',
stream: {
ingest: {
lifecycle: { dsl: {} },
processing: [],
wired: {
fields: {
'@timestamp': {
type: 'date',
},
message: {
type: 'match_only_text',
},
'host.name': {
type: 'keyword',
},
'log.level': {
type: 'keyword',
},
'stream.name': {
type: 'keyword',
},
},
},
routing: [
{
destination: 'logs.test',
if: {
and: [
{
field: 'numberfield',
operator: 'gt',
value: 15,
},
],
},
},
{
destination: 'logs.test2',
if: {
and: [
{
field: 'field2',
operator: 'eq',
value: 'abc',
},
],
},
},
],
},
},
},
{
name: 'logs.test',
stream: {
ingest: {
lifecycle: { inherit: {} },
routing: [],
processing: [],
wired: {
fields: {
numberfield: {
type: 'long',
},
},
},
},
},
},
{
name: 'logs.test2',
stream: {
ingest: {
lifecycle: { inherit: {} },
processing: [
{
grok: {
field: 'message',
patterns: ['%{NUMBER:numberfield}'],
if: { always: {} },
},
},
],
wired: {
fields: {
field2: {
type: 'keyword',
},
},
},
routing: [],
},
},
},
{
name: 'logs.deeply.nested.streamname',
stream: {
ingest: {
lifecycle: { inherit: {} },
processing: [],
wired: {
fields: {
field2: {
type: 'keyword',
},
},
},
routing: [],
},
},
},
];
import { createStreams } from './helpers/create_streams';
export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
const roleScopedSupertest = getService('roleScopedSupertest');
@ -147,7 +30,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
before(async () => {
apiClient = await createStreamsRepositoryAdminClient(roleScopedSupertest);
await enableStreams(apiClient);
await createStreams();
await createStreams(apiClient);
await indexDocuments();
});
@ -156,7 +39,8 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
});
it('checks whether deeply nested stream is created correctly', async () => {
function getChildNames(stream: StreamGetResponse['stream']) {
function getChildNames(stream: StreamGetResponse['stream']): string[] {
if (isGroupStreamDefinitionBase(stream)) return [];
return stream.ingest.routing.map((r) => r.destination);
}
const logs = await apiClient.fetch('GET /api/streams/{id}', {
@ -224,23 +108,6 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
expect(logsTest2Response.hits.total).to.eql({ value: 1, relation: 'eq' });
});
async function createStreams() {
for (const { name: streamId, ...stream } of streams) {
await apiClient
.fetch('PUT /api/streams/{id}', {
params: {
body: {
...stream,
dashboards: [],
} as StreamUpsertRequest,
path: { id: streamId },
},
})
.expect(200)
.then((response) => expect(response.body.acknowledged).to.eql(true));
}
}
async function indexDocuments() {
// send data that stays in logs
const doc = {

View file

@ -0,0 +1,162 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import expect from '@kbn/expect';
import { DeploymentAgnosticFtrProviderContext } from '../../../ftr_provider_context';
import {
StreamsSupertestRepositoryClient,
createStreamsRepositoryAdminClient,
} from './helpers/repository_client';
import { disableStreams, enableStreams } from './helpers/requests';
import { createStreams } from './helpers/create_streams';
export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
const roleScopedSupertest = getService('roleScopedSupertest');
let apiClient: StreamsSupertestRepositoryClient;
// An anticipated use case is that a user will want to flush a tree of streams from a config file
describe('GroupStreamDefinition', () => {
describe('CRUD API Operations', () => {
before(async () => {
apiClient = await createStreamsRepositoryAdminClient(roleScopedSupertest);
await enableStreams(apiClient);
await createStreams(apiClient);
});
after(async () => {
await disableStreams(apiClient);
});
it('successfully creates a GroupStream', async () => {
await apiClient
.fetch('PUT /api/streams/{id}', {
params: {
path: { id: 'test-group' },
body: {
stream: {
group: {
members: ['logs', 'logs.test2'],
},
},
dashboards: [],
},
},
})
.expect(200)
.then((response) => expect(response.body.acknowledged).to.eql(true));
});
it('successfully creates a second GroupStream', async () => {
await apiClient
.fetch('PUT /api/streams/{id}', {
params: {
path: { id: 'test-group-too' },
body: {
stream: {
group: {
members: ['logs.test2'],
},
},
dashboards: [],
},
},
})
.expect(200)
.then((response) => expect(response.body.acknowledged).to.eql(true));
});
it('unsuccessfully updates a GroupStream with an uknown stream', async () => {
await apiClient
.fetch('PUT /api/streams/{id}', {
params: {
path: { id: 'test-group' },
body: {
stream: {
group: {
members: ['logs', 'non-existent-stream'],
},
},
dashboards: [],
},
},
})
.expect(404);
});
it('unsuccessfully updates a GroupStream with an itself as a member', async () => {
await apiClient
.fetch('PUT /api/streams/{id}', {
params: {
path: { id: 'test-group' },
body: {
stream: {
group: {
members: ['logs', 'test-group'],
},
},
dashboards: [],
},
},
})
.expect(400);
});
it('unsuccessfully updates a GroupStream with a forbidden member', async () => {
await apiClient
.fetch('PUT /api/streams/{id}', {
params: {
path: { id: 'test-group' },
body: {
stream: {
group: {
members: ['logs', 'test-group-too'],
},
},
dashboards: [],
},
},
})
.expect(400);
});
it('successfully deletes a GroupStream', async () => {
await apiClient
.fetch('DELETE /api/streams/{id}', {
params: {
path: { id: 'test-group-too' },
},
})
.expect(200);
});
it('successfully reads a GroupStream', async () => {
const response = await apiClient
.fetch('GET /api/streams/{id}', {
params: {
path: { id: 'test-group' },
},
})
.expect(200);
expect(response.body).to.eql({
stream: {
name: 'test-group',
group: {
members: ['logs', 'logs.test2'],
},
},
dashboards: [],
});
});
it('successfully lists a GroupStream', async () => {
const response = await apiClient.fetch('GET /api/streams').expect(200);
expect(response.body.streams.some((stream) => stream.name === 'test-group')).to.eql(true);
});
});
});
}

View file

@ -0,0 +1,145 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { StreamUpsertRequest } from '@kbn/streams-schema';
import expect from '@kbn/expect';
import { StreamsSupertestRepositoryClient } from './repository_client';
type StreamPutItem = Omit<StreamUpsertRequest, 'dashboards'> & { name: string };
const streams: StreamPutItem[] = [
{
name: 'logs',
stream: {
ingest: {
lifecycle: { dsl: {} },
processing: [],
wired: {
fields: {
'@timestamp': {
type: 'date',
},
message: {
type: 'match_only_text',
},
'host.name': {
type: 'keyword',
},
'log.level': {
type: 'keyword',
},
'stream.name': {
type: 'keyword',
},
},
},
routing: [
{
destination: 'logs.test',
if: {
and: [
{
field: 'numberfield',
operator: 'gt',
value: 15,
},
],
},
},
{
destination: 'logs.test2',
if: {
and: [
{
field: 'field2',
operator: 'eq',
value: 'abc',
},
],
},
},
],
},
},
},
{
name: 'logs.test',
stream: {
ingest: {
lifecycle: { inherit: {} },
routing: [],
processing: [],
wired: {
fields: {
numberfield: {
type: 'long',
},
},
},
},
},
},
{
name: 'logs.test2',
stream: {
ingest: {
lifecycle: { inherit: {} },
processing: [
{
grok: {
field: 'message',
patterns: ['%{NUMBER:numberfield}'],
if: { always: {} },
},
},
],
wired: {
fields: {
field2: {
type: 'keyword',
},
},
},
routing: [],
},
},
},
{
name: 'logs.deeply.nested.streamname',
stream: {
ingest: {
lifecycle: { inherit: {} },
processing: [],
wired: {
fields: {
field2: {
type: 'keyword',
},
},
},
routing: [],
},
},
},
];
export async function createStreams(apiClient: StreamsSupertestRepositoryClient) {
for (const { name: streamId, ...stream } of streams) {
await apiClient
.fetch('PUT /api/streams/{id}', {
params: {
body: {
...stream,
dashboards: [],
} as StreamUpsertRequest,
path: { id: streamId },
},
})
.expect(200)
.then((response) => expect(response.body.acknowledged).to.eql(true));
}
}

View file

@ -17,6 +17,7 @@ export default function ({ loadTestFile }: DeploymentAgnosticFtrProviderContext)
loadTestFile(require.resolve('./schema'));
loadTestFile(require.resolve('./processing_simulate'));
loadTestFile(require.resolve('./root_stream'));
loadTestFile(require.resolve('./group_streams'));
loadTestFile(require.resolve('./lifecycle'));
});
}

View file

@ -10,7 +10,9 @@ import {
IngestStreamEffectiveLifecycle,
IngestStreamLifecycle,
IngestStreamUpsertRequest,
WiredReadStreamDefinition,
WiredStreamGetResponse,
asIngestStreamGetResponse,
isDslLifecycle,
isIlmLifecycle,
} from '@kbn/streams-schema';
@ -34,7 +36,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
) {
const definitions = await Promise.all(streams.map((stream) => getStream(apiClient, stream)));
for (const definition of definitions) {
expect(definition.effective_lifecycle).to.eql(expectedLifecycle);
expect(asIngestStreamGetResponse(definition).effective_lifecycle).to.eql(expectedLifecycle);
}
const dataStreams = await esClient.indices.getDataStream({ name: streams });
@ -102,10 +104,12 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
expect(response).to.have.property('acknowledged', true);
const updatedRootDefinition = await getStream(apiClient, 'logs');
expect((updatedRootDefinition as WiredStreamGetResponse).stream.ingest.lifecycle).to.eql({
dsl: { data_retention: '999d' },
});
expect(updatedRootDefinition.effective_lifecycle).to.eql({
expect((updatedRootDefinition as WiredReadStreamDefinition).stream.ingest.lifecycle).to.eql(
{
dsl: { data_retention: '999d' },
}
);
expect((updatedRootDefinition as WiredReadStreamDefinition).effective_lifecycle).to.eql({
dsl: { data_retention: '999d' },
from: 'logs',
});