🌊 Streams: Link to streams in Discover (#214052)

Adding a link to the stream into the overview tab of the discover
document flyout:

<img width="228" alt="Screenshot 2025-03-12 at 08 57 48"
src="https://github.com/user-attachments/assets/dfd396e7-b0dc-4cca-a09c-637357cc88f9"
/>

Some reviewer notes:
* This is using the same strategy as the observability AI assistant via
the discover_shared registry - streams is not an observability-only
plugin, but for now we want to treat it like this. If we move closer to
this becoming a main feature, we can probably have discover depend on
streams directly
* For now, it's only showing the entry in the flyout if streams is
enabled so it's easy to test but doesn't show up accidentally. Before
the initial release, we can change this condition to always show for
observability spaces
* Resolving an index name to a data stream needs an Elasticsearch call
to get the index meta data. I created a new internal route for that. It
means that there is a loading state in theory, but in practice it should
resolve really quickly because it only hits the cluster state, not the
actual data.
* Even if no stream can be resolved it still shows the entry in the
flyout with a `-`. This is because it avoids shifting layout and it
doesn't seem to hurt if it's there.
* As I need to link to streams, I started introducing a locator - I'm
sure it will be needed more soon. I didn't add all the possible routes
yet, we can expand it as needed.

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Joe Reuter 2025-03-19 09:56:07 +01:00 committed by GitHub
parent 3c3038b855
commit 856b222142
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 381 additions and 4 deletions

View file

@ -23,6 +23,8 @@ export const createGetDocViewer =
'observability-logs-ai-assistant'
);
const streamsFeature = services.discoverShared.features.registry.getById('streams');
return {
...prevDocViewer,
docViewsRegistry: (registry) => {
@ -36,6 +38,7 @@ export const createGetDocViewer =
<UnifiedDocViewerLogsOverview
{...props}
renderAIAssistant={logsAIAssistantFeature?.render}
renderStreamsField={streamsFeature?.renderStreamsField}
/>
),
});

View file

@ -24,6 +24,15 @@ import { FeaturesRegistry } from '../../../common';
* will be shown on the logs-overview preset tab of the UnifiedDocViewer.
*/
export interface StreamsFeatureRenderDeps {
doc: DataTableRecord;
}
export interface StreamsFeature {
id: 'streams';
renderStreamsField: (deps: StreamsFeatureRenderDeps) => JSX.Element;
}
export interface ObservabilityLogsAIAssistantFeatureRenderDeps {
doc: DataTableRecord;
}
@ -62,6 +71,7 @@ export type SecuritySolutionFeature =
// This should be a union of all the available client features.
export type DiscoverFeature =
| StreamsFeature
| ObservabilityLogsAIAssistantFeature
| ObservabilityCreateSLOFeature
| SecuritySolutionFeature;

View file

