🌊 Streams: Handle stale classic streams (#208221)

An invariant we have to handle somehow is if a user made additions to an
unwired data stream via the streams interface, then the underlying data
stream gets deleted.

This is "allowed", since the data stream is not managed by streams.

Currently, the UI breaks if this happens and shows error toasts when
trying to load the doc count or when trying to change processing.

This PR makes this a regular case the API can handle:
* The `GET /api/streams/<id>` endpoint does not throw, but still returns
the existing definition. A new key `data_stream_exists` indicates
whether we are in this situation
* The UI clearly communicates to the user and doesn't try to do
additional requests

<img width="838" alt="Screenshot 2025-01-24 at 16 42 23"
src="https://github.com/user-attachments/assets/92cc5a82-2bb2-4d66-b47a-057ddcff5888"
/>

Trying to update ingest via the API will still result in an error.

Another weird behavior related to that was that if a dashboard is linked
to a stale classic stream, no definition is ever saved and the stream
disappears from the list when deleted, making the dashboard link
inaccessible. This PR introduces `ensureStream` which is called by the
dashboard APIs and makes sure the definition is there if dashboard links
exist. As a side effect, this makes sure that a user can't add dashboard
links to a stream they don't have access to - IMHO we should have done
that from the start.

This does not touch wired streams - for those, the data stream getting
deleted is a breach of contract. We should still handle it gracefully,
but in this case I think we should go another route and offer a button
in the UI to use the "resync" API to reconcile the state of the streams
layer and Elasticsearch. I will look into this on a separate PR.
This commit is contained in:
Joe Reuter 2025-01-30 14:56:22 +01:00 committed by GitHub
parent b655d78773
commit 1b663384fe
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 172 additions and 65 deletions

View file

@ -81,6 +81,7 @@ interface WiredStreamGetResponse extends StreamGetResponseBase {
interface UnwiredStreamGetResponse extends StreamGetResponseBase {
stream: Omit<UnwiredStreamDefinition, 'name'>;
elasticsearch_assets: ElasticsearchAsset[];
data_stream_exists: boolean;
effective_lifecycle: UnwiredIngestStreamEffectiveLifecycle;
}
@ -133,6 +134,7 @@ const unwiredStreamGetResponseSchema: z.Schema<UnwiredStreamGetResponse> = z.int
z.object({
stream: unwiredStreamDefinitionSchemaBase,
elasticsearch_assets: z.array(elasticsearchAssetSchema),
data_stream_exists: z.boolean(),
effective_lifecycle: unwiredIngestStreamEffectiveLifecycleSchema,
})
);
@ -149,7 +151,7 @@ const isWiredStreamGetResponse = createIsNarrowSchema(
const isUnWiredStreamGetResponse = createIsNarrowSchema(
ingestStreamGetResponseSchema,
wiredStreamGetResponseSchema
unwiredStreamGetResponseSchema
);
const asWiredStreamGetResponse = createAsSchemaOrThrow(

View file

@ -21,6 +21,12 @@ export interface IngestStreamLifecycleILM {
};
}
export interface IngestStreamLifecycleError {
error: {
message: string;
};
}
export interface IngestStreamLifecycleInherit {
inherit: {};
}
@ -38,6 +44,7 @@ export type WiredIngestStreamEffectiveLifecycle = IngestStreamLifecycle & { from
export type UnwiredIngestStreamEffectiveLifecycle =
| IngestStreamLifecycle
| IngestStreamLifecycleError
| IngestStreamLifecycleDisabled;
export type IngestStreamEffectiveLifecycle =
@ -50,6 +57,7 @@ const dslLifecycleSchema = z.object({
const ilmLifecycleSchema = z.object({ ilm: z.object({ policy: NonEmptyString }) });
const inheritLifecycleSchema = z.object({ inherit: z.strictObject({}) });
const disabledLifecycleSchema = z.object({ disabled: z.strictObject({}) });
const errorLifecycleSchema = z.object({ error: z.strictObject({ message: NonEmptyString }) });
export const ingestStreamLifecycleSchema: z.Schema<IngestStreamLifecycle> = z.union([
dslLifecycleSchema,
@ -71,6 +79,11 @@ export const isDslLifecycle = createIsNarrowSchema(
dslLifecycleSchema
);
export const isErrorLifecycle = createIsNarrowSchema(
unwiredIngestStreamEffectiveLifecycleSchema,
errorLifecycleSchema
);
export const isIlmLifecycle = createIsNarrowSchema(
ingestStreamEffectiveLifecycleSchema,
ilmLifecycleSchema

View file

@ -13,8 +13,8 @@ import {
UnwiredStreamDefinition,
WiredIngestStreamEffectiveLifecycle,
WiredStreamDefinition,
ingestStreamLifecycleSchema,
inheritedFieldDefinitionSchema,
unwiredIngestStreamEffectiveLifecycleSchema,
unwiredStreamDefinitionSchema,
wiredIngestStreamEffectiveLifecycleSchema,
wiredStreamDefinitionSchema,
@ -40,6 +40,7 @@ interface WiredReadStreamDefinition extends ReadStreamDefinitionBase {
interface UnwiredReadStreamDefinition extends ReadStreamDefinitionBase {
stream: UnwiredStreamDefinition;
data_stream_exists: boolean;
effective_lifecycle: UnwiredIngestStreamEffectiveLifecycle;
}
@ -64,7 +65,8 @@ const unwiredReadStreamDefinitionSchema: z.Schema<UnwiredReadStreamDefinition> =
readStreamDefinitionSchemaBase,
z.object({
stream: unwiredStreamDefinitionSchema,
effective_lifecycle: ingestStreamLifecycleSchema,
data_stream_exists: z.boolean(),
effective_lifecycle: unwiredIngestStreamEffectiveLifecycleSchema,
})
);

View file

@ -501,6 +501,27 @@ export class StreamsClient {
return { acknowledged: true, result: 'created' };
}
/**
* Make sure there is a stream definition for a given stream.
* If the data stream exists but the stream definition does not, it creates an empty stream definition.
* If the stream definition exists, it is a noop.
* If the data stream does not exist or the user does not have access, it throws.
*/
async ensureStream(name: string): Promise<void> {
const [streamDefinition, dataStream] = await Promise.all([
this.getStoredStreamDefinition(name).catch((error) => {
if (isElasticsearch404(error)) {
return undefined;
}
throw error;
}),
this.getDataStream(name),
]);
if (dataStream && !streamDefinition) {
await this.updateStoredStream(this.getDataStreamAsIngestStream(dataStream));
}
}
/**
* Returns a stream definition for the given name:
* - if a wired stream definition exists
@ -513,7 +534,26 @@ export class StreamsClient {
* - the user does not have access to the stream
*/
async getStream(name: string): Promise<StreamDefinition> {
const definition = await Promise.all([
const definition = await this.getStoredStreamDefinition(name)
.catch(async (error) => {
if (isElasticsearch404(error)) {
const dataStream = await this.getDataStream(name);
return this.getDataStreamAsIngestStream(dataStream);
}
throw error;
})
.catch(async (error) => {
if (isElasticsearch404(error)) {
throw new DefinitionNotFoundError(`Cannot find stream ${name}`);
}
throw error;
});
return definition;
}
private async getStoredStreamDefinition(name: string): Promise<StreamDefinition> {
return await Promise.all([
this.dependencies.storageClient.get({ id: name }).then((response) => {
const source = response._source;
assertsSchema(streamDefinitionSchema, source);
@ -526,25 +566,9 @@ export class StreamsClient {
}
}
),
])
.then(([wiredDefinition]) => {
return wiredDefinition;
})
.catch(async (error) => {
if (isElasticsearch404(error)) {
const dataStream = await this.getDataStream(name);
return await this.getDataStreamAsIngestStream(dataStream);
}
throw error;
})
.catch(async (error) => {
if (isElasticsearch404(error)) {
throw new DefinitionNotFoundError(`Cannot find stream ${name}`);
}
throw error;
});
return definition;
]).then(([wiredDefinition]) => {
return wiredDefinition;
});
}
async getDataStream(name: string): Promise<IndicesDataStream> {
@ -560,9 +584,7 @@ export class StreamsClient {
* Creates an on-the-fly ingest stream definition
* from a concrete data stream.
*/
private async getDataStreamAsIngestStream(
dataStream: IndicesDataStream
): Promise<UnwiredStreamDefinition> {
private getDataStreamAsIngestStream(dataStream: IndicesDataStream): UnwiredStreamDefinition {
const definition: UnwiredStreamDefinition = {
name: dataStream.name,
ingest: {

View file

@ -32,8 +32,15 @@ interface DeleteStreamParams extends BaseParams {
}
export function getDataStreamLifecycle(
dataStream: IndicesDataStream
dataStream: IndicesDataStream | null
): UnwiredIngestStreamEffectiveLifecycle {
if (!dataStream) {
return {
error: {
message: 'Data stream not found',
},
};
}
if (
dataStream.ilm_policy &&
(!dataStream.lifecycle || typeof dataStream.prefer_ilm === 'undefined' || dataStream.prefer_ilm)

View file

@ -92,14 +92,16 @@ const linkDashboardRoute = createServerRoute({
dashboardId: z.string(),
}),
}),
handler: async ({ params, request, assets }): Promise<LinkDashboardResponse> => {
const assetsClient = await assets.getClientWithRequest({ request });
handler: async ({ params, request, getScopedClients }): Promise<LinkDashboardResponse> => {
const { assetClient, streamsClient } = await getScopedClients({ request });
const {
path: { dashboardId, id: streamId },
} = params;
await assetsClient.linkAsset({
await streamsClient.ensureStream(streamId);
await assetClient.linkAsset({
entityId: streamId,
entityType: 'stream',
assetId: dashboardId,
@ -209,15 +211,22 @@ const bulkDashboardsRoute = createServerRoute({
),
}),
}),
handler: async ({ params, request, assets, logger }): Promise<BulkUpdateAssetsResponse> => {
const assetsClient = await assets.getClientWithRequest({ request });
handler: async ({
params,
request,
getScopedClients,
logger,
}): Promise<BulkUpdateAssetsResponse> => {
const { assetClient, streamsClient } = await getScopedClients({ request });
const {
path: { id: streamId },
body: { operations },
} = params;
const result = await assetsClient.bulk(
await streamsClient.ensureStream(streamId);
const result = await assetClient.bulk(
{
entityId: streamId,
entityType: 'stream',

View file

@ -40,16 +40,24 @@ export async function readStream({
assetType: 'dashboard',
}),
streamsClient.getAncestors(name),
streamsClient.getDataStream(name),
streamsClient.getDataStream(name).catch((e) => {
if (e.statusCode === 404) {
return null;
}
throw e;
}),
]);
if (isUnwiredStreamDefinition(streamDefinition)) {
return {
stream: omit(streamDefinition, 'name'),
elasticsearch_assets: await getUnmanagedElasticsearchAssets({
dataStream,
scopedClusterClient,
}),
elasticsearch_assets: dataStream
? await getUnmanagedElasticsearchAssets({
dataStream,
scopedClusterClient,
})
: [],
data_stream_exists: !!dataStream,
effective_lifecycle: getDataStreamLifecycle(dataStream),
dashboards,
inherited_fields: {},

View file

@ -13,6 +13,7 @@ import {
IngestStreamEffectiveLifecycle,
ReadStreamDefinition,
isDslLifecycle,
isErrorLifecycle,
isIlmLifecycle,
isUnwiredStreamDefinition,
} from '@kbn/streams-schema';
@ -171,6 +172,16 @@ function LifecycleBadge({ lifecycle }: { lifecycle: IngestStreamEffectiveLifecyc
);
}
if (isErrorLifecycle(lifecycle)) {
return (
<EuiBadge color="hollow">
{i18n.translate('xpack.streams.entityDetailViewWithoutParams.errorBadgeLabel', {
defaultMessage: 'Error: {message}',
values: { message: lifecycle.error.message },
})}
</EuiBadge>
);
}
if (isDslLifecycle(lifecycle)) {
return (
<EuiBadge color="hollow">

View file

@ -6,13 +6,13 @@
*/
import React from 'react';
import { i18n } from '@kbn/i18n';
import { ReadStreamDefinition } from '@kbn/streams-schema';
import { EuiFlexGroup, EuiListGroup, EuiText } from '@elastic/eui';
import { UnwiredReadStreamDefinition } from '@kbn/streams-schema';
import { EuiCallOut, EuiFlexGroup, EuiListGroup, EuiText } from '@elastic/eui';
import { useStreamsAppParams } from '../../hooks/use_streams_app_params';
import { RedirectTo } from '../redirect_to';
import { StreamDetailEnrichment } from '../stream_detail_enrichment';
import { useKibana } from '../../hooks/use_kibana';
import { Wrapper } from './wrapper';
import { ManagementTabs, Wrapper } from './wrapper';
type ManagementSubTabs = 'enrich' | 'overview';
@ -24,29 +24,32 @@ export function ClassicStreamDetailManagement({
definition,
refreshDefinition,
}: {
definition: ReadStreamDefinition;
definition: UnwiredReadStreamDefinition;
refreshDefinition: () => void;
}) {
const {
path: { key, subtab },
} = useStreamsAppParams('/{key}/management/{subtab}');
const tabs = {
const tabs: ManagementTabs = {
overview: {
content: <UnmanagedStreamOverview definition={definition} />,
label: i18n.translate('xpack.streams.streamDetailView.overviewTab', {
defaultMessage: 'Overview',
}),
},
enrich: {
};
if (definition.data_stream_exists) {
tabs.enrich = {
content: (
<StreamDetailEnrichment definition={definition} refreshDefinition={refreshDefinition} />
),
label: i18n.translate('xpack.streams.streamDetailView.enrichmentTab', {
defaultMessage: 'Extract field',
}),
},
};
};
}
if (!isValidManagementSubTab(subtab)) {
return (
@ -60,7 +63,7 @@ export function ClassicStreamDetailManagement({
return <Wrapper tabs={tabs} streamId={key} subtab={subtab} />;
}
function UnmanagedStreamOverview({ definition }: { definition: ReadStreamDefinition }) {
function UnmanagedStreamOverview({ definition }: { definition: UnwiredReadStreamDefinition }) {
const {
core: {
http: { basePath },
@ -74,6 +77,24 @@ function UnmanagedStreamOverview({ definition }: { definition: ReadStreamDefinit
}
return acc;
}, {} as Record<string, Array<{ type: string; id: string }>>);
if (!definition.data_stream_exists) {
return (
<EuiCallOut
title={i18n.translate('xpack.streams.unmanagedStreamOverview.missingDatastream.title', {
defaultMessage: 'Data stream missing',
})}
color="danger"
iconType="error"
>
<p>
{i18n.translate('xpack.streams.unmanagedStreamOverview.missingDatastream.description', {
defaultMessage:
'The underlying Elasticsearch data stream for this classic stream is missing. Recreate the data stream to restore the stream by sending data before using the management features.',
})}{' '}
</p>
</EuiCallOut>
);
}
return (
<EuiFlexGroup direction="column" gutterSize="m">
<EuiText>

View file

@ -10,12 +10,20 @@ import React from 'react';
import { css } from '@emotion/css';
import { useStreamsAppRouter } from '../../hooks/use_streams_app_router';
export type ManagementTabs = Record<
string,
{
content: JSX.Element;
label: string;
}
>;
export function Wrapper({
tabs,
streamId,
subtab,
}: {
tabs: Record<string, { content: JSX.Element; label: string }>;
tabs: ManagementTabs;
streamId: string;
subtab: string;
}) {
@ -28,22 +36,24 @@ export function Wrapper({
max-width: 100%;
`}
>
<EuiFlexItem grow={false}>
<EuiButtonGroup
legend="Management tabs"
idSelected={subtab}
onChange={(optionId) => {
router.push('/{key}/management/{subtab}', {
path: { key: streamId, subtab: optionId },
query: {},
});
}}
options={Object.keys(tabs).map((id) => ({
id,
label: tabs[id].label,
}))}
/>
</EuiFlexItem>
{Object.keys(tabs).length > 1 && (
<EuiFlexItem grow={false}>
<EuiButtonGroup
legend="Management tabs"
idSelected={subtab}
onChange={(optionId) => {
router.push('/{key}/management/{subtab}', {
path: { key: streamId, subtab: optionId },
query: {},
});
}}
options={Object.keys(tabs).map((id) => ({
id,
label: tabs[id].label,
}))}
/>
</EuiFlexItem>
)}
<EuiFlexItem
className={css`
overflow: auto;

View file

@ -21,6 +21,7 @@ import React, { useMemo } from 'react';
import { css } from '@emotion/css';
import {
ReadStreamDefinition,
isUnwiredReadStream,
isWiredReadStream,
isWiredStreamDefinition,
} from '@kbn/streams-schema';
@ -130,7 +131,7 @@ export function StreamDetailOverview({ definition }: { definition?: ReadStreamDe
const docCountFetch = useStreamsAppFetch(
async ({ signal }) => {
if (!definition) {
if (!definition || (isUnwiredReadStream(definition) && !definition.data_stream_exists)) {
return undefined;
}
return streamsRepositoryClient.fetch('GET /api/streams/{id}/_details', {

View file

@ -67,6 +67,7 @@ export function StreamDetailView() {
inherited_fields: {},
effective_lifecycle: response.effective_lifecycle,
name: key,
data_stream_exists: response.data_stream_exists,
stream: {
name: key,
...response.stream,