[8.x] 🌊 Add support for existing data streams (#202057) (#202771)

# Backport

This will backport the following commits from `main` to `8.x`:
- [🌊 Add support for existing data streams
(#202057)](https://github.com/elastic/kibana/pull/202057)

<!--- Backport version: 9.4.3 -->

### Questions ?
Please refer to the [Backport tool
documentation](https://github.com/sqren/backport)

<!--BACKPORT [{"author":{"name":"Chris
Cowan","email":"chris@elastic.co"},"sourceCommit":{"committedDate":"2024-12-02T23:18:58Z","message":"🌊
Add support for existing data streams (#202057)\n\n## 🍒
Summary\r\n\r\nThis PR introduces the initial support for working with
existing data\r\nstreams. This is done by reading the `/_data_steram`
API endpoint then\r\nconverting those results to stream definitions with
the `managed` flag\r\nset to `false`, and then mixing them in with the
\"managed\" streams\r\nresults. This PR has the following
changes:\r\n\r\n- Add `managed` field to the Stream definition \r\n- Set
`managed: true` on streams created through the system\r\n- Update
`listStreams` to return both managed and un-managed streams \r\n- Update
`readStream` to fallback to \"un-managed\" stream if the
managed\r\nstream is not found\r\n- In `readStream` return all related
Elasticsearch assets\r\n- Add rudimentary UI support for classic data
streams\r\n\r\n---------\r\n\r\nCo-authored-by: Joe Reuter
<johannes.reuter@elastic.co>","sha":"4ec420a816e2eb24a3a978471e80186a470ebdde","branchLabelMapping":{"^v9.0.0$":"main","^v8.18.0$":"8.x","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["release_note:skip","Team:Observability","v9.0.0","backport:prev-minor","v8.18.0","Feature:Streams"],"title":"🌊
Add support for existing data
streams","number":202057,"url":"https://github.com/elastic/kibana/pull/202057","mergeCommit":{"message":"🌊
Add support for existing data streams (#202057)\n\n## 🍒
Summary\r\n\r\nThis PR introduces the initial support for working with
existing data\r\nstreams. This is done by reading the `/_data_steram`
API endpoint then\r\nconverting those results to stream definitions with
the `managed` flag\r\nset to `false`, and then mixing them in with the
\"managed\" streams\r\nresults. This PR has the following
changes:\r\n\r\n- Add `managed` field to the Stream definition \r\n- Set
`managed: true` on streams created through the system\r\n- Update
`listStreams` to return both managed and un-managed streams \r\n- Update
`readStream` to fallback to \"un-managed\" stream if the
managed\r\nstream is not found\r\n- In `readStream` return all related
Elasticsearch assets\r\n- Add rudimentary UI support for classic data
streams\r\n\r\n---------\r\n\r\nCo-authored-by: Joe Reuter
<johannes.reuter@elastic.co>","sha":"4ec420a816e2eb24a3a978471e80186a470ebdde"}},"sourceBranch":"main","suggestedTargetBranches":["8.x"],"targetPullRequestStates":[{"branch":"main","label":"v9.0.0","branchLabelMappingKey":"^v9.0.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/202057","number":202057,"mergeCommit":{"message":"🌊
Add support for existing data streams (#202057)\n\n## 🍒
Summary\r\n\r\nThis PR introduces the initial support for working with
existing data\r\nstreams. This is done by reading the `/_data_steram`
API endpoint then\r\nconverting those results to stream definitions with
the `managed` flag\r\nset to `false`, and then mixing them in with the
\"managed\" streams\r\nresults. This PR has the following
changes:\r\n\r\n- Add `managed` field to the Stream definition \r\n- Set
`managed: true` on streams created through the system\r\n- Update
`listStreams` to return both managed and un-managed streams \r\n- Update
`readStream` to fallback to \"un-managed\" stream if the
managed\r\nstream is not found\r\n- In `readStream` return all related
Elasticsearch assets\r\n- Add rudimentary UI support for classic data
streams\r\n\r\n---------\r\n\r\nCo-authored-by: Joe Reuter
<johannes.reuter@elastic.co>","sha":"4ec420a816e2eb24a3a978471e80186a470ebdde"}},{"branch":"8.x","label":"v8.18.0","branchLabelMappingKey":"^v8.18.0$","isSourceBranch":false,"state":"NOT_CREATED"}]}]
BACKPORT-->

Co-authored-by: Chris Cowan <chris@elastic.co>
This commit is contained in:
Kibana Machine 2024-12-04 05:40:20 +11:00 committed by GitHub
parent f3da0dccd4
commit 60c300e739
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 258 additions and 18 deletions

View file

@ -94,6 +94,15 @@ export type StreamWithoutIdDefinition = z.infer<typeof streamDefinitonSchema>;
export const streamDefinitonSchema = streamWithoutIdDefinitonSchema.extend({
id: z.string(),
managed: z.boolean().default(true),
unmanaged_elasticsearch_assets: z.optional(
z.array(
z.object({
type: z.enum(['ingest_pipeline', 'component_template', 'index_template', 'data_stream']),
id: z.string(),
})
)
),
});
export type StreamDefinition = z.infer<typeof streamDefinitonSchema>;

View file

@ -29,6 +29,9 @@ export function createStreamsIndex(scopedClusterClient: IScopedClusterClient) {
id: {
type: 'keyword',
},
managed: {
type: 'boolean',
},
},
},
});

View file

@ -9,6 +9,7 @@ import { StreamDefinition } from '../../../common/types';
export const rootStreamDefinition: StreamDefinition = {
id: 'logs',
managed: true,
processing: [],
children: [],
fields: [

View file

@ -81,7 +81,7 @@ async function upsertInternalStream({ definition, scopedClusterClient }: BasePar
return scopedClusterClient.asInternalUser.index({
id: definition.id,
index: STREAMS_INDEX,
document: definition,
document: { ...definition, managed: true },
refresh: 'wait_for',
});
}
@ -101,15 +101,32 @@ export async function listStreams({
size: 10000,
sort: [{ id: 'asc' }],
});
const definitions = response.hits.hits.map((hit) => hit._source!);
const dataStreams = await listDataStreamsAsStreams({ scopedClusterClient });
const definitions = response.hits.hits.map((hit) => ({ ...hit._source!, managed: true }));
const total = response.hits.total!;
return {
definitions,
total: typeof total === 'number' ? total : total.value,
definitions: [...definitions, ...dataStreams],
total: (typeof total === 'number' ? total : total.value) + dataStreams.length,
};
}
export async function listDataStreamsAsStreams({
scopedClusterClient,
}: ListStreamsParams): Promise<StreamDefinition[]> {
const response = await scopedClusterClient.asInternalUser.indices.getDataStream();
return response.data_streams
.filter((dataStream) => dataStream.template.endsWith('@stream') === false)
.map((dataStream) => ({
id: dataStream.name,
managed: false,
children: [],
fields: [],
processing: [],
}));
}
interface ReadStreamParams extends BaseParams {
id: string;
skipAccessCheck?: boolean;
@ -137,16 +154,80 @@ export async function readStream({
}
}
return {
definition,
definition: {
...definition,
managed: true,
},
};
} catch (e) {
if (e.meta?.statusCode === 404) {
throw new DefinitionNotFound(`Stream definition for ${id} not found.`);
return readDataStreamAsStream({ id, scopedClusterClient, skipAccessCheck });
}
throw e;
}
}
export async function readDataStreamAsStream({
id,
scopedClusterClient,
skipAccessCheck,
}: ReadStreamParams) {
const response = await scopedClusterClient.asInternalUser.indices.getDataStream({ name: id });
if (response.data_streams.length === 1) {
const definition: StreamDefinition = {
id,
managed: false,
children: [],
fields: [],
processing: [],
};
if (!skipAccessCheck) {
const hasAccess = await checkReadAccess({ id, scopedClusterClient });
if (!hasAccess) {
throw new DefinitionNotFound(`Stream definition for ${id} not found.`);
}
}
// retrieve linked index template, component template and ingest pipeline
const templateName = response.data_streams[0].template;
const componentTemplates: string[] = [];
const template = await scopedClusterClient.asInternalUser.indices.getIndexTemplate({
name: templateName,
});
if (template.index_templates.length) {
template.index_templates[0].index_template.composed_of.forEach((componentTemplateName) => {
componentTemplates.push(componentTemplateName);
});
}
const writeIndexName = response.data_streams[0].indices.at(-1)?.index_name!;
const currentIndex = await scopedClusterClient.asInternalUser.indices.get({
index: writeIndexName,
});
const ingestPipelineId = currentIndex[writeIndexName].settings?.index?.default_pipeline!;
definition.unmanaged_elasticsearch_assets = [
{
type: 'ingest_pipeline',
id: ingestPipelineId,
},
...componentTemplates.map((componentTemplateName) => ({
type: 'component_template' as const,
id: componentTemplateName,
})),
{
type: 'index_template',
id: templateName,
},
{
type: 'data_stream',
id,
},
];
return { definition };
}
throw new DefinitionNotFound(`Stream definition for ${id} not found.`);
}
interface ReadAncestorsParams extends BaseParams {
id: string;
}
@ -285,6 +366,10 @@ export async function syncStream({
rootDefinition,
logger,
}: SyncStreamParams) {
if (!definition.managed) {
// TODO For now, we just don't allow reads at all - later on we will relax this to allow certain operations, but they will use a completely different syncing logic
throw new Error('Cannot sync an unmanaged stream');
}
const componentTemplate = generateLayer(definition.id, definition);
await upsertComponent({
esClient: scopedClusterClient.asCurrentUser,

View file

@ -76,6 +76,7 @@ export const editStreamRoute = createServerRoute({
children: [],
fields: [],
processing: [],
managed: true,
};
await syncStream({
@ -87,7 +88,7 @@ export const editStreamRoute = createServerRoute({
await syncStream({
scopedClusterClient,
definition: { ...streamDefinition, id: params.path.id },
definition: { ...streamDefinition, id: params.path.id, managed: true },
rootDefinition: parentDefinition,
logger,
});

View file

@ -55,7 +55,7 @@ export const forkStreamsRoute = createServerRoute({
id: params.path.id,
});
const childDefinition = { ...params.body.stream, children: [] };
const childDefinition = { ...params.body.stream, children: [], managed: true };
// check whether root stream has a child of the given name already
if (rootDefinition.children.some((child) => child.id === childDefinition.id)) {

View file

@ -52,9 +52,11 @@ export interface StreamTree {
children: StreamTree[];
}
function asTrees(definitions: Array<{ id: string }>) {
function asTrees(definitions: Array<{ id: string; managed?: boolean }>) {
const trees: StreamTree[] = [];
const ids = definitions.map((definition) => definition.id);
const ids = definitions
.filter((definition) => definition.managed)
.map((definition) => definition.id);
ids.sort((a, b) => a.split('.').length - b.split('.').length);

View file

@ -45,6 +45,13 @@ export const readStreamRoute = createServerRoute({
id: params.path.id,
});
if (streamEntity.definition.managed === false) {
return {
...streamEntity.definition,
inheritedFields: [],
};
}
const { ancestors } = await readAncestors({
id: streamEntity.definition.id,
scopedClusterClient,

View file

@ -4,9 +4,10 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { EuiFlexGroup, EuiFlexItem, EuiIcon, EuiLink, EuiPanel } from '@elastic/eui';
import { EuiFlexGroup, EuiFlexItem, EuiIcon, EuiLink, EuiPanel, EuiBadge } from '@elastic/eui';
import { i18n } from '@kbn/i18n';
import React from 'react';
import { StreamDefinition } from '@kbn/streams-plugin/common';
import { useStreamsAppBreadcrumbs } from '../../hooks/use_streams_app_breadcrumbs';
import { useStreamsAppRouter } from '../../hooks/use_streams_app_router';
import { EntityOverviewTabList } from '../entity_overview_tab_list';
@ -25,6 +26,7 @@ export function EntityDetailViewWithoutParams({
selectedTab,
tabs,
entity,
definition,
}: {
selectedTab: string;
tabs: EntityViewTab[];
@ -32,6 +34,7 @@ export function EntityDetailViewWithoutParams({
displayName?: string;
id: string;
};
definition?: StreamDefinition;
}) {
const router = useStreamsAppRouter();
useStreamsAppBreadcrumbs(() => {
@ -86,7 +89,26 @@ export function EntityDetailViewWithoutParams({
<EuiFlexItem grow={false}>
<StreamsAppPageHeader
verticalPaddingSize="none"
title={<StreamsAppPageHeaderTitle title={entity.displayName} />}
title={
<StreamsAppPageHeaderTitle
title={
<>
{entity.displayName}
{definition && !definition.managed ? (
<>
{' '}
<EuiBadge>
{i18n.translate(
'xpack.streams.entityDetailViewWithoutParams.unmanagedBadgeLabel',
{ defaultMessage: 'Unmanaged' }
)}
</EuiBadge>
</>
) : null}
</>
}
/>
}
>
<EntityOverviewTabList
tabs={Object.entries(tabMap).map(([tabKey, { label, href }]) => {

View file

@ -7,13 +7,14 @@
import React from 'react';
import { i18n } from '@kbn/i18n';
import { StreamDefinition } from '@kbn/streams-plugin/common';
import { EuiButtonGroup, EuiFlexGroup, EuiFlexItem } from '@elastic/eui';
import { EuiButtonGroup, EuiFlexGroup, EuiFlexItem, EuiListGroup, EuiText } from '@elastic/eui';
import { useStreamsAppParams } from '../../hooks/use_streams_app_params';
import { RedirectTo } from '../redirect_to';
import { useStreamsAppRouter } from '../../hooks/use_streams_app_router';
import { StreamDetailRouting } from '../stream_detail_routing';
import { StreamDetailEnriching } from '../stream_detail_enriching';
import { StreamDetailSchemaEditor } from '../stream_detail_schema_editor';
import { useKibana } from '../../hooks/use_kibana';
type ManagementSubTabs = 'route' | 'enrich' | 'schemaEditor';
@ -33,6 +34,18 @@ export function StreamDetailManagement({
} = useStreamsAppParams('/{key}/management/{subtab}');
const router = useStreamsAppRouter();
if (subtab === 'overview') {
if (!definition) {
return null;
}
if (definition.managed) {
return (
<RedirectTo path="/{key}/management/{subtab}" params={{ path: { key, subtab: 'route' } }} />
);
}
return <UnmanagedStreamOverview definition={definition} />;
}
const tabs = {
route: {
content: (
@ -66,6 +79,15 @@ export function StreamDetailManagement({
);
}
if (!definition?.managed) {
return (
<RedirectTo
path="/{key}/management/{subtab}"
params={{ path: { key, subtab: 'overview' } }}
/>
);
}
const selectedTabObject = tabs[subtab];
return (
@ -90,3 +112,84 @@ export function StreamDetailManagement({
</EuiFlexGroup>
);
}
function assetToLink(asset: { type: string; id: string }) {
switch (asset.type) {
case 'index_template':
return `/app/management/data/index_management/templates/${asset.id}`;
case 'component_template':
return `/app/management/data/index_management/component_templates/${asset.id}`;
case 'data_stream':
return `/app/management/data/index_management/data_streams/${asset.id}`;
case 'ingest_pipeline':
return `/app/management/ingest/ingest_pipelines?pipeline=${asset.id}`;
default:
return '';
}
}
function assetToTitle(asset: { type: string; id: string }) {
switch (asset.type) {
case 'index_template':
return i18n.translate('xpack.streams.streamDetailView.indexTemplate', {
defaultMessage: 'Index template',
});
case 'component_template':
return i18n.translate('xpack.streams.streamDetailView.componentTemplate', {
defaultMessage: 'Component template',
});
case 'data_stream':
return i18n.translate('xpack.streams.streamDetailView.dataStream', {
defaultMessage: 'Data stream',
});
case 'ingest_pipeline':
return i18n.translate('xpack.streams.streamDetailView.ingestPipeline', {
defaultMessage: 'Ingest pipeline',
});
default:
return '';
}
}
function UnmanagedStreamOverview({ definition }: { definition: StreamDefinition }) {
const {
core: {
http: { basePath },
},
} = useKibana();
const groupedAssets = (definition.unmanaged_elasticsearch_assets ?? []).reduce((acc, asset) => {
const title = assetToTitle(asset);
if (title) {
acc[title] = acc[title] ?? [];
acc[title].push(asset);
}
return acc;
}, {} as Record<string, Array<{ type: string; id: string }>>);
return (
<EuiFlexGroup direction="column" gutterSize="m">
<EuiText>
<p>
{i18n.translate('xpack.streams.streamDetailView.unmanagedStreamOverview', {
defaultMessage:
'This stream is not managed. Follow the links to stack management to change the related Elasticsearch objects.',
})}
</p>
</EuiText>
{Object.entries(groupedAssets).map(([title, assets]) => (
<div key={title}>
<EuiText>
<h3>{title}</h3>
</EuiText>
<EuiListGroup
listItems={assets.map((asset) => ({
label: asset.id,
href: basePath.prepend(assetToLink(asset)),
iconType: 'index',
target: '_blank',
}))}
/>
</div>
))}
</EuiFlexGroup>
);
}

View file

@ -63,5 +63,12 @@ export function StreamDetailView() {
},
];
return <EntityDetailViewWithoutParams tabs={tabs} entity={entity} selectedTab={tab} />;
return (
<EntityDetailViewWithoutParams
tabs={tabs}
entity={entity}
definition={streamEntity}
selectedTab={tab}
/>
);
}

View file

@ -7,7 +7,7 @@
import { EuiTitle } from '@elastic/eui';
import React from 'react';
export function StreamsAppPageHeaderTitle({ title }: { title: string }) {
export function StreamsAppPageHeaderTitle({ title }: { title: React.ReactNode }) {
return (
<EuiTitle size="l">
<h1>{title}</h1>

View file

@ -46,10 +46,10 @@ export function StreamsTable({
name: i18n.translate('xpack.streams.streamsTable.nameColumnTitle', {
defaultMessage: 'Name',
}),
render: (_, { id }) => {
render: (_, { id, managed }) => {
return (
<EuiFlexGroup direction="row" gutterSize="s" alignItems="center">
<EuiIcon type="branch" />
<EuiIcon type={managed ? 'branch' : 'bullseye'} />
<EuiLink
data-test-subj="logsaiColumnsLink"
href={router.link('/{key}', { path: { key: id } })}

View file

@ -48,7 +48,7 @@ const streamsAppRoutes = {
element: (
<RedirectTo
path="/{key}/management/{subtab}"
params={{ path: { subtab: 'route' } }}
params={{ path: { subtab: 'overview' } }}
/>
),
},