🌊 Streams: Fix listing page for orphaned streams (#217854)

The listing page didn't handle "orphaned" streams properly (classic data
streams which have a configuration on the stream level but the
underlying data stream is not available because it got deleted).

This PR fixes that and adds an integration test for it

<img width="774" alt="Screenshot 2025-04-10 at 16 07 15"
src="https://github.com/user-attachments/assets/da15c56b-7dbd-4070-ab6d-4235132da8ed"
/>

In this picture, `logs-test-default` is orphaned.

To test:
* Create a new classic stream (e.g. via executing
```
POST logs-mylogs-default/_doc
{ "message": "Test" }
```
* Go into the streams UI and add a processor for this stream
* Delete the data stream via stack management or via
```
DELETE _data_stream/logs-mylogs-default
```
* Go to the streams listing page
This commit is contained in:
Joe Reuter 2025-04-14 13:24:33 +02:00 committed by GitHub
parent 574ef298f2
commit f3042efa8f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 96 additions and 36 deletions

View file

@ -495,18 +495,33 @@ export class StreamsClient {
* Lists both managed and unmanaged streams
*/
async listStreams(): Promise<StreamDefinition[]> {
const streams = await this.listStreamsWithDataStreamExistence();
return streams.map((stream) => {
const { data_stream_exists: _, ...rest } = stream;
return rest;
});
}
async listStreamsWithDataStreamExistence(): Promise<
Array<StreamDefinition & { data_stream_exists: boolean }>
> {
const [managedStreams, unmanagedStreams] = await Promise.all([
this.getManagedStreams(),
this.getUnmanagedDataStreams(),
]);
const allDefinitionsById = new Map<string, StreamDefinition>(
managedStreams.map((stream) => [stream.name, stream])
const allDefinitionsById = new Map<string, StreamDefinition & { data_stream_exists: boolean }>(
managedStreams.map((stream) => [stream.name, { ...stream, data_stream_exists: false }])
);
unmanagedStreams.forEach((stream) => {
if (!allDefinitionsById.get(stream.name)) {
allDefinitionsById.set(stream.name, stream);
allDefinitionsById.set(stream.name, { ...stream, data_stream_exists: true });
} else {
allDefinitionsById.set(stream.name, {
...allDefinitionsById.get(stream.name)!,
data_stream_exists: true,
});
}
});

View file

@ -16,7 +16,7 @@ import { getDataStreamLifecycle } from '../../../../lib/streams/stream_crud';
export interface ListStreamDetail {
stream: StreamDefinition;
effective_lifecycle: UnwiredIngestStreamEffectiveLifecycle;
data_stream: estypes.IndicesDataStream;
data_stream?: estypes.IndicesDataStream;
}
export const listStreamsRoute = createServerRoute({
@ -34,20 +34,18 @@ export const listStreamsRoute = createServerRoute({
},
handler: async ({ request, getScopedClients }): Promise<{ streams: ListStreamDetail[] }> => {
const { streamsClient, scopedClusterClient } = await getScopedClients({ request });
const streams = await streamsClient.listStreams();
const streams = await streamsClient.listStreamsWithDataStreamExistence();
const dataStreams = await scopedClusterClient.asCurrentUser.indices.getDataStream({
name: streams.map((stream) => stream.name),
name: streams.filter((stream) => stream.data_stream_exists).map((stream) => stream.name),
});
const enrichedStreams = streams.reduce<ListStreamDetail[]>((acc, stream) => {
const match = dataStreams.data_streams.find((dataStream) => dataStream.name === stream.name);
if (match) {
acc.push({
stream,
effective_lifecycle: getDataStreamLifecycle(match),
data_stream: match,
});
}
acc.push({
stream,
effective_lifecycle: getDataStreamLifecycle(match ?? null),
data_stream: match,
});
return acc;
}, []);

View file

@ -6,14 +6,16 @@
*/
import React from 'react';
import { i18n } from '@kbn/i18n';
import { UnwiredStreamGetResponse } from '@kbn/streams-schema';
import { EuiCallOut, EuiFlexGroup } from '@elastic/eui';
import { UnwiredStreamGetResponse, isUnwiredStreamDefinition } from '@kbn/streams-schema';
import { EuiBadgeGroup, EuiCallOut, EuiFlexGroup } from '@elastic/eui';
import { useStreamsAppParams } from '../../../hooks/use_streams_app_params';
import { RedirectTo } from '../../redirect_to';
import { StreamDetailEnrichment } from '../stream_detail_enrichment';
import { ManagementTabs, Wrapper } from './wrapper';
import { StreamDetailLifecycle } from '../stream_detail_lifecycle';
import { UnmanagedElasticsearchAssets } from './unmanaged_elasticsearch_assets';
import { StreamsAppPageTemplate } from '../../streams_app_page_template';
import { ClassicStreamBadge, LifecycleBadge } from '../../stream_badges';
const classicStreamManagementSubTabs = ['enrich', 'advanced', 'lifecycle'] as const;
@ -36,22 +38,42 @@ export function ClassicStreamDetailManagement({
if (!definition.data_stream_exists) {
return (
<EuiFlexGroup direction="column">
<EuiCallOut
title={i18n.translate('xpack.streams.unmanagedStreamOverview.missingDatastream.title', {
defaultMessage: 'Data stream missing',
})}
color="danger"
iconType="error"
>
<p>
{i18n.translate('xpack.streams.unmanagedStreamOverview.missingDatastream.description', {
defaultMessage:
'The underlying Elasticsearch data stream for this classic stream is missing. Recreate the data stream to restore the stream by sending data before using the management features.',
<>
<StreamsAppPageTemplate.Header
bottomBorder="extended"
pageTitle={
<EuiFlexGroup gutterSize="s" alignItems="center">
{i18n.translate('xpack.streams.entityDetailViewWithoutParams.manageStreamTitle', {
defaultMessage: 'Manage stream {streamId}',
values: { streamId: key },
})}
<EuiBadgeGroup gutterSize="s">
{isUnwiredStreamDefinition(definition.stream) && <ClassicStreamBadge />}
<LifecycleBadge lifecycle={definition.effective_lifecycle} />
</EuiBadgeGroup>
</EuiFlexGroup>
}
/>
<StreamsAppPageTemplate.Body>
<EuiCallOut
title={i18n.translate('xpack.streams.unmanagedStreamOverview.missingDatastream.title', {
defaultMessage: 'Data stream missing',
})}
</p>
</EuiCallOut>
</EuiFlexGroup>
color="danger"
iconType="error"
>
<p>
{i18n.translate(
'xpack.streams.unmanagedStreamOverview.missingDatastream.description',
{
defaultMessage:
'The underlying Elasticsearch data stream for this classic stream is missing. Recreate the data stream to restore the stream by sending data before using the management features.',
}
)}
</p>
</EuiCallOut>
</StreamsAppPageTemplate.Body>
</>
);
}

View file

@ -15,7 +15,7 @@ import {
import { css } from '@emotion/css';
import { i18n } from '@kbn/i18n';
import React, { useMemo } from 'react';
import { IngestStreamGetResponse } from '@kbn/streams-schema';
import { IngestStreamGetResponse, isWiredStreamGetResponse } from '@kbn/streams-schema';
import { computeInterval } from '@kbn/visualization-utils';
import moment, { DurationInputArg1, DurationInputArg2 } from 'moment';
import { useKibana } from '../../../hooks/use_kibana';
@ -143,6 +143,8 @@ export function StreamChartPanel({ definition }: StreamChartPanelProps) {
const docCount = docCountFetch?.value?.details.count;
const formattedDocCount = docCount ? formatNumber(docCount, 'decimal0') : '0';
const dataStreamExists = isWiredStreamGetResponse(definition) || definition.data_stream_exists;
return (
<EuiPanel hasShadow={false} hasBorder>
<EuiFlexGroup
@ -170,7 +172,7 @@ export function StreamChartPanel({ definition }: StreamChartPanelProps) {
data-test-subj="streamsDetailOverviewOpenInDiscoverButton"
iconType="discoverApp"
href={discoverLink}
isDisabled={!discoverLink}
isDisabled={!discoverLink || !dataStreamExists}
>
{i18n.translate('xpack.streams.streamDetailOverview.openInDiscoverButtonLabel', {
defaultMessage: 'Open in Discover',

View file

@ -49,7 +49,9 @@ export function StreamDetailOverview({ definition }: { definition: IngestStreamG
<EuiFlexItem grow>
<EuiFlexGroup direction="row" gutterSize="m">
<EuiFlexItem grow={4}>{definition && <TabsPanel tabs={tabs} />}</EuiFlexItem>
<EuiFlexItem grow={4}>
<TabsPanel tabs={tabs} />
</EuiFlexItem>
<EuiFlexItem grow={8}>
<StreamChartPanel definition={definition} />
</EuiFlexItem>

View file

@ -26,7 +26,7 @@ export function RetentionColumn({ lifecycle }: { lifecycle: IngestStreamEffectiv
const ilmLocator = share.url.locators.get<IlmLocatorParams>(ILM_LOCATOR_ID);
if (isErrorLifecycle(lifecycle)) {
return null;
return <EuiBadge color="hollow">{lifecycle.error.message}</EuiBadge>;
}
if (isIlmLifecycle(lifecycle)) {

View file

@ -76,7 +76,10 @@ export function StreamsTreeTable({
defaultMessage: 'Documents',
}),
width: '40%',
render: (_, item) => <DocumentsColumn indexPattern={item.name} numDataPoints={25} />,
render: (_, item) =>
item.data_stream ? (
<DocumentsColumn indexPattern={item.name} numDataPoints={25} />
) : null,
},
{
field: 'effective_lifecycle',

View file

@ -6,7 +6,7 @@
*/
import expect from '@kbn/expect';
import { asUnwiredStreamGetResponse } from '@kbn/streams-schema';
import { asUnwiredStreamGetResponse, isUnwiredStreamDefinition } from '@kbn/streams-schema';
import { isNotFoundError } from '@kbn/es-errors';
import { DeploymentAgnosticFtrProviderContext } from '../../../ftr_provider_context';
import {
@ -565,6 +565,24 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
expect(getDetailsResponse.status).to.eql(404);
});
it('should still return the stream on public listing API', async () => {
const getResponse = await apiClient.fetch('GET /api/streams 2023-10-31');
expect(getResponse.status).to.eql(200);
const classicStream = getResponse.body.streams.find(
(stream) => stream.name === ORPHANED_STREAM_NAME
);
expect(isUnwiredStreamDefinition(classicStream!)).to.be(true);
});
it('should still return the stream on internal listing API', async () => {
const getResponse = await apiClient.fetch('GET /internal/streams');
expect(getResponse.status).to.eql(200);
const classicStream = getResponse.body.streams.find(
(stream) => stream.stream.name === ORPHANED_STREAM_NAME
);
expect(isUnwiredStreamDefinition(classicStream!.stream)).to.be(true);
});
after(async () => {
await apiClient.fetch('DELETE /api/streams/{name} 2023-10-31', {
params: {