🌊 Streams: Restore orphaned streams functionality (#215517)

There were a couple places where our UI and API would break on orphaned
streams (classic data streams that exist in the streams API but the
underlying data stream got deleted by the user). As discussed in
https://github.com/elastic/streams-program/discussions/212 , we should
handle this case gracefully. This PR makes sure that the UI doesn't fail
anywhere

##
`x-pack/platform/packages/shared/kbn-streams-schema/src/models/ingest/lifecycle/index.ts`

An unwired ingest stream effective lifecycle can also be an error if the
data stream doesn't exist. This was part of the typescript type, but not
the schema, so using our generated typeguards would fail.

## `x-pack/platform/plugins/shared/streams/server/lib/streams/client.ts`

`client.ensureStream` would not actually do what it claimed - it would
not throw if the stream didn't exist at all. Adjusted the logic so it
does.

##
`x-pack/platform/plugins/shared/streams/server/routes/internal/streams/crud/route.ts`

_details endpoint would throw if the data stream doesn't exist, but it
can just do the search and report a doc count of 0

## All the other changes

This is more of a tangent, but I noticed that on switching to the
management tab it would reload the stream definition because it was a
different route, so the component holding the `useStreamsAppFetch` would
remount.

I fixed this by making the detail route a parent and the individual tabs
children. It's a little awkward because the management tab is the only
one having subtabs. I moved the handling of this into the
`StreamDetailViewContent` component because it wouldn't play well with
our typing of routes otherwise, but the behavior is the same as before,
with the difference that the stream definition is not reloaded if not
necessary.

Also added some api-level tests to make sure it stays that way.
This commit is contained in:
Joe Reuter 2025-04-01 12:33:36 +02:00 committed by GitHub
parent 931d2d652e
commit cc15d3ed65
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 218 additions and 54 deletions

View file

@ -66,7 +66,7 @@ export const ingestStreamLifecycleSchema: z.Schema<IngestStreamLifecycle> = z.un
]);
export const unwiredIngestStreamEffectiveLifecycleSchema: z.Schema<UnwiredIngestStreamEffectiveLifecycle> =
z.union([ingestStreamLifecycleSchema, disabledLifecycleSchema]);
z.union([ingestStreamLifecycleSchema, disabledLifecycleSchema, errorLifecycleSchema]);
export const wiredIngestStreamEffectiveLifecycleSchema: z.Schema<WiredIngestStreamEffectiveLifecycle> =
ingestStreamLifecycleSchema.and(z.object({ from: NonEmptyString }));

View file