@ -13,6 +13,7 @@ import { getLogDocumentOverview } from '@kbn/discover-utils';
import { EuiHorizontalRule, EuiSpacer } from '@elastic/eui';
import { ObservabilityLogsAIAssistantFeatureRenderDeps } from '@kbn/discover-shared-plugin/public';
import { getStacktraceFields, LogDocument } from '@kbn/discover-utils/src';
import { StreamsFeatureRenderDeps } from '@kbn/discover-shared-plugin/public/services/discover_features';
import { LogsOverviewHeader } from './logs_overview_header';
import { LogsOverviewHighlights } from './logs_overview_highlights';
import { FieldActionsProvider } from '../../hooks/use_field_actions';
@ -22,6 +23,7 @@ import { LogsOverviewStacktraceSection } from './logs_overview_stacktrace_sectio
export type LogsOverviewProps = DocViewRenderProps & {
renderAIAssistant?: (deps: ObservabilityLogsAIAssistantFeatureRenderDeps) => JSX.Element;
renderStreamsField?: (deps: StreamsFeatureRenderDeps) => JSX.Element;
};
export function LogsOverview({
@ -32,6 +34,7 @@ export function LogsOverview({
onAddColumn,
onRemoveColumn,
renderAIAssistant,
renderStreamsField,
}: LogsOverviewProps) {
const { fieldFormats } = getUnifiedDocViewerServices();
const parsedDoc = getLogDocumentOverview(hit, { dataView, fieldFormats });
@ -49,7 +52,11 @@ export function LogsOverview({
<EuiSpacer size="m" />
<LogsOverviewHeader doc={parsedDoc} />
<EuiHorizontalRule margin="xs" />
<LogsOverviewHighlights formattedDoc={parsedDoc} flattenedDoc={hit.flattened} />
<LogsOverviewHighlights
formattedDoc={parsedDoc}
doc={hit}
renderStreamsField={renderStreamsField}
/>
<LogsOverviewDegradedFields rawDoc={hit.raw} />
{isStacktraceAvailable && <LogsOverviewStacktraceSection hit={hit} dataView={dataView} />}
{LogsOverviewAIAssistant && <LogsOverviewAIAssistant doc={hit} />}

View file

@ -12,6 +12,7 @@ import { CloudProvider, CloudProviderIcon } from '@kbn/custom-icons';
import { first } from 'lodash';
import { i18n } from '@kbn/i18n';
import { DataTableRecord, LogDocumentOverview, fieldConstants } from '@kbn/discover-utils';
import { StreamsFeature } from '@kbn/discover-shared-plugin/public/services/discover_features';
import { HighlightField } from './sub_components/highlight_field';
import { HighlightSection } from './sub_components/highlight_section';
import { getUnifiedDocViewerServices } from '../../plugin';
@ -20,11 +21,14 @@ import { TraceIdHighlightField } from './sub_components/trace_id_highlight_field
export function LogsOverviewHighlights({
formattedDoc,
flattenedDoc,
doc,
renderStreamsField,
}: {
formattedDoc: LogDocumentOverview;
flattenedDoc: DataTableRecord['flattened'];
doc: DataTableRecord;
renderStreamsField?: StreamsFeature['renderStreamsField'];
}) {
const flattenedDoc = doc.flattened;
const {
fieldsMetadata: { useFieldsMetadata },
} = getUnifiedDocViewerServices();
@ -190,6 +194,7 @@ export function LogsOverviewHighlights({
{...getHighlightProps(fieldConstants.DATASTREAM_NAMESPACE_FIELD)}
/>
)}
{renderStreamsField && renderStreamsField({ doc })}
{shouldRenderHighlight(fieldConstants.AGENT_NAME_FIELD) && (
<HighlightField
data-test-subj="unifiedDocViewLogsOverviewLogShipper"

View file

@ -6,7 +6,7 @@
*/
import { SearchTotalHits } from '@elastic/elasticsearch/lib/api/types';
import { isGroupStreamDefinition } from '@kbn/streams-schema';
import { StreamDefinition, isGroupStreamDefinition } from '@kbn/streams-schema';
import { z } from '@kbn/zod';
import { createServerRoute } from '../../../create_server_route';
@ -67,6 +67,42 @@ export const streamDetailRoute = createServerRoute({
},
});
export const resolveIndexRoute = createServerRoute({
endpoint: 'GET /internal/streams/_resolve_index',
options: {
access: 'internal',
},
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
params: z.object({
query: z.object({
index: z.string(),
}),
}),
handler: async ({
request,
params,
getScopedClients,
}): Promise<{ stream?: StreamDefinition }> => {
const { scopedClusterClient, streamsClient } = await getScopedClients({ request });
const response = (
await scopedClusterClient.asCurrentUser.indices.get({ index: params.query.index })
)[params.query.index];
const dataStream = response.data_stream;
if (!dataStream) {
return {};
}
const stream = await streamsClient.getStream(dataStream);
return { stream };
},
});
export const internalCrudRoutes = {
...streamDetailRoute,
...resolveIndexRoute,
};

View file

@ -16,6 +16,7 @@ import type { SavedObjectTaggingPluginStart } from '@kbn/saved-objects-tagging-p
import { fieldsMetadataPluginPublicMock } from '@kbn/fields-metadata-plugin/public/mocks';
import { DataStreamsStatsClient } from '@kbn/dataset-quality-plugin/public/services/data_streams_stats/data_streams_stats_client';
import { LicensingPluginStart } from '@kbn/licensing-plugin/public';
import { DiscoverSharedPublicStart } from '@kbn/discover-shared-plugin/public';
import type { StreamsAppKibanaContext } from '../public/hooks/use_kibana';
import { StreamsTelemetryService } from '../public/telemetry/service';
@ -41,6 +42,7 @@ export function getMockStreamsAppContext(): StreamsAppKibanaContext {
savedObjectsTagging: {} as unknown as SavedObjectTaggingPluginStart,
fieldsMetadata: fieldsMetadataPluginPublicMock.createStartContract(),
licensing: {} as unknown as LicensingPluginStart,
discoverShared: {} as unknown as DiscoverSharedPublicStart,
},
},
services: {

View file

@ -13,6 +13,7 @@
"streams",
"data",
"dataViews",
"discoverShared",
"unifiedSearch",
"share",
"savedObjectsTagging",

View file

@ -0,0 +1,34 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { SerializableRecord } from '@kbn/utility-types';
import type { LocatorDefinition, LocatorPublic } from '@kbn/share-plugin/public';
export const STREAMS_APP_LOCATOR = 'STREAMS_APP_LOCATOR';
export interface StreamsAppLocatorParams extends SerializableRecord {
/**
* Optionally set stream ID, if not given it will link to the listing page.
*/
name?: string;
}
export type StreamsAppLocator = LocatorPublic<StreamsAppLocatorParams>;
export class StreamsAppLocatorDefinition implements LocatorDefinition<StreamsAppLocatorParams> {
public readonly id = STREAMS_APP_LOCATOR;
constructor() {}
public readonly getLocation = async (params: StreamsAppLocatorParams) => {
return {
app: 'streams',
path: params.name ? `/${params.name}` : '/',
state: {},
};
};
}

View file

@ -0,0 +1,118 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { DataTableRecord } from '@kbn/discover-utils';
import { StreamsRepositoryClient } from '@kbn/streams-plugin/public/api';
import { EuiFlexGroup, EuiTitle, EuiBetaBadge, EuiLoadingSpinner, EuiLink } from '@elastic/eui';
import { i18n } from '@kbn/i18n';
import React, { useMemo } from 'react';
import { CoreStart } from '@kbn/core/public';
import { RedirectAppLinks } from '@kbn/shared-ux-link-redirect-app';
import { useStreamsAppFetch } from '../hooks/use_streams_app_fetch';
import { StreamsAppLocator } from '../app_locator';
export interface DiscoverStreamsLinkProps {
doc: DataTableRecord;
streamsRepositoryClient: StreamsRepositoryClient;
coreApplication: CoreStart['application'];
locator: StreamsAppLocator;
}
function DiscoverStreamsLink(props: DiscoverStreamsLinkProps) {
return (
<RedirectAppLinks coreStart={{ application: props.coreApplication }}>
<EuiFlexGroup direction="column" gutterSize="xs" responsive={false}>
<EuiFlexGroup responsive={false} alignItems="center" gutterSize="xs">
<EuiTitle size="xxxs">
<span>
{i18n.translate('xpack.streams.discoverStreamsLink.title', {
defaultMessage: 'Stream',
})}
</span>
</EuiTitle>
<EuiBetaBadge
size="s"
label={i18n.translate('xpack.streams.betaBadgeLabel', {
defaultMessage: 'Streams is currently in tech preview',
})}
color="hollow"
iconType="beaker"
/>
</EuiFlexGroup>
<EuiFlexGroup
responsive={false}
alignItems="center"
justifyContent="flexStart"
gutterSize="xs"
>
<DiscoverStreamsLinkContent {...props} />
</EuiFlexGroup>
</EuiFlexGroup>
</RedirectAppLinks>
);
}
function getFallbackStreamName(flattenedDoc: Record<string, unknown>) {
const wiredStreamName = flattenedDoc['stream.name'];
if (wiredStreamName) {
return String(wiredStreamName);
}
const dsnsType = flattenedDoc['data_stream.type'];
const dsnsDataset = flattenedDoc['data_stream.dataset'];
const dsnsNamespace = flattenedDoc['data_stream.namespace'];
if (dsnsType && dsnsDataset && dsnsNamespace) {
return `${dsnsType}-${dsnsDataset}-${dsnsNamespace}`;
}
return undefined;
}
function DiscoverStreamsLinkContent({
streamsRepositoryClient,
doc,
locator,
}: DiscoverStreamsLinkProps) {
const index = doc.raw._index;
const flattenedDoc = doc.flattened;
const { value, loading, error } = useStreamsAppFetch(
async ({ signal }) => {
if (!index) {
return getFallbackStreamName(flattenedDoc);
}
const definition = await streamsRepositoryClient.fetch(
'GET /internal/streams/_resolve_index',
{
signal,
params: {
query: {
index,
},
},
}
);
return definition?.stream?.name;
},
[streamsRepositoryClient, index],
{ disableToastOnError: true }
);
const params = useMemo(() => ({ name: value }), [value]);
const redirectUrl = useMemo(() => locator.getRedirectUrl(params), [locator, params]);
const empty = <span>-</span>;
if (!index && !value) {
return empty;
}
if (loading) {
return <EuiLoadingSpinner size="s" />;
}
if (error || !value) {
return empty;
}
return <EuiLink href={redirectUrl}>{value}</EuiLink>;
}
// eslint-disable-next-line import/no-default-export
export default DiscoverStreamsLink;

View file

@ -0,0 +1,17 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { StreamsFeatureRenderDeps } from '@kbn/discover-shared-plugin/public/services/discover_features';
import { dynamic } from '@kbn/shared-ux-utility';
import React from 'react';
import { DiscoverStreamsLinkProps } from './discover_streams_link';
export const DiscoverStreamsLink = dynamic(() => import('./discover_streams_link'));
export function createDiscoverStreamsLink(services: Omit<DiscoverStreamsLinkProps, 'doc'>) {
return (props: StreamsFeatureRenderDeps) => <DiscoverStreamsLink {...services} {...props} />;
}

View file

@ -19,6 +19,8 @@ import type {
StreamsApplicationProps,
} from './types';
import { StreamsAppServices } from './services/types';
import { createDiscoverStreamsLink } from './discover_streams_link';
import { StreamsAppLocatorDefinition } from './app_locator';
import { StreamsTelemetryService } from './telemetry/service';
const StreamsApplication = dynamic(() =>
@ -46,6 +48,19 @@ export class StreamsAppPlugin
}
start(coreStart: CoreStart, pluginsStart: StreamsAppStartDependencies): StreamsAppPublicStart {
const locator = new StreamsAppLocatorDefinition();
pluginsStart.share.url.locators.create(locator);
pluginsStart.streams.status$.subscribe((status) => {
if (status.status !== 'enabled') return;
pluginsStart.discoverShared.features.registry.register({
id: 'streams',
renderStreamsField: createDiscoverStreamsLink({
streamsRepositoryClient: pluginsStart.streams.streamsRepositoryClient,
locator: pluginsStart.share.url.locators.get(locator.id)!,
coreApplication: coreStart.application,
}),
});
});
return {
createStreamsApplicationComponent: () => {
return ({ appMountParameters, PageTemplate }: StreamsApplicationProps) => {

View file

@ -21,6 +21,10 @@ import {
} from '@kbn/observability-ai-assistant-plugin/public';
import { AppMountParameters } from '@kbn/core/public';
import { LicensingPluginStart } from '@kbn/licensing-plugin/public';
import {
DiscoverSharedPublicSetup,
DiscoverSharedPublicStart,
} from '@kbn/discover-shared-plugin/public';
/* eslint-disable @typescript-eslint/no-empty-interface*/
export interface ConfigSchema {}
@ -36,6 +40,7 @@ export interface StreamsAppSetupDependencies {
streams: StreamsPluginSetup;
data: DataPublicPluginSetup;
dataViews: DataViewsPublicPluginSetup;
discoverShared: DiscoverSharedPublicSetup;
unifiedSearch: {};
share: SharePublicSetup;
observabilityAIAssistant?: ObservabilityAIAssistantPublicSetup;
@ -50,6 +55,7 @@ export interface StreamsAppStartDependencies {
savedObjectsTagging: SavedObjectTaggingPluginStart;
navigation: NavigationPublicStart;
fieldsMetadata: FieldsMetadataPublicStart;
discoverShared: DiscoverSharedPublicStart;
observabilityAIAssistant?: ObservabilityAIAssistantPublicStart;
licensing: LicensingPluginStart;
}

View file

@ -59,6 +59,9 @@
"@kbn/licensing-plugin",
"@kbn/datemath",
"@kbn/xstate-utils",
"@kbn/utility-types",
"@kbn/discover-utils",
"@kbn/discover-shared-plugin",
"@kbn/core-analytics-browser",
]
}

View file

@ -0,0 +1,119 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import expect from '@kbn/expect';
import { DeploymentAgnosticFtrProviderContext } from '../../../ftr_provider_context';
import {
StreamsSupertestRepositoryClient,
createStreamsRepositoryAdminClient,
} from './helpers/repository_client';
import { disableStreams, enableStreams, indexDocument } from './helpers/requests';
export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
const roleScopedSupertest = getService('roleScopedSupertest');
const esClient = getService('es');
const TEST_STREAM_NAME = 'logs-test-default';
const TEST_INDEX_NAME = 'some-non-datastream-place';
let apiClient: StreamsSupertestRepositoryClient;
describe('Discover integration', () => {
before(async () => {
apiClient = await createStreamsRepositoryAdminClient(roleScopedSupertest);
await enableStreams(apiClient);
});
after(async () => {
await disableStreams(apiClient);
await esClient.indices.delete({
index: TEST_INDEX_NAME,
});
await esClient.indices.deleteDataStream({
name: TEST_STREAM_NAME,
});
});
it('endpoint is resolving classic stream properly', async () => {
const doc = {
message: '2023-01-01T00:00:10.000Z error test',
};
const response = await indexDocument(esClient, TEST_STREAM_NAME, doc);
expect(response.result).to.eql('created');
const {
body: { stream },
status,
} = await apiClient.fetch('GET /internal/streams/_resolve_index', {
params: {
query: {
index: response._index,
},
},
});
expect(status).to.eql(200);
expect(stream).to.eql({
name: TEST_STREAM_NAME,
ingest: {
lifecycle: { inherit: {} },
processing: [],
unwired: {},
},
});
});
it('endpoint is resolving wired stream properly', async () => {
const doc = {
message: '2023-01-01T00:00:10.000Z error test',
};
const response = await indexDocument(esClient, 'logs', doc);
expect(response.result).to.eql('created');
const {
body: { stream },
status,
} = await apiClient.fetch('GET /internal/streams/_resolve_index', {
params: {
query: {
index: response._index,
},
},
});
expect(status).to.eql(200);
expect(stream?.name).to.eql('logs');
});
it('endpoint is returning nothing for regular index', async () => {
const doc = {
message: '2023-01-01T00:00:10.000Z error test',
};
const response = await indexDocument(esClient, TEST_INDEX_NAME, doc);
expect(response.result).to.eql('created');
const {
body: { stream },
status,
} = await apiClient.fetch('GET /internal/streams/_resolve_index', {
params: {
query: {
index: response._index,
},
},
});
expect(status).to.eql(200);
expect(stream).to.eql(undefined);
});
});
}

View file

@ -19,5 +19,6 @@ export default function ({ loadTestFile }: DeploymentAgnosticFtrProviderContext)
loadTestFile(require.resolve('./root_stream'));
loadTestFile(require.resolve('./group_streams'));
loadTestFile(require.resolve('./lifecycle'));
loadTestFile(require.resolve('./discover'));
});
}