[Streams 🌊] Improve definition narrowing and reduntant requests (#215897)

## 📓 Summary

These changes lift the check against the definition existence and
narrows its value for the react context consumers.

It also fixes reduntant requests for the AI connectors used for the grok
parsing suggestions.

@flash1293 I'd expect to use the AI capabilities across more places for
the enrichment experience, we should probably lift the AI capabilities
as part of the page initialization at a certain point, although it's not
needed yet 👌
This commit is contained in:
Marco Antonio Ghiani 2025-04-01 16:10:57 +02:00 committed by GitHub
parent d35b60896d
commit 13b536aed8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
18 changed files with 80 additions and 92 deletions

View file

@ -19,6 +19,7 @@ import { LicensingPluginStart } from '@kbn/licensing-plugin/public';
import { IndexManagementPluginStart } from '@kbn/index-management-shared-types';
import { IngestPipelinesPluginStart } from '@kbn/ingest-pipelines-plugin/public';
import { DiscoverSharedPublicStart } from '@kbn/discover-shared-plugin/public';
import { ObservabilityAIAssistantPublicStart } from '@kbn/observability-ai-assistant-plugin/public';
import type { StreamsAppKibanaContext } from '../public/hooks/use_kibana';
import { StreamsTelemetryService } from '../public/telemetry/service';
@ -47,6 +48,7 @@ export function getMockStreamsAppContext(): StreamsAppKibanaContext {
indexManagement: {} as unknown as IndexManagementPluginStart,
ingestPipelines: {} as unknown as IngestPipelinesPluginStart,
discoverShared: {} as unknown as DiscoverSharedPublicStart,
observabilityAIAssistant: {} as unknown as ObservabilityAIAssistantPublicStart,
},
},
services: {

View file

@ -10,26 +10,23 @@
"browser": true,
"configPath": ["xpack", "streamsApp"],
"requiredPlugins": [
"streams",
"data",
"datasetQuality",
"dataViews",
"discoverShared",
"unifiedSearch",
"share",
"savedObjectsTagging",
"navigation",
"fieldsMetadata",
"datasetQuality",
"licensing",
"indexManagement",
"ingestPipelines",
"navigation",
"observabilityAIAssistant",
"savedObjectsTagging",
"share",
"streams",
"unifiedSearch",
],
"requiredBundles": [
"kibanaReact"
],
"optionalPlugins": [
"observabilityAIAssistant"
],
"extraPublicDirs": []
}
}

View file