@ -601,15 +601,27 @@ export class StreamsClient {
const [streamDefinition, dataStream] = await Promise.all([
this.getStoredStreamDefinition(name).catch((error) => {
if (isElasticsearch404(error)) {
return undefined;
return error;
}
throw error;
}),
this.getDataStream(name).catch((error) => {
if (isElasticsearch404(error)) {
return error;
}
throw error;
}),
this.getDataStream(name),
]);
if (dataStream && !streamDefinition) {
if (!isElasticsearch404(streamDefinition)) {
// stream definitely exists, all good
return;
}
if (!isElasticsearch404(dataStream) && isElasticsearch404(streamDefinition)) {
// stream definition does not exist, but data stream does - create an empty stream definition
await this.updateStoredStream(this.getDataStreamAsIngestStream(dataStream));
}
// if both do not exist, the stream does not exist, so this should be a 404
throw streamDefinition;
}
/**

View file

@ -84,6 +84,7 @@ export const streamDetailRoute = createServerRoute({
const docCountResponse = await scopedClusterClient.asCurrentUser.search({
index: indexPattern,
track_total_hits: true,
ignore_unavailable: true,
query: {
range: {
'@timestamp': {

View file

@ -7,6 +7,7 @@
import React from 'react';
import { i18n } from '@kbn/i18n';
import { UnwiredStreamGetResponse } from '@kbn/streams-schema';
import { EuiCallOut, EuiFlexGroup } from '@elastic/eui';
import { useStreamsAppParams } from '../../../hooks/use_streams_app_params';
import { RedirectTo } from '../../redirect_to';
import { StreamDetailEnrichment } from '../stream_detail_enrichment';
@ -29,7 +30,28 @@ export function ClassicStreamDetailManagement({
}) {
const {
path: { key, subtab },
} = useStreamsAppParams('/{key}/management/{subtab}');
} = useStreamsAppParams('/{key}/{tab}/{subtab}');
if (!definition.data_stream_exists) {
return (
<EuiFlexGroup direction="column">
<EuiCallOut
title={i18n.translate('xpack.streams.unmanagedStreamOverview.missingDatastream.title', {
defaultMessage: 'Data stream missing',
})}
color="danger"
iconType="error"
>
<p>
{i18n.translate('xpack.streams.unmanagedStreamOverview.missingDatastream.description', {
defaultMessage:
'The underlying Elasticsearch data stream for this classic stream is missing. Recreate the data stream to restore the stream by sending data before using the management features.',
})}
</p>
</EuiCallOut>
</EuiFlexGroup>
);
}
const tabs: ManagementTabs = {};
@ -64,7 +86,10 @@ export function ClassicStreamDetailManagement({
if (!isValidManagementSubTab(subtab)) {
return (
<RedirectTo path="/{key}/management/{subtab}" params={{ path: { key, subtab: 'enrich' } }} />
<RedirectTo
path="/{key}/{tab}/{subtab}"
params={{ path: { key, tab: 'management', subtab: 'enrich' } }}
/>
);
}

View file

@ -30,7 +30,7 @@ export function WiredStreamDetailManagement({
}) {
const {
path: { key, subtab },
} = useStreamsAppParams('/{key}/management/{subtab}');
} = useStreamsAppParams('/{key}/{tab}/{subtab}');
const tabs = {
route: {
@ -69,7 +69,10 @@ export function WiredStreamDetailManagement({
if (!isValidManagementSubTab(subtab)) {
return (
<RedirectTo path="/{key}/management/{subtab}" params={{ path: { key, subtab: 'route' } }} />
<RedirectTo
path="/{key}/{tab}/{subtab}"
params={{ path: { key, tab: 'management', subtab: 'route' } }}
/>
);
}

View file

@ -42,8 +42,8 @@ export function Wrapper({
legend="Management tabs"
idSelected={subtab}
onChange={(optionId) => {
router.push('/{key}/management/{subtab}', {
path: { key: streamId, subtab: optionId },
router.push('/{key}/{tab}/{subtab}', {
path: { key: streamId, subtab: optionId, tab: 'management' },
query: {},
});
}}

View file

@ -54,9 +54,10 @@ export function ChildStreamList({ definition }: { definition?: IngestStreamGetRe
<EuiButton
data-test-subj="streamsAppChildStreamListCreateChildStreamButton"
iconType="plusInCircle"
href={router.link('/{key}/management/{subtab}', {
href={router.link('/{key}/{tab}/{subtab}', {
path: {
key: definition?.stream.name,
key: definition.stream.name,
tab: 'management',
subtab: 'route',
},
})}

View file

@ -16,7 +16,7 @@ import {
import { css } from '@emotion/css';
import { i18n } from '@kbn/i18n';
import React, { ReactNode } from 'react';
import { IngestStreamGetResponse, IngestStreamLifecycleILM } from '@kbn/streams-schema';
import { IngestStreamGetResponse, isDslLifecycle, isIlmLifecycle } from '@kbn/streams-schema';
import { IlmLocatorParams } from '@kbn/index-lifecycle-management-common-shared';
import { LocatorPublic } from '@kbn/share-plugin/public';
@ -44,7 +44,7 @@ const RetentionDisplay = ({
}) => {
if (!definition) return <>-</>;
if ('dsl' in definition.effective_lifecycle) {
if (isDslLifecycle(definition.effective_lifecycle)) {
return (
<>
{definition?.effective_lifecycle.dsl.data_retention ||
@ -55,12 +55,11 @@ const RetentionDisplay = ({
);
}
return (
<IlmLink
lifecycle={definition.effective_lifecycle as IngestStreamLifecycleILM}
ilmLocator={ilmLocator}
/>
);
if (isIlmLifecycle(definition.effective_lifecycle)) {
return <IlmLink lifecycle={definition.effective_lifecycle} ilmLocator={ilmLocator} />;
}
return <>-</>;
};
interface StatItemProps {

View file

@ -6,6 +6,7 @@
*/
import { i18n } from '@kbn/i18n';
import React from 'react';
import { Outlet } from '@kbn/typed-react-router-config';
import { useKibana } from '../../hooks/use_kibana';
import { useStreamsAppParams } from '../../hooks/use_streams_app_params';
import { EntityDetailViewWithoutParams, EntityViewTab } from '../entity_detail_view';
@ -13,24 +14,28 @@ import { StreamDetailDashboardsView } from '../stream_detail_dashboards_view';
import { StreamDetailManagement } from '../data_management/stream_detail_management';
import { StreamDetailOverview } from '../stream_detail_overview';
import { StreamDetailContextProvider, useStreamDetail } from '../../hooks/use_stream_detail';
import { RedirectTo } from '../redirect_to';
export function StreamDetailView() {
const { streamsRepositoryClient } = useKibana().dependencies.start.streams;
const params1 = useStreamsAppParams('/{key}/{tab}', true);
const params2 = useStreamsAppParams('/{key}/management/{subtab}', true);
const name = params1?.path?.key || params2.path.key;
const tab = params1?.path?.tab || 'management';
const {
path: { key: name },
} = useStreamsAppParams('/{key}/{tab}', true);
return (
<StreamDetailContextProvider name={name} streamsRepositoryClient={streamsRepositoryClient}>
<StreamDetailViewContent name={name} tab={tab} />
<Outlet />
</StreamDetailContextProvider>
);
}
export function StreamDetailViewContent({ name, tab }: { name: string; tab: string }) {
export function StreamDetailViewContent() {
const params1 = useStreamsAppParams('/{key}/{tab}', true);
const params2 = useStreamsAppParams('/{key}/{tab}/{subtab}', true);
const name = params1?.path?.key || params2.path.key;
const tab = params1?.path?.tab || 'management';
const { definition, refresh } = useStreamDetail();
const entity = {
@ -38,6 +43,15 @@ export function StreamDetailViewContent({ name, tab }: { name: string; tab: stri
displayName: name,
};
if (params2?.path?.subtab && tab !== 'management') {
// only management tab has subtabs
return <RedirectTo path="/{key}/{tab}" params={{ path: { tab } }} />;
}
if (!params2?.path?.subtab && tab === 'management') {
// management tab requires a subtab
return <RedirectTo path="/{key}/{tab}/{subtab}" params={{ path: { tab, subtab: 'route' } }} />;
}
const tabs: EntityViewTab[] = [
{
name: 'overview',

View file

@ -298,7 +298,7 @@ function StreamNode({
aria-label={i18n.translate('xpack.streams.streamsTable.management', {
defaultMessage: 'Management',
})}
href={router.link('/{key}/management', { path: { key: node.name } })}
href={router.link('/{key}/{tab}', { path: { key: node.name, tab: 'management' } })}
/>
</EuiToolTip>
</EuiFlexGroup>

View file

@ -8,7 +8,7 @@ import { i18n } from '@kbn/i18n';
import { createRouter, Outlet, RouteMap } from '@kbn/typed-react-router-config';
import * as t from 'io-ts';
import React from 'react';
import { StreamDetailView } from '../components/stream_detail_view';
import { StreamDetailView, StreamDetailViewContent } from '../components/stream_detail_view';
import { StreamsAppPageTemplate } from '../components/streams_app_page_template';
import { StreamsAppRouterBreadcrumb } from '../components/streams_app_router_breadcrumb';
import { RedirectTo } from '../components/redirect_to';
@ -44,22 +44,6 @@ const streamsAppRoutes = {
'/{key}': {
element: <RedirectTo path="/{key}/{tab}" params={{ path: { tab: 'overview' } }} />,
},
'/{key}/management': {
element: (
<RedirectTo
path="/{key}/management/{subtab}"
params={{ path: { subtab: 'overview' } }}
/>
),
},
'/{key}/management/{subtab}': {
element: <StreamDetailView />,
params: t.type({
path: t.type({
subtab: t.string,
}),
}),
},
'/{key}/{tab}': {
element: <StreamDetailView />,
params: t.type({
@ -67,15 +51,25 @@ const streamsAppRoutes = {
tab: t.string,
}),
}),
},
'/{key}/{tab}/{subtab}': {
element: <StreamDetailView />,
params: t.type({
path: t.type({
tab: t.string,
subtab: t.string,
}),
}),
children: {
'/{key}/{tab}/{subtab}': {
element: <StreamDetailViewContent />,
params: t.type({
path: t.type({
subtab: t.string,
tab: t.string,
}),
}),
},
'/{key}/{tab}': {
element: <StreamDetailViewContent />,
params: t.type({
path: t.type({
tab: t.string,
}),
}),
},
},
},
},
},

View file

@ -301,5 +301,120 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
});
});
});
describe('Orphaned classic stream', () => {
const ORPHANED_STREAM_NAME = 'logs-orphaned-default';
before(async () => {
const doc = {
message: '2023-01-01T00:00:10.000Z error test',
};
const response = await indexDocument(esClient, ORPHANED_STREAM_NAME, doc);
expect(response.result).to.eql('created');
// PUT the stream to make sure it's a classic stream
await apiClient.fetch('PUT /api/streams/{name} 2023-10-31', {
params: {
path: {
name: ORPHANED_STREAM_NAME,
},
body: {
dashboards: [],
stream: {
ingest: {
lifecycle: { inherit: {} },
processing: [],
unwired: {},
},
},
},
},
});
// delete the underlying data stream
await esClient.indices.deleteDataStream({
name: ORPHANED_STREAM_NAME,
});
});
it('should still be able to fetch the stream', async () => {
const getResponse = await apiClient.fetch('GET /api/streams/{name} 2023-10-31', {
params: {
path: {
name: ORPHANED_STREAM_NAME,
},
},
});
expect(getResponse.status).to.eql(200);
});
it('should still be able to fetch the dashboards for the stream', async () => {
const getResponse = await apiClient.fetch('GET /api/streams/{name}/dashboards 2023-10-31', {
params: {
path: {
name: ORPHANED_STREAM_NAME,
},
},
});
expect(getResponse.status).to.eql(200);
});
it('should still be possible to call _details', async () => {
const getResponse = await apiClient.fetch('GET /internal/streams/{name}/_details', {
params: {
path: {
name: ORPHANED_STREAM_NAME,
},
query: {
start: '2023-01-01T00:00:00.000Z',
end: '2023-01-01T00:00:20.000Z',
},
},
});
expect(getResponse.status).to.eql(200);
});
it('same APIs should return 404 for actually non-existing streams', async () => {
const getStreamResponse = await apiClient.fetch('GET /api/streams/{name} 2023-10-31', {
params: {
path: {
name: 'non-existing-stream',
},
},
});
expect(getStreamResponse.status).to.eql(404);
const getDashboardsResponse = await apiClient.fetch(
'GET /api/streams/{name}/dashboards 2023-10-31',
{
params: {
path: {
name: 'non-existing-stream',
},
},
}
);
expect(getDashboardsResponse.status).to.eql(404);
const getDetailsResponse = await apiClient.fetch('GET /internal/streams/{name}/_details', {
params: {
path: {
name: 'non-existing-stream',
},
query: {
start: '2023-01-01T00:00:00.000Z',
end: '2023-01-01T00:00:20.000Z',
},
},
});
expect(getDetailsResponse.status).to.eql(404);
});
after(async () => {
await apiClient.fetch('DELETE /api/streams/{name} 2023-10-31', {
params: {
path: {
name: ORPHANED_STREAM_NAME,
},
},
});
});
});
});
}