@ -16,7 +16,7 @@ const StreamDetailEnrichmentContent = dynamic(() =>
);
interface StreamDetailEnrichmentProps {
definition?: IngestStreamGetResponse;
definition: IngestStreamGetResponse;
refreshDefinition: () => void;
}
@ -24,8 +24,6 @@ export function StreamDetailEnrichment({
definition,
refreshDefinition,
}: StreamDetailEnrichmentProps) {
if (!definition) return null;
if (isRootStreamDefinition(definition.stream)) {
return <RootStreamEmptyPrompt />;
}

View file

@ -24,11 +24,12 @@ import {
} from '@elastic/eui';
import { i18n } from '@kbn/i18n';
import { useWatch, useFormContext } from 'react-hook-form';
import { FlattenRecord, IngestStreamGetResponse } from '@kbn/streams-schema';
import { FlattenRecord } from '@kbn/streams-schema';
import type { FindActionResult } from '@kbn/actions-plugin/server';
import { UseGenAIConnectorsResult } from '@kbn/observability-ai-assistant-plugin/public/hooks/use_genai_connectors';
import { useAbortController, useBoolean } from '@kbn/react-hooks';
import useObservable from 'react-use/lib/useObservable';
import { isEmpty } from 'lodash';
import { css } from '@emotion/css';
import { useStreamDetail } from '../../../../../hooks/use_stream_detail';
import { useKibana } from '../../../../../hooks/use_kibana';
@ -128,15 +129,16 @@ const RefreshButton = ({
);
};
function useAiEnabled() {
function useAIFeatures() {
const { dependencies, core } = useKibana();
const { observabilityAIAssistant, licensing } = dependencies.start;
const aiAssistantEnabled = observabilityAIAssistant?.service.isEnabled();
const aiAssistantEnabled = observabilityAIAssistant.service.isEnabled();
const genAiConnectors = observabilityAIAssistant?.useGenAIConnectors();
const genAiConnectors = observabilityAIAssistant.useGenAIConnectors();
const aiEnabled = aiAssistantEnabled && (genAiConnectors?.connectors || []).length > 0;
const aiEnabled =
aiAssistantEnabled && genAiConnectors.connectors && !isEmpty(genAiConnectors.connectors);
const currentLicense = useObservable(licensing.license$);
@ -146,15 +148,16 @@ function useAiEnabled() {
return {
enabled: aiEnabled,
couldBeEnabled,
genAiConnectors,
};
}
function InnerGrokAiSuggestions({
previewDocuments,
definition,
genAiConnectors,
}: {
previewDocuments: FlattenRecord[];
definition: IngestStreamGetResponse;
genAiConnectors: UseGenAIConnectorsResult;
}) {
const {
dependencies,
@ -162,13 +165,12 @@ function InnerGrokAiSuggestions({
} = useKibana();
const {
streams: { streamsRepositoryClient },
observabilityAIAssistant,
} = dependencies.start;
const { definition } = useStreamDetail();
const fieldValue = useWatch<ProcessorFormState, 'field'>({ name: 'field' });
const form = useFormContext<GrokFormState>();
const genAiConnectors = observabilityAIAssistant?.useGenAIConnectors();
const currentConnector = genAiConnectors?.selectedConnector;
const [isLoadingSuggestions, setSuggestionsLoading] = useState(false);
@ -387,8 +389,7 @@ export function GrokAiSuggestions() {
const {
core: { http },
} = useKibana();
const { enabled: isAiEnabled, couldBeEnabled } = useAiEnabled();
const { definition } = useStreamDetail();
const { enabled: isAiEnabled, couldBeEnabled, genAiConnectors } = useAIFeatures();
const previewDocuments = useSimulatorSelector((snapshot) =>
selectPreviewDocuments(snapshot.context)
);
@ -412,18 +413,18 @@ export function GrokAiSuggestions() {
>
{i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.processorFlyout.aiAssistantNotEnabled',
{
defaultMessage: 'Enable AI Assistant features',
}
{ defaultMessage: 'Enable AI Assistant features' }
)}
</EuiLink>
</EuiToolTip>
);
}
if (!isAiEnabled || !definition) {
if (!isAiEnabled) {
return null;
}
return <InnerGrokAiSuggestions definition={definition} previewDocuments={previewDocuments} />;
return (
<InnerGrokAiSuggestions previewDocuments={previewDocuments} genAiConnectors={genAiConnectors} />
);
}

View file

@ -16,16 +16,12 @@ export type DataStreamStats = DataStreamStatServiceResponse['dataStreamsStats'][
bytesPerDay: number;
};
export const useDataStreamStats = ({ definition }: { definition?: IngestStreamGetResponse }) => {
export const useDataStreamStats = ({ definition }: { definition: IngestStreamGetResponse }) => {
const {
services: { dataStreamsClient },
} = useKibana();
const statsFetch = useStreamsAppFetch(async () => {
if (!definition) {
return;
}
const client = await dataStreamsClient;
const {
dataStreamsStats: [dsStats],

View file

@ -47,7 +47,7 @@ export const useIngestionRate = ({
stats,
timeRange,
}: {
definition?: IngestStreamGetResponse;
definition: IngestStreamGetResponse;
stats?: DataStreamStats;
timeRange: TimeRange;
}) => {
@ -60,7 +60,7 @@ export const useIngestionRate = ({
const ingestionRateFetch = useStreamsAppFetch(
async ({ signal }) => {
if (!definition || !stats) {
if (!stats) {
return;
}
@ -146,7 +146,7 @@ export const useIngestionRatePerTier = ({
stats,
timeRange,
}: {
definition?: IngestStreamGetResponse;
definition: IngestStreamGetResponse;
stats?: DataStreamStats;
timeRange: TimeRange;
}) => {
@ -164,7 +164,7 @@ export const useIngestionRatePerTier = ({
const ingestionRateFetch = useStreamsAppFetch(
async ({ signal }) => {
if (!definition || !stats) {
if (!stats) {
return;
}

View file

@ -55,8 +55,6 @@ export function IlmSummary({
const { value, loading, error } = useStreamsAppFetch(
({ signal }) => {
if (!definition) return;
return streamsRepositoryClient.fetch('GET /internal/streams/{name}/lifecycle/_stats', {
params: { path: { name: definition.stream.name } },
signal,

View file

@ -36,15 +36,13 @@ function useLifecycleState({
definition,
isServerless,
}: {
definition?: IngestStreamGetResponse;
definition: IngestStreamGetResponse;
isServerless: boolean;
}) {
const [updateInProgress, setUpdateInProgress] = useState(false);
const [openEditModal, setOpenEditModal] = useState<LifecycleEditAction>('none');
const lifecycleActions = useMemo(() => {
if (!definition) return [];
const actions: Array<{ name: string; action: LifecycleEditAction }> = [];
const isWired = isWiredStreamGetResponse(definition);
const isUnwired = isUnwiredStreamGetResponse(definition);
@ -93,7 +91,7 @@ export function StreamDetailLifecycle({
definition,
refreshDefinition,
}: {
definition?: IngestStreamGetResponse;
definition: IngestStreamGetResponse;
refreshDefinition: () => void;
}) {
const {
@ -124,10 +122,6 @@ export function StreamDetailLifecycle({
const { signal } = useAbortController();
if (!definition) {
return null;
}
const ilmLocator = share.url.locators.get<IlmLocatorParams>(ILM_LOCATOR_ID);
const getIlmPolicies = () =>

View file

@ -42,7 +42,7 @@ export function IngestionRate({
isLoadingStats,
refreshStats,
}: {
definition?: IngestStreamGetResponse;
definition: IngestStreamGetResponse;
stats?: DataStreamStats;
isLoadingStats: boolean;
refreshStats: () => void;
@ -100,7 +100,7 @@ export function IngestionRate({
direction="column"
gutterSize="xs"
>
{!definition ? null : isIlmLifecycle(definition?.effective_lifecycle) ? (
{isIlmLifecycle(definition.effective_lifecycle) ? (
<ChartBarSeries
definition={definition}
stats={stats}
@ -126,7 +126,7 @@ function ChartAreaSeries({
timeRange,
isLoadingStats,
}: {
definition?: IngestStreamGetResponse;
definition: IngestStreamGetResponse;
stats?: DataStreamStats;
timeRange: TimeRange;
isLoadingStats: boolean;
@ -140,7 +140,7 @@ function ChartAreaSeries({
return ingestionRateError ? (
'Failed to load ingestion rate'
) : !definition || isLoadingStats || isLoadingIngestionRate || !ingestionRate ? (
) : isLoadingStats || isLoadingIngestionRate || !ingestionRate ? (
<EuiLoadingChart />
) : (
<>
@ -191,7 +191,7 @@ function ChartBarSeries({
timeRange,
isLoadingStats,
}: {
definition?: IngestStreamGetResponse;
definition: IngestStreamGetResponse;
stats?: DataStreamStats;
timeRange: TimeRange;
isLoadingStats: boolean;
@ -206,7 +206,7 @@ function ChartBarSeries({
return ingestionRateError ? (
'Failed to load ingestion rate'
) : !definition || isLoadingStats || isLoadingIngestionRate || !ingestionRate ? (
) : isLoadingStats || isLoadingIngestionRate || !ingestionRate ? (
<EuiLoadingChart />
) : (
<>

View file

@ -13,13 +13,9 @@ export function StreamDetailManagement({
definition,
refreshDefinition,
}: {
definition?: IngestStreamGetResponse;
definition: IngestStreamGetResponse;
refreshDefinition: () => void;
}) {
if (!definition) {
return null;
}
if (isWiredStreamGetResponse(definition)) {
return (
<WiredStreamDetailManagement definition={definition} refreshDefinition={refreshDefinition} />

View file

@ -25,7 +25,7 @@ export function WiredStreamDetailManagement({
definition,
refreshDefinition,
}: {
definition?: WiredStreamGetResponse;
definition: WiredStreamGetResponse;
refreshDefinition: () => void;
}) {
const {

View file

@ -20,7 +20,7 @@ export function useRoutingState({
definition,
toasts,
}: {
definition?: WiredStreamGetResponse;
definition: WiredStreamGetResponse;
toasts: IToasts;
}) {
const [lastDisplayedToast, setLastDisplayedToast] = React.useState<Toast | undefined>();
@ -40,14 +40,14 @@ export function useRoutingState({
// Child streams: either represents the child streams as they are, or the new order from drag and drop.
const [childStreams, setChildStreams] = React.useState<
WiredStreamGetResponse['stream']['ingest']['wired']['routing']
>(definition?.stream.ingest.wired.routing ?? []);
>(definition.stream.ingest.wired.routing ?? []);
useEffect(() => {
setChildStreams(definition?.stream.ingest.wired.routing ?? []);
setChildStreams(definition.stream.ingest.wired.routing ?? []);
}, [definition]);
// Note: just uses reference equality to check if the order has changed as onChildStreamReorder will create a new array.
const hasChildStreamsOrderChanged = childStreams !== definition?.stream.ingest.wired.routing;
const hasChildStreamsOrderChanged = childStreams !== definition.stream.ingest.wired.routing;
// Child stream currently being dragged
const [draggingChildStream, setDraggingChildStream] = React.useState<string | undefined>();
@ -73,8 +73,8 @@ export function useRoutingState({
const cancelChanges = useCallback(() => {
setChildUnderEdit(undefined);
setChildStreams(definition?.stream.ingest.wired.routing ?? []);
}, [definition?.stream.ingest.wired.routing]);
setChildStreams(definition.stream.ingest.wired.routing);
}, [definition.stream.ingest.wired.routing]);
const debouncedChildUnderEdit = useDebounced(childUnderEdit, 300);

View file

@ -27,7 +27,7 @@ export function StreamDetailRouting({
definition,
refreshDefinition,
}: {
definition?: WiredStreamGetResponse;
definition: WiredStreamGetResponse;
refreshDefinition: () => void;
}) {
const { appParams, core } = useKibana();
@ -61,10 +61,6 @@ export function StreamDetailRouting({
openConfirm: core.overlays.openConfirm,
});
if (!definition) {
return null;
}
const closeModal = () => routingAppState.setShowDeleteModal(false);
return (

View file

@ -11,16 +11,11 @@ import { SchemaEditor } from '../schema_editor';
import { useSchemaFields } from '../schema_editor/hooks/use_schema_fields';
interface SchemaEditorProps {
definition?: WiredStreamGetResponse;
definition: WiredStreamGetResponse;
refreshDefinition: () => void;
}
export function StreamDetailSchemaEditor(props: SchemaEditorProps) {
if (!props.definition) return null;
return <Content definition={props.definition} {...props} />;
}
const Content = ({ definition, refreshDefinition }: Required<SchemaEditorProps>) => {
export const StreamDetailSchemaEditor = ({ definition, refreshDefinition }: SchemaEditorProps) => {
const { loading } = useStreamDetail();
const { fields, isLoadingUnmappedFields, refreshFields, unmapField, updateField } =

View file

@ -17,7 +17,7 @@ import { useDashboardsFetch } from '../../hooks/use_dashboards_fetch';
export function StreamDetailDashboardsView({
definition,
}: {
definition?: IngestStreamGetResponse;
definition: IngestStreamGetResponse;
}) {
const [query, setQuery] = useState('');

View file

@ -23,7 +23,7 @@ import { StreamStatsPanel } from './components/stream_stats_panel';
import { StreamChartPanel } from './components/stream_chart_panel';
import { TabsPanel } from './components/tabs_panel';
export function StreamDetailOverview({ definition }: { definition?: IngestStreamGetResponse }) {
export function StreamDetailOverview({ definition }: { definition: IngestStreamGetResponse }) {
const {
dependencies: {
start: {

View file

@ -12,6 +12,7 @@ import {
isWiredStreamGetResponse,
isUnwiredStreamGetResponse,
} from '@kbn/streams-schema';
import { EuiFlexGroup, EuiLoadingSpinner } from '@elastic/eui';
import { useStreamsAppFetch } from './use_streams_app_fetch';
export interface StreamDetailContextProviderProps {
@ -20,7 +21,7 @@ export interface StreamDetailContextProviderProps {
}
export interface StreamDetailContextValue {
definition?: IngestStreamGetResponse;
definition: IngestStreamGetResponse;
loading: boolean;
refresh: () => void;
}
@ -59,10 +60,24 @@ export function StreamDetailContextProvider({
);
const context = React.useMemo(
() => ({ definition, loading, refresh }),
// useMemo cannot be used conditionally after the definition narrowing, the assertion is to narrow correctly the context value
() => ({ definition, loading, refresh } as StreamDetailContextValue),
[definition, loading, refresh]
);
// Display loading spinner for first data-fetching only to have SWR-like behaviour
if (!definition && loading) {
return (
<EuiFlexGroup justifyContent="center" alignItems="center">
<EuiLoadingSpinner size="xxl" />
</EuiFlexGroup>
);
}
if (!definition) {
return null;
}
return <StreamDetailContext.Provider value={context}>{children}</StreamDetailContext.Provider>;
}

View file

@ -39,29 +39,29 @@ export interface StreamsApplicationProps {
export type StreamsApplicationComponentType = React.FC<StreamsApplicationProps>;
export interface StreamsAppSetupDependencies {
streams: StreamsPluginSetup;
data: DataPublicPluginSetup;
dataViews: DataViewsPublicPluginSetup;
discoverShared: DiscoverSharedPublicSetup;
unifiedSearch: {};
observabilityAIAssistant: ObservabilityAIAssistantPublicSetup;
share: SharePublicSetup;
observabilityAIAssistant?: ObservabilityAIAssistantPublicSetup;
streams: StreamsPluginSetup;
unifiedSearch: {};
}
export interface StreamsAppStartDependencies {
streams: StreamsPluginStart;
data: DataPublicPluginStart;
dataViews: DataViewsPublicPluginStart;
unifiedSearch: UnifiedSearchPublicPluginStart;
share: SharePublicStart;
savedObjectsTagging: SavedObjectTaggingPluginStart;
navigation: NavigationPublicStart;
fieldsMetadata: FieldsMetadataPublicStart;
discoverShared: DiscoverSharedPublicStart;
observabilityAIAssistant?: ObservabilityAIAssistantPublicStart;
fieldsMetadata: FieldsMetadataPublicStart;
licensing: LicensingPluginStart;
indexManagement: IndexManagementPluginStart;
ingestPipelines: IngestPipelinesPluginStart;
navigation: NavigationPublicStart;
observabilityAIAssistant: ObservabilityAIAssistantPublicStart;
savedObjectsTagging: SavedObjectTaggingPluginStart;
share: SharePublicStart;
streams: StreamsPluginStart;
unifiedSearch: UnifiedSearchPublicPluginStart;
}
export interface StreamsAppPublicSetup {}