diff --git a/src/platform/packages/shared/kbn-xstate-utils/src/get_placeholder_for.ts b/src/platform/packages/shared/kbn-xstate-utils/src/get_placeholder_for.ts new file mode 100644 index 000000000000..bcbcc765fbbf --- /dev/null +++ b/src/platform/packages/shared/kbn-xstate-utils/src/get_placeholder_for.ts @@ -0,0 +1,15 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +export const getPlaceholderFor = any>( + _implementationFactory: ImplementationFactory +): ReturnType => + (() => { + throw new Error('Not implemented'); + }) as ReturnType; diff --git a/src/platform/packages/shared/kbn-xstate-utils/src/index.ts b/src/platform/packages/shared/kbn-xstate-utils/src/index.ts index 3edf83e8a32c..2be37e739949 100644 --- a/src/platform/packages/shared/kbn-xstate-utils/src/index.ts +++ b/src/platform/packages/shared/kbn-xstate-utils/src/index.ts @@ -9,6 +9,7 @@ export * from './actions'; export * from './dev_tools'; +export * from './get_placeholder_for'; export * from './console_inspector'; export * from './notification_channel'; export * from './types'; diff --git a/x-pack/platform/packages/shared/logs-overview/src/services/categorize_logs_service/categorize_logs_service.ts b/x-pack/platform/packages/shared/logs-overview/src/services/categorize_logs_service/categorize_logs_service.ts index deeb758d2d73..c14fcc81e94b 100644 --- a/x-pack/platform/packages/shared/logs-overview/src/services/categorize_logs_service/categorize_logs_service.ts +++ b/x-pack/platform/packages/shared/logs-overview/src/services/categorize_logs_service/categorize_logs_service.ts @@ -6,8 +6,8 @@ */ import { MachineImplementationsFrom, assign, setup } from 'xstate5'; +import { getPlaceholderFor } from '@kbn/xstate-utils'; import { LogCategory } from '../../types'; -import { getPlaceholderFor } from '../../utils/xstate5_utils'; import { categorizeDocuments } from './categorize_documents'; import { countDocuments } from './count_documents'; import { CategorizeLogsServiceDependencies, LogCategorizationParams } from './types'; diff --git a/x-pack/platform/packages/shared/logs-overview/src/utils/xstate5_utils.ts b/x-pack/platform/packages/shared/logs-overview/src/utils/xstate5_utils.ts deleted file mode 100644 index 3df0bf4ea398..000000000000 --- a/x-pack/platform/packages/shared/logs-overview/src/utils/xstate5_utils.ts +++ /dev/null @@ -1,13 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -export const getPlaceholderFor = any>( - implementationFactory: ImplementationFactory -): ReturnType => - (() => { - throw new Error('Not implemented'); - }) as ReturnType; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/asset_image/index.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/asset_image/index.tsx index 4a77974bdca4..93887b1be3ee 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/asset_image/index.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/asset_image/index.tsx @@ -38,9 +38,18 @@ export function AssetImage({ type = 'welcome', ...props }: AssetImageProps) { const [imageSrc, setImageSrc] = useState(); useEffect(() => { + let isMounted = true; const dynamicImageImport = colorMode === 'LIGHT' ? light() : dark(); - dynamicImageImport.then((module) => setImageSrc(module.default)); + dynamicImageImport.then((module) => { + if (isMounted) { + setImageSrc(module.default); + } + }); + + return () => { + isMounted = false; + }; }, [colorMode, dark, light]); return imageSrc ? : null; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/management_bottom_bar/index.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/management_bottom_bar/index.tsx index 70671b09d4c9..b484c4f720f1 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/management_bottom_bar/index.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/management_bottom_bar/index.tsx @@ -6,13 +6,12 @@ */ import React from 'react'; -import { EuiButton, EuiButtonEmpty, EuiFlexGroup, EuiToolTip, EuiToolTipProps } from '@elastic/eui'; +import { EuiButton, EuiButtonEmpty, EuiFlexGroup } from '@elastic/eui'; import { i18n } from '@kbn/i18n'; import { useDiscardConfirm } from '../../../hooks/use_discard_confirm'; interface ManagementBottomBarProps { confirmButtonText?: string; - confirmTooltip?: Partial; disabled?: boolean; isLoading?: boolean; onCancel: () => void; @@ -21,7 +20,6 @@ interface ManagementBottomBarProps { export function ManagementBottomBar({ confirmButtonText = defaultConfirmButtonText, - confirmTooltip, disabled = false, isLoading = false, onCancel, @@ -34,31 +32,11 @@ export function ManagementBottomBar({ cancelButtonText: keepEditingLabel, }); - const confirmButtonContent = ( - - {confirmButtonText} - - ); - - const confirmButton = confirmTooltip ? ( - {confirmButtonContent} - ) : ( - confirmButtonContent - ); - return ( - {confirmButton} + + {confirmButtonText} + ); } diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/hooks/use_definition.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/hooks/use_definition.ts deleted file mode 100644 index caab0adf002e..000000000000 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/hooks/use_definition.ts +++ /dev/null @@ -1,221 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -import { useState, useMemo, useEffect, useRef, useCallback } from 'react'; -import { i18n } from '@kbn/i18n'; -import { useAbortController, useBoolean } from '@kbn/react-hooks'; -import { - IngestStreamGetResponse, - isWiredStreamGetResponse, - FieldDefinition, - WiredStreamGetResponse, - IngestUpsertRequest, - ProcessorDefinition, - getProcessorType, -} from '@kbn/streams-schema'; -import { DetectedField, ProcessorDefinitionWithUIAttributes } from '../types'; -import { useKibana } from '../../../../hooks/use_kibana'; -import { processorConverter } from '../utils'; - -export interface UseDefinitionReturn { - processors: ProcessorDefinitionWithUIAttributes[]; - hasChanges: boolean; - isSavingChanges: boolean; - addProcessor: (newProcessor: ProcessorDefinition, newFields?: DetectedField[]) => void; - updateProcessor: ( - id: string, - processor: ProcessorDefinition, - status?: ProcessorDefinitionWithUIAttributes['status'] - ) => void; - deleteProcessor: (id: string) => void; - reorderProcessors: (processors: ProcessorDefinitionWithUIAttributes[]) => void; - saveChanges: () => Promise; - setProcessors: (processors: ProcessorDefinitionWithUIAttributes[]) => void; - resetChanges: () => void; -} - -export const useDefinition = ( - definition: IngestStreamGetResponse, - refreshDefinition: () => void -): UseDefinitionReturn => { - const { core, dependencies } = useKibana(); - - const { toasts } = core.notifications; - const { processing: existingProcessorDefinitions } = definition.stream.ingest; - const { streamsRepositoryClient } = dependencies.start.streams; - - const abortController = useAbortController(); - const [isSavingChanges, { on: startsSaving, off: endsSaving }] = useBoolean(); - - const [processors, setProcessors] = useState(() => - createProcessorsList(existingProcessorDefinitions) - ); - - const initialProcessors = useRef(processors); - - const [fields, setFields] = useState(() => - isWiredStreamGetResponse(definition) ? definition.stream.ingest.wired.fields : {} - ); - - const nextProcessorDefinitions = useMemo( - () => processors.map(processorConverter.toAPIDefinition), - [processors] - ); - - useEffect(() => { - // Reset processors when definition refreshes - const resetProcessors = createProcessorsList(definition.stream.ingest.processing); - setProcessors(resetProcessors); - initialProcessors.current = resetProcessors; - }, [definition]); - - const hasChanges = useMemo( - () => - processors.length !== initialProcessors.current.length || // Processor count changed, a processor might be deleted - processors.some((proc) => proc.status === 'draft' || proc.status === 'updated') || // New or updated processors - hasOrderChanged(processors, initialProcessors.current), // Processor order changed - [processors] - ); - - const addProcessor = useCallback( - (newProcessor: ProcessorDefinition, newFields?: DetectedField[]) => { - setProcessors((prevProcs) => - prevProcs.concat(processorConverter.toUIDefinition(newProcessor, { status: 'draft' })) - ); - - if (isWiredStreamGetResponse(definition) && newFields) { - setFields((currentFields) => mergeFields(definition, currentFields, newFields)); - } - }, - [definition] - ); - - const updateProcessor = useCallback( - ( - id: string, - processorUpdate: ProcessorDefinition, - status: ProcessorDefinitionWithUIAttributes['status'] = 'updated' - ) => { - setProcessors((prevProcs) => - prevProcs.map((proc) => - proc.id === id - ? { - ...processorUpdate, - id, - type: getProcessorType(processorUpdate), - status, - } - : proc - ) - ); - }, - [] - ); - - const reorderProcessors = setProcessors; - - const deleteProcessor = useCallback((id: string) => { - setProcessors((prevProcs) => prevProcs.filter((proc) => proc.id !== id)); - }, []); - - const resetChanges = () => { - const resetProcessors = createProcessorsList(existingProcessorDefinitions); - setProcessors(resetProcessors); - initialProcessors.current = resetProcessors; - setFields(isWiredStreamGetResponse(definition) ? definition.stream.ingest.wired.fields : {}); - }; - - const saveChanges = async () => { - startsSaving(); - try { - await streamsRepositoryClient.fetch(`PUT /api/streams/{name}/_ingest`, { - signal: abortController.signal, - params: { - path: { - name: definition.stream.name, - }, - body: { - ingest: { - ...definition.stream.ingest, - processing: nextProcessorDefinitions, - ...(isWiredStreamGetResponse(definition) && { - wired: { ...definition.stream.ingest.wired, fields }, - }), - }, - } as IngestUpsertRequest, - }, - }); - - toasts.addSuccess( - i18n.translate( - 'xpack.streams.streamDetailView.managementTab.enrichment.saveChangesSuccess', - { defaultMessage: "Stream's processors updated" } - ) - ); - - refreshDefinition(); - } catch (error) { - toasts.addError(new Error(error.body.message), { - title: i18n.translate( - 'xpack.streams.streamDetailView.managementTab.enrichment.saveChangesError', - { defaultMessage: "An issue occurred saving processors' changes." } - ), - toastMessage: error.body.message, - }); - } finally { - endsSaving(); - } - }; - - return { - // Values - processors, - // Actions - addProcessor, - updateProcessor, - deleteProcessor, - reorderProcessors, - resetChanges, - saveChanges, - setProcessors, - // Flags - hasChanges, - isSavingChanges, - }; -}; - -const createProcessorsList = (processors: ProcessorDefinition[]) => { - return processors.map((processor) => processorConverter.toUIDefinition(processor)); -}; - -const hasOrderChanged = ( - processors: ProcessorDefinitionWithUIAttributes[], - initialProcessors: ProcessorDefinitionWithUIAttributes[] -) => { - return processors.some((processor, index) => processor.id !== initialProcessors[index].id); -}; - -const mergeFields = ( - definition: WiredStreamGetResponse, - currentFields: FieldDefinition, - newFields: DetectedField[] -) => { - return { - ...definition.stream.ingest.wired.fields, - ...newFields.reduce((acc, field) => { - // Add only new fields and ignore unmapped ones - if ( - !(field.name in currentFields) && - !(field.name in definition.inherited_fields) && - field.type !== undefined - ) { - acc[field.name] = { type: field.type }; - } - return acc; - }, {} as FieldDefinition), - }; -}; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/hooks/use_processing_simulator.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/hooks/use_processing_simulator.ts deleted file mode 100644 index 0f219f92e858..000000000000 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/hooks/use_processing_simulator.ts +++ /dev/null @@ -1,309 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -import { useEffect, useMemo, useRef, useState } from 'react'; -import { debounce, isEmpty, isEqual, uniq, uniqBy } from 'lodash'; -import { - IngestStreamGetResponse, - getProcessorConfig, - UnaryOperator, - Condition, - processorDefinitionSchema, - isSchema, - FlattenRecord, -} from '@kbn/streams-schema'; -import { IHttpFetchError, ResponseErrorBody } from '@kbn/core/public'; -import { APIReturnType } from '@kbn/streams-plugin/public/api'; -import { i18n } from '@kbn/i18n'; -import { flattenObjectNestedLast } from '@kbn/object-utils'; -import { useStreamsAppFetch } from '../../../../hooks/use_streams_app_fetch'; -import { useKibana } from '../../../../hooks/use_kibana'; -import { DetectedField, ProcessorDefinitionWithUIAttributes } from '../types'; -import { processorConverter } from '../utils'; - -export type Simulation = APIReturnType<'POST /api/streams/{name}/processing/_simulate'>; -export type ProcessorMetrics = - Simulation['processors_metrics'][keyof Simulation['processors_metrics']]; - -export interface TableColumn { - name: string; - origin: 'processor' | 'detected'; -} - -export const docsFilterOptions = { - outcome_filter_all: { - id: 'outcome_filter_all', - label: i18n.translate( - 'xpack.streams.streamDetailView.managementTab.enrichment.processor.outcomeControls.all', - { defaultMessage: 'All samples' } - ), - }, - outcome_filter_matched: { - id: 'outcome_filter_matched', - label: i18n.translate( - 'xpack.streams.streamDetailView.managementTab.enrichment.processor.outcomeControls.matched', - { defaultMessage: 'Matched' } - ), - }, - outcome_filter_unmatched: { - id: 'outcome_filter_unmatched', - label: i18n.translate( - 'xpack.streams.streamDetailView.managementTab.enrichment.processor.outcomeControls.unmatched', - { defaultMessage: 'Unmatched' } - ), - }, -} as const; - -export type DocsFilterOption = keyof typeof docsFilterOptions; - -export interface UseProcessingSimulatorProps { - definition: IngestStreamGetResponse; - processors: ProcessorDefinitionWithUIAttributes[]; -} - -export interface UseProcessingSimulatorReturn { - hasLiveChanges: boolean; - error?: IHttpFetchError; - isLoading: boolean; - samples: FlattenRecord[]; - filteredSamples: FlattenRecord[]; - simulation?: Simulation | null; - tableColumns: TableColumn[]; - refreshSamples: () => void; - watchProcessor: ( - processor: ProcessorDefinitionWithUIAttributes | { id: string; deleteIfExists: true } - ) => void; - refreshSimulation: () => void; - selectedDocsFilter: DocsFilterOption; - setSelectedDocsFilter: (filter: DocsFilterOption) => void; -} - -export const useProcessingSimulator = ({ - definition, - processors, -}: UseProcessingSimulatorProps): UseProcessingSimulatorReturn => { - const { dependencies } = useKibana(); - const { - data, - streams: { streamsRepositoryClient }, - } = dependencies.start; - - const { - absoluteTimeRange: { start, end }, - } = data.query.timefilter.timefilter.useTimefilter(); - - const draftProcessors = useMemo( - () => processors.filter((processor) => processor.status === 'draft'), - [processors] - ); - - const [liveDraftProcessors, setLiveDraftProcessors] = useState(draftProcessors); - - useEffect(() => { - setLiveDraftProcessors((prevLiveProcessors) => { - const inProgressDraft = prevLiveProcessors.find((proc) => proc.id === 'draft'); - return inProgressDraft ? [...draftProcessors, inProgressDraft] : draftProcessors; - }); - }, [draftProcessors]); - - const watchProcessor = useMemo( - () => - debounce( - (processor: ProcessorDefinitionWithUIAttributes | { id: string; deleteIfExists: true }) => { - if ('deleteIfExists' in processor) { - return setLiveDraftProcessors((prevLiveDraftProcessors) => - prevLiveDraftProcessors.filter((proc) => proc.id !== processor.id) - ); - } - - if (processor.status === 'draft') { - setLiveDraftProcessors((prevLiveDraftProcessors) => { - const newLiveDraftProcessors = prevLiveDraftProcessors.slice(); - - const existingIndex = prevLiveDraftProcessors.findIndex( - (proc) => proc.id === processor.id - ); - - if (existingIndex !== -1) { - newLiveDraftProcessors[existingIndex] = processor; - } else { - newLiveDraftProcessors.push(processor); - } - - return newLiveDraftProcessors; - }); - } - }, - 800 - ), - [] - ); - - const memoizedSamplingCondition = useRef(); - - const samplingCondition = useMemo(() => { - const newSamplingCondition = composeSamplingCondition(liveDraftProcessors); - if (isEqual(newSamplingCondition, memoizedSamplingCondition.current)) { - return memoizedSamplingCondition.current; - } - memoizedSamplingCondition.current = newSamplingCondition; - return newSamplingCondition; - }, [liveDraftProcessors]); - - const { - loading: isLoadingSamples, - value: sampleDocs, - refresh: refreshSamples, - } = useStreamsAppFetch( - async ({ signal }) => { - if (!definition) { - return []; - } - - const samplesBody = await streamsRepositoryClient.fetch('POST /api/streams/{name}/_sample', { - signal, - params: { - path: { name: definition.stream.name }, - body: { - if: samplingCondition, - start: start?.valueOf(), - end: end?.valueOf(), - size: 100, - }, - }, - }); - - return samplesBody.documents.map((doc) => flattenObjectNestedLast(doc)) as FlattenRecord[]; - }, - [definition, streamsRepositoryClient, start, end, samplingCondition], - { disableToastOnError: true } - ); - - const { - loading: isLoadingSimulation, - value: simulation, - error: simulationError, - refresh: refreshSimulation, - } = useStreamsAppFetch( - ({ signal }): Promise => { - if (!definition || isEmpty(sampleDocs) || isEmpty(liveDraftProcessors)) { - // This is a hack to avoid losing the previous value of the simulation once the conditions are not met. The state management refactor will fix this. - return Promise.resolve(simulation!); - } - - const processing = liveDraftProcessors.map(processorConverter.toAPIDefinition); - - const hasValidProcessors = processing.every((processor) => - isSchema(processorDefinitionSchema, processor) - ); - - // Each processor should meet the minimum schema requirements to run the simulation - if (!hasValidProcessors) { - // This is a hack to avoid losing the previous value of the simulation once the conditions are not met. The state management refactor will fix this. - return Promise.resolve(simulation!); - } - - return streamsRepositoryClient.fetch('POST /api/streams/{name}/processing/_simulate', { - signal, - params: { - path: { name: definition.stream.name }, - body: { - documents: sampleDocs, - processing: liveDraftProcessors.map(processorConverter.toSimulateDefinition), - }, - }, - }); - }, - [definition, sampleDocs, liveDraftProcessors, streamsRepositoryClient], - { disableToastOnError: true } - ); - - const tableColumns = useMemo(() => { - // If there is an error, we only want the source fields - const detectedFields = simulationError ? [] : simulation?.detected_fields ?? []; - - return getTableColumns(liveDraftProcessors, detectedFields); - }, [liveDraftProcessors, simulation, simulationError]); - - const hasLiveChanges = !isEmpty(liveDraftProcessors); - - const [selectedDocsFilter, setSelectedDocsFilter] = - useState('outcome_filter_all'); - - const filteredSamples = useMemo(() => { - if (!simulation?.documents) { - return sampleDocs?.map((doc) => flattenObjectNestedLast(doc)) as FlattenRecord[]; - } - - const filterDocuments = (filter: DocsFilterOption) => { - switch (filter) { - case 'outcome_filter_matched': - return simulation.documents.filter((doc) => doc.status === 'parsed'); - case 'outcome_filter_unmatched': - return simulation.documents.filter((doc) => doc.status !== 'parsed'); - case 'outcome_filter_all': - default: - return simulation.documents; - } - }; - - return filterDocuments(selectedDocsFilter).map((doc) => doc.value); - }, [sampleDocs, simulation?.documents, selectedDocsFilter]); - - return { - hasLiveChanges, - isLoading: isLoadingSamples || isLoadingSimulation, - error: simulationError as IHttpFetchError | undefined, - refreshSamples, - simulation, - samples: sampleDocs ?? [], - filteredSamples: filteredSamples ?? [], - tableColumns, - watchProcessor, - refreshSimulation, - selectedDocsFilter, - setSelectedDocsFilter, - }; -}; - -const composeSamplingCondition = ( - processors: ProcessorDefinitionWithUIAttributes[] -): Condition | undefined => { - if (isEmpty(processors)) { - return undefined; - } - - const uniqueFields = uniq(getSourceFields(processors)); - - const conditions = uniqueFields.map((field) => ({ - field, - operator: 'exists' as UnaryOperator, - })); - - return { or: conditions }; -}; - -const getSourceFields = (processors: ProcessorDefinitionWithUIAttributes[]): string[] => { - return processors.map((processor) => getProcessorConfig(processor).field); -}; - -const getTableColumns = ( - processors: ProcessorDefinitionWithUIAttributes[], - fields: DetectedField[] -) => { - const uniqueProcessorsFields = getSourceFields(processors).map((name) => ({ - name, - origin: 'processor', - })); - - const uniqueDetectedFields = fields.map((field) => ({ - name: field.name, - origin: 'detected', - })); - - return uniqBy([...uniqueProcessorsFields, ...uniqueDetectedFields], 'name') as TableColumn[]; -}; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/page_content.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/page_content.tsx index 3f8d67440945..2eaaa1ca604c 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/page_content.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/page_content.tsx @@ -18,23 +18,22 @@ import { useEuiTheme, } from '@elastic/eui'; import { i18n } from '@kbn/i18n'; -import { IngestStreamGetResponse, isRootStreamDefinition } from '@kbn/streams-schema'; +import { IngestStreamGetResponse } from '@kbn/streams-schema'; import { useUnsavedChangesPrompt } from '@kbn/unsaved-changes-prompt'; import { css } from '@emotion/react'; import { isEmpty } from 'lodash'; -import { UseDefinitionReturn, useDefinition } from './hooks/use_definition'; import { useKibana } from '../../../hooks/use_kibana'; -import { RootStreamEmptyPrompt } from './root_stream_empty_prompt'; import { DraggableProcessorListItem } from './processors_list'; import { SortableList } from './sortable_list'; import { ManagementBottomBar } from '../management_bottom_bar'; import { AddProcessorPanel } from './processors'; import { SimulationPlayground } from './simulation_playground'; import { - UseProcessingSimulatorReturn, - useProcessingSimulator, -} from './hooks/use_processing_simulator'; -import { SimulatorContextProvider } from './simulator_context'; + StreamEnrichmentContextProvider, + useSimulatorSelector, + useStreamEnrichmentEvents, + useStreamsEnrichmentSelector, +} from './state_management/stream_enrichment_state_machine'; const MemoSimulationPlayground = React.memo(SimulationPlayground); @@ -43,238 +42,171 @@ interface StreamDetailEnrichmentContentProps { refreshDefinition: () => void; } -export function StreamDetailEnrichmentContent({ - definition, - refreshDefinition, -}: StreamDetailEnrichmentContentProps) { +export function StreamDetailEnrichmentContent(props: StreamDetailEnrichmentContentProps) { + const { core, dependencies } = useKibana(); + const { + data, + streams: { streamsRepositoryClient }, + } = dependencies.start; + + return ( + + + + ); +} + +export function StreamDetailEnrichmentContentImpl() { const { appParams, core } = useKibana(); - const { - processors, - addProcessor, - updateProcessor, - deleteProcessor, - resetChanges, - saveChanges, - reorderProcessors, - hasChanges, - isSavingChanges, - } = useDefinition(definition, refreshDefinition); + const { resetChanges, saveChanges } = useStreamEnrichmentEvents(); - const processingSimulator = useProcessingSimulator({ definition, processors }); - - const { - hasLiveChanges, - isLoading, - refreshSamples, - filteredSamples, - simulation, - tableColumns, - watchProcessor, - selectedDocsFilter, - setSelectedDocsFilter, - } = processingSimulator; + const hasChanges = useStreamsEnrichmentSelector((state) => state.can({ type: 'stream.update' })); + const isSavingChanges = useStreamsEnrichmentSelector((state) => + state.matches({ ready: { stream: 'updating' } }) + ); useUnsavedChangesPrompt({ - hasUnsavedChanges: hasChanges || hasLiveChanges, + hasUnsavedChanges: hasChanges, history: appParams.history, http: core.http, navigateToUrl: core.application.navigateToUrl, openConfirm: core.overlays.openConfirm, }); - if (isRootStreamDefinition(definition.stream)) { - return ; - } - - const isNonAdditiveSimulation = simulation && simulation.is_non_additive_simulation; - const isSubmitDisabled = Boolean(!hasChanges || isNonAdditiveSimulation); - - const confirmTooltip = isNonAdditiveSimulation - ? { - title: i18n.translate( - 'xpack.streams.streamDetailView.managementTab.enrichment.nonAdditiveProcessorsTooltip.title', - { defaultMessage: 'Non additive simulation detected' } - ), - content: i18n.translate( - 'xpack.streams.streamDetailView.managementTab.enrichment.nonAdditiveProcessorsTooltip.content', - { - defaultMessage: - 'We currently prevent adding processors that change/remove existing data. Please update your processor configurations to continue.', - } - ), - } - : undefined; - return ( - - - - - - - - + + + + + + ); } -interface ProcessorsEditorProps { - definition: IngestStreamGetResponse; - processors: UseDefinitionReturn['processors']; - onAddProcessor: UseDefinitionReturn['addProcessor']; - onDeleteProcessor: UseDefinitionReturn['deleteProcessor']; - onReorderProcessor: UseDefinitionReturn['reorderProcessors']; - onUpdateProcessor: UseDefinitionReturn['updateProcessor']; - onWatchProcessor: UseProcessingSimulatorReturn['watchProcessor']; - simulation: UseProcessingSimulatorReturn['simulation']; -} +const ProcessorsEditor = React.memo(() => { + const { euiTheme } = useEuiTheme(); -const ProcessorsEditor = React.memo( - ({ - definition, - processors, - onAddProcessor, - onDeleteProcessor, - onReorderProcessor, - onUpdateProcessor, - onWatchProcessor, - simulation, - }: ProcessorsEditorProps) => { - const { euiTheme } = useEuiTheme(); + const { reorderProcessors } = useStreamEnrichmentEvents(); - const handlerItemDrag: DragDropContextProps['onDragEnd'] = ({ source, destination }) => { - if (source && destination) { - const items = euiDragDropReorder(processors, source.index, destination.index); - onReorderProcessor(items); - } - }; + const processorsRefs = useStreamsEnrichmentSelector((state) => + state.context.processorsRefs.filter((processorRef) => + processorRef.getSnapshot().matches('configured') + ) + ); - const hasProcessors = !isEmpty(processors); + const simulationSnapshot = useSimulatorSelector((s) => s); - return ( - <> - - -

- {i18n.translate( - 'xpack.streams.streamDetailView.managementTab.enrichment.headingTitle', - { - defaultMessage: 'Processors for field extraction', - } - )} -

-
- + const handlerItemDrag: DragDropContextProps['onDragEnd'] = ({ source, destination }) => { + if (source && destination) { + const items = euiDragDropReorder(processorsRefs, source.index, destination.index); + reorderProcessors(items); + } + }; + + const hasProcessors = !isEmpty(processorsRefs); + + return ( + <> + + +

{i18n.translate( - 'xpack.streams.streamDetailView.managementTab.enrichment.headingSubtitle', + 'xpack.streams.streamDetailView.managementTab.enrichment.headingTitle', { - defaultMessage: - 'Drag and drop existing processors to update their execution order.', + defaultMessage: 'Processors for field extraction', } )} - - - - {hasProcessors && ( - - {processors.map((processor, idx) => ( - - ))} - +

+
+ + {i18n.translate( + 'xpack.streams.streamDetailView.managementTab.enrichment.headingSubtitle', + { + defaultMessage: 'Drag and drop existing processors to update their execution order.', + } )} - -
- - ); - } -); +
+
+ + {hasProcessors && ( + + {processorsRefs.map((processorRef, idx) => ( + + ))} + + )} + + + + ); +}); const verticalFlexCss = css` display: flex; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processor_outcome_preview.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processor_outcome_preview.tsx index f3c7b22c8ef3..d21752045221 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processor_outcome_preview.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processor_outcome_preview.tsx @@ -16,121 +16,74 @@ import { EuiProgress, } from '@elastic/eui'; import { i18n } from '@kbn/i18n'; -import { TimeRange } from '@kbn/es-query'; -import { isEmpty } from 'lodash'; -import { SampleDocument } from '@kbn/streams-schema'; -import { useKibana } from '../../../hooks/use_kibana'; +import { useSelector } from '@xstate5/react'; +import { isEmpty, isEqual } from 'lodash'; import { StreamsAppSearchBar, StreamsAppSearchBarProps } from '../../streams_app_search_bar'; import { PreviewTable } from '../preview_table'; -import { - DocsFilterOption, - TableColumn, - UseProcessingSimulatorReturn, - docsFilterOptions, -} from './hooks/use_processing_simulator'; import { AssetImage } from '../../asset_image'; +import { + useSimulatorSelector, + useStreamEnrichmentEvents, +} from './state_management/stream_enrichment_state_machine'; +import { + PreviewDocsFilterOption, + getTableColumns, + previewDocsFilterOptions, +} from './state_management/simulation_state_machine'; -interface ProcessorOutcomePreviewProps { - columns: TableColumn[]; - isLoading: UseProcessingSimulatorReturn['isLoading']; - simulation: UseProcessingSimulatorReturn['simulation']; - filteredSamples: UseProcessingSimulatorReturn['samples']; - onRefreshSamples: UseProcessingSimulatorReturn['refreshSamples']; - selectedDocsFilter: UseProcessingSimulatorReturn['selectedDocsFilter']; - setSelectedDocsFilter: UseProcessingSimulatorReturn['setSelectedDocsFilter']; -} - -export const ProcessorOutcomePreview = ({ - columns, - isLoading, - simulation, - filteredSamples, - onRefreshSamples, - selectedDocsFilter, - setSelectedDocsFilter, -}: ProcessorOutcomePreviewProps) => { - const { dependencies } = useKibana(); - const { data } = dependencies.start; - - const { timeRange, setTimeRange } = data.query.timefilter.timefilter.useTimefilter(); - - const tableColumns = useMemo(() => { - switch (selectedDocsFilter) { - case 'outcome_filter_unmatched': - return columns - .filter((column) => column.origin === 'processor') - .map((column) => column.name); - case 'outcome_filter_matched': - case 'outcome_filter_all': - default: - return columns.map((column) => column.name); - } - }, [columns, selectedDocsFilter]); - - const simulationFailureRate = simulation - ? simulation?.failure_rate + simulation?.skipped_rate - : undefined; - const simulationSuccessRate = simulation?.success_rate; +export const ProcessorOutcomePreview = () => { + const isLoading = useSimulatorSelector( + (state) => + state.matches('debouncingChanges') || + state.matches('loadingSamples') || + state.matches('runningSimulation') + ); return ( <> - + - + {isLoading && } ); }; -interface OutcomeControlsProps { - docsFilter: DocsFilterOption; - timeRange: TimeRange; - onDocsFilterChange: (filter: DocsFilterOption) => void; - onTimeRangeChange: (timeRange: TimeRange) => void; - onTimeRangeRefresh: () => void; - simulationFailureRate?: number; - simulationSuccessRate?: number; -} +const OutcomeControls = () => { + const { changePreviewDocsFilter } = useStreamEnrichmentEvents(); + + const previewDocsFilter = useSimulatorSelector((state) => state.context.previewDocsFilter); + const simulationFailureRate = useSimulatorSelector((state) => + state.context.simulation + ? state.context.simulation.failure_rate + state.context.simulation.skipped_rate + : undefined + ); + const simulationSuccessRate = useSimulatorSelector( + (state) => state.context.simulation?.success_rate + ); + + const dateRangeRef = useSimulatorSelector((state) => state.context.dateRangeRef); + const timeRange = useSelector(dateRangeRef, (state) => state.context.timeRange); + const handleRefresh = () => dateRangeRef.send({ type: 'dateRange.refresh' }); -const OutcomeControls = ({ - docsFilter, - timeRange, - onDocsFilterChange, - onTimeRangeChange, - onTimeRangeRefresh, - simulationFailureRate, - simulationSuccessRate, -}: OutcomeControlsProps) => { const handleQuerySubmit: StreamsAppSearchBarProps['onQuerySubmit'] = ( { dateRange }, isUpdate ) => { if (!isUpdate) { - return onTimeRangeRefresh(); + return handleRefresh(); } if (dateRange) { - onTimeRangeChange({ - from: dateRange.from, - to: dateRange?.to, - mode: dateRange.mode, - }); + dateRangeRef.send({ type: 'dateRange.update', range: dateRange }); } }; - const getFilterButtonPropsFor = (filterId: DocsFilterOption) => ({ - hasActiveFilters: docsFilter === filterId, - onClick: () => onDocsFilterChange(filterId), + const getFilterButtonPropsFor = (filter: PreviewDocsFilterOption) => ({ + hasActiveFilters: previewDocsFilter === filter, + onClick: () => changePreviewDocsFilter(filter), }); return ( @@ -141,45 +94,60 @@ const OutcomeControls = ({ { defaultMessage: 'Filter for all, matching or unmatching previewed documents.' } )} > - - {docsFilterOptions.outcome_filter_all.label} + + {previewDocsFilterOptions.outcome_filter_all.label} - {docsFilterOptions.outcome_filter_matched.label} + {previewDocsFilterOptions.outcome_filter_matched.label} - {docsFilterOptions.outcome_filter_unmatched.label} + {previewDocsFilterOptions.outcome_filter_unmatched.label} ); }; -interface OutcomePreviewTableProps { - documents: SampleDocument[]; - columns: string[]; -} +const MemoPreviewTable = React.memo(PreviewTable, (prevProps, nextProps) => { + // Need to specify the props to compare since the columns might be the same even if the useMemo call returns a new array + return ( + prevProps.documents === nextProps.documents && + isEqual(prevProps.displayColumns, nextProps.displayColumns) + ); +}); -const OutcomePreviewTable = ({ documents, columns }: OutcomePreviewTableProps) => { - if (isEmpty(documents)) { +const OutcomePreviewTable = () => { + const processors = useSimulatorSelector((state) => state.context.processors); + const detectedFields = useSimulatorSelector((state) => state.context.simulation?.detected_fields); + const previewDocsFilter = useSimulatorSelector((state) => state.context.previewDocsFilter); + const previewDocuments = useSimulatorSelector((state) => state.context.previewDocuments); + + const previewColumns = useMemo( + () => getTableColumns(processors, detectedFields ?? [], previewDocsFilter), + [detectedFields, previewDocsFilter, processors] + ); + + if (!previewDocuments || isEmpty(previewDocuments)) { return ( ; + return ; }; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors/grok/grok_ai_suggestions.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors/grok/grok_ai_suggestions.tsx index 6a61317c01c6..ab5fd8579f92 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors/grok/grok_ai_suggestions.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors/grok/grok_ai_suggestions.tsx @@ -29,10 +29,10 @@ import type { FindActionResult } from '@kbn/actions-plugin/server'; import { UseGenAIConnectorsResult } from '@kbn/observability-ai-assistant-plugin/public/hooks/use_genai_connectors'; import { useAbortController, useBoolean } from '@kbn/react-hooks'; import useObservable from 'react-use/lib/useObservable'; +import { useStreamDetail } from '../../../../../hooks/use_stream_detail'; import { useKibana } from '../../../../../hooks/use_kibana'; import { GrokFormState, ProcessorFormState } from '../../types'; -import { UseProcessingSimulatorReturn } from '../../hooks/use_processing_simulator'; -import { useSimulatorContext } from '../../simulator_context'; +import { useSimulatorSelector } from '../../state_management/stream_enrichment_state_machine'; const RefreshButton = ({ generatePatterns, @@ -148,12 +148,10 @@ function useAiEnabled() { } function InnerGrokAiSuggestions({ - refreshSimulation, - filteredSamples, + previewDocuments, definition, }: { - refreshSimulation: UseProcessingSimulatorReturn['refreshSimulation']; - filteredSamples: FlattenRecord[]; + previewDocuments: FlattenRecord[]; definition: IngestStreamGetResponse; }) { const { dependencies } = useKibana(); @@ -193,7 +191,7 @@ function InnerGrokAiSuggestions({ body: { field: fieldValue, connectorId: currentConnector, - samples: filteredSamples, + samples: previewDocuments, }, }, }) @@ -210,7 +208,7 @@ function InnerGrokAiSuggestions({ currentConnector, definition.stream.name, fieldValue, - filteredSamples, + previewDocuments, streamsRepositoryClient, ]); @@ -225,11 +223,11 @@ function InnerGrokAiSuggestions({ const hasValidField = useMemo(() => { return Boolean( currentFieldName && - filteredSamples.some( + previewDocuments.some( (sample) => sample[currentFieldName] && typeof sample[currentFieldName] === 'string' ) ); - }, [filteredSamples, currentFieldName]); + }, [previewDocuments, currentFieldName]); const filteredSuggestions = suggestions?.patterns .map((pattern, i) => ({ @@ -304,7 +302,6 @@ function InnerGrokAiSuggestions({ { value: suggestion.pattern }, ]); } - refreshSimulation(); }} data-test-subj="streamsAppGrokAiSuggestionsButton" iconType="plusInCircle" @@ -360,7 +357,8 @@ export function GrokAiSuggestions() { core: { http }, } = useKibana(); const { enabled: isAiEnabled, couldBeEnabled } = useAiEnabled(); - const props = useSimulatorContext(); + const { definition } = useStreamDetail(); + const previewDocuments = useSimulatorSelector((state) => state.context.previewDocuments); if (!isAiEnabled && couldBeEnabled) { return ( @@ -390,8 +388,9 @@ export function GrokAiSuggestions() { ); } - if (!isAiEnabled) { + if (!isAiEnabled || !definition) { return null; } - return ; + + return ; } diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors/index.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors/index.tsx index dce409d449a6..46f977776003 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors/index.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors/index.tsx @@ -20,94 +20,84 @@ import { EuiText, EuiBadge, } from '@elastic/eui'; +import { useSelector } from '@xstate5/react'; import { i18n } from '@kbn/i18n'; -import { ProcessorType, IngestStreamGetResponse } from '@kbn/streams-schema'; -import { isEmpty, isEqual } from 'lodash'; -import React, { useEffect, useMemo, useState } from 'react'; +import { isEmpty } from 'lodash'; +import React, { useEffect, useMemo } from 'react'; import { useForm, SubmitHandler, FormProvider, useWatch } from 'react-hook-form'; import { css } from '@emotion/react'; -import { useBoolean } from '@kbn/react-hooks'; +import { DiscardPromptOptions, useDiscardConfirm } from '../../../../hooks/use_discard_confirm'; import { DissectProcessorForm } from './dissect'; import { GrokProcessorForm } from './grok'; import { ProcessorTypeSelector } from './processor_type_selector'; import { ProcessorFormState, ProcessorDefinitionWithUIAttributes } from '../types'; import { - getDefaultFormState, + getFormStateFrom, convertFormStateToProcessor, isGrokProcessor, isDissectProcessor, + getDefaultFormStateByType, } from '../utils'; -import { useDiscardConfirm } from '../../../../hooks/use_discard_confirm'; -import { ProcessorMetrics, UseProcessingSimulatorReturn } from '../hooks/use_processing_simulator'; import { ProcessorErrors, ProcessorMetricBadges } from './processor_metrics'; -import { UseDefinitionReturn } from '../hooks/use_definition'; +import { + useStreamEnrichmentEvents, + useStreamsEnrichmentSelector, + useSimulatorSelector, + StreamEnrichmentContext, +} from '../state_management/stream_enrichment_state_machine'; +import { ProcessorMetrics } from '../state_management/simulation_state_machine'; -export interface ProcessorPanelProps { - definition: IngestStreamGetResponse; - processorMetrics?: ProcessorMetrics; - onWatchProcessor: UseProcessingSimulatorReturn['watchProcessor']; -} - -export interface AddProcessorPanelProps extends ProcessorPanelProps { - isInitiallyOpen?: boolean; - onAddProcessor: UseDefinitionReturn['addProcessor']; -} - -export interface EditProcessorPanelProps extends ProcessorPanelProps { - processor: ProcessorDefinitionWithUIAttributes; - onDeleteProcessor: UseDefinitionReturn['deleteProcessor']; - onUpdateProcessor: UseDefinitionReturn['updateProcessor']; -} - -export function AddProcessorPanel({ - onAddProcessor, - onWatchProcessor, - processorMetrics, -}: AddProcessorPanelProps) { +export function AddProcessorPanel() { const { euiTheme } = useEuiTheme(); - const [hasChanges, setHasChanges] = useState(false); - const [isOpen, { on: openPanel, off: closePanel }] = useBoolean(false); + const { addProcessor } = useStreamEnrichmentEvents(); - const defaultValues = useMemo(() => getDefaultFormState('grok'), []); + const processorRef = useStreamsEnrichmentSelector((state) => + state.context.processorsRefs.find((p) => p.getSnapshot().matches('draft')) + ); + const processorMetrics = useSimulatorSelector( + (state) => processorRef && state.context.simulation?.processors_metrics[processorRef.id] + ); + + const isOpen = Boolean(processorRef); + + const defaultValues = useMemo(() => getDefaultFormStateByType('grok'), []); const methods = useForm({ defaultValues, mode: 'onChange' }); const type = useWatch({ control: methods.control, name: 'type' }); useEffect(() => { - if (isOpen) { + if (!processorRef) { + methods.reset(defaultValues); + } + }, [defaultValues, methods, processorRef]); + + useEffect(() => { + if (processorRef) { const { unsubscribe } = methods.watch((value) => { - const draftProcessor = createDraftProcessorFromForm(value as ProcessorFormState); - onWatchProcessor(draftProcessor); - setHasChanges(!isEqual(defaultValues, value)); + const processor = convertFormStateToProcessor(value as ProcessorFormState); + processorRef.send({ type: 'processor.change', processor }); }); + return () => unsubscribe(); } - }, [defaultValues, isOpen, methods, onWatchProcessor]); + }, [methods, processorRef]); - const handleSubmit: SubmitHandler = async (data) => { - const processingDefinition = convertFormStateToProcessor(data); + const handleCancel = useDiscardConfirm( + () => processorRef?.send({ type: 'processor.cancel' }), + discardChangesPromptOptions + ); - onWatchProcessor({ id: 'draft', deleteIfExists: true }); - onAddProcessor(processingDefinition, data.detected_fields); - closePanel(); - }; - - const handleCancel = () => { - closePanel(); - methods.reset(); - onWatchProcessor({ id: 'draft', deleteIfExists: true }); + const handleSubmit: SubmitHandler = async () => { + processorRef?.send({ type: 'processor.stage' }); }; const handleOpen = () => { const draftProcessor = createDraftProcessorFromForm(defaultValues); - onWatchProcessor(draftProcessor); - openPanel(); + addProcessor(draftProcessor); }; - const confirmDiscardAndClose = useDiscardConfirm(handleCancel); - const buttonContent = isOpen ? ( i18n.translate( 'xpack.streams.streamDetailView.managementTab.enrichment.processorPanel.addingProcessor', @@ -146,7 +136,7 @@ export function AddProcessorPanel({ {i18n.translate( @@ -195,74 +185,74 @@ const createDraftProcessorFromForm = ( return { id: 'draft', - status: 'draft', type: formState.type, ...processingDefinition, }; }; -export function EditProcessorPanel({ - onDeleteProcessor, - onUpdateProcessor, - onWatchProcessor, - processor, - processorMetrics, -}: EditProcessorPanelProps) { - const { euiTheme } = useEuiTheme(); +export interface EditProcessorPanelProps { + processorRef: StreamEnrichmentContext['processorsRefs'][number]; + processorMetrics?: ProcessorMetrics; +} - const [hasChanges, setHasChanges] = useState(false); - const [isOpen, { on: openPanel, off: closePanel }] = useBoolean(); +export function EditProcessorPanel({ processorRef, processorMetrics }: EditProcessorPanelProps) { + const { euiTheme } = useEuiTheme(); + const state = useSelector(processorRef, (s) => s); + const previousProcessor = state.context.previousProcessor; + const processor = state.context.processor; const processorDescription = getProcessorDescription(processor); - const isDraft = processor.status === 'draft'; - const isUnsaved = isDraft || processor.status === 'updated'; + const isOpen = state.matches({ configured: 'edit' }); + const isNew = state.context.isNew; + const isUnsaved = isNew || state.context.isUpdated; - const defaultValues = useMemo(() => getDefaultFormState(processor.type, processor), [processor]); + const defaultValues = useMemo(() => getFormStateFrom(processor), [processor]); - const methods = useForm({ defaultValues, mode: 'onChange' }); + const methods = useForm({ + defaultValues, + mode: 'onChange', + }); const type = useWatch({ control: methods.control, name: 'type' }); useEffect(() => { const { unsubscribe } = methods.watch((value) => { const processingDefinition = convertFormStateToProcessor(value as ProcessorFormState); - onWatchProcessor({ - id: processor.id, - status: processor.status, - type: value.type as ProcessorType, - ...processingDefinition, + processorRef.send({ + type: 'processor.change', + processor: processingDefinition, }); - setHasChanges(!isEqual(defaultValues, value)); }); return () => unsubscribe(); - }, [defaultValues, methods, onWatchProcessor, processor.id, processor.status]); + }, [methods, processorRef]); - const handleSubmit: SubmitHandler = (data) => { - const processorDefinition = convertFormStateToProcessor(data); + useEffect(() => { + const subscription = processorRef.on('processor.changesDiscarded', () => { + methods.reset(getFormStateFrom(previousProcessor)); + }); - onUpdateProcessor(processor.id, processorDefinition, isDraft ? 'draft' : 'updated'); - closePanel(); + return () => subscription.unsubscribe(); + }, [methods, previousProcessor, processorRef]); + + const handleCancel = useDiscardConfirm( + () => processorRef?.send({ type: 'processor.cancel' }), + discardChangesPromptOptions + ); + + const handleProcessorDelete = useDiscardConfirm( + () => processorRef?.send({ type: 'processor.delete' }), + deleteProcessorPromptOptions + ); + + const handleSubmit: SubmitHandler = () => { + processorRef.send({ type: 'processor.update' }); }; - const handleProcessorDelete = () => { - onDeleteProcessor(processor.id); - closePanel(); + const handleOpen = () => { + processorRef.send({ type: 'processor.edit' }); }; - const handleCancel = () => { - methods.reset(); - closePanel(); - }; - - const confirmDiscardAndClose = useDiscardConfirm(handleCancel); - const confirmDeletionAndClose = useDiscardConfirm(handleProcessorDelete, { - title: deleteProcessorTitle, - message: deleteProcessorMessage, - confirmButtonText: deleteProcessorLabel, - cancelButtonText: deleteProcessorCancelLabel, - }); - const buttonContent = isOpen ? ( {processor.type.toUpperCase()} ) : ( @@ -278,7 +268,7 @@ export function EditProcessorPanel({ return ( {i18n.translate( @@ -321,7 +311,7 @@ export function EditProcessorPanel({ size="s" fill onClick={methods.handleSubmit(handleSubmit)} - disabled={!methods.formState.isValid} + disabled={!methods.formState.isValid || !state.can({ type: 'processor.update' })} > {i18n.translate( 'xpack.streams.streamDetailView.managementTab.enrichment.processorPanel.confirmEditProcessor', @@ -342,7 +332,7 @@ export function EditProcessorPanel({ )} {type === 'grok' && } {type === 'dissect' && } - - - {deleteProcessorLabel} - + + + {deleteProcessorLabel} + {processorMetrics && !isEmpty(processorMetrics.errors) && ( )} @@ -397,21 +387,6 @@ const deleteProcessorLabel = i18n.translate( { defaultMessage: 'Delete processor' } ); -const deleteProcessorCancelLabel = i18n.translate( - 'xpack.streams.streamDetailView.managementTab.enrichment.deleteProcessorCancelLabel', - { defaultMessage: 'Cancel' } -); - -const deleteProcessorTitle = i18n.translate( - 'xpack.streams.streamDetailView.managementTab.enrichment.deleteProcessorTitle', - { defaultMessage: 'Are you sure you want to delete this processor?' } -); - -const deleteProcessorMessage = i18n.translate( - 'xpack.streams.streamDetailView.managementTab.enrichment.deleteProcessorMessage', - { defaultMessage: 'Deleting this processor will permanently impact the field configuration.' } -); - const getProcessorDescription = (processor: ProcessorDefinitionWithUIAttributes) => { if (isGrokProcessor(processor)) { return processor.grok.patterns.join(' • '); @@ -421,3 +396,37 @@ const getProcessorDescription = (processor: ProcessorDefinitionWithUIAttributes) return ''; }; + +export const discardChangesPromptOptions: DiscardPromptOptions = { + message: i18n.translate('xpack.streams.enrichment.processor.discardChanges.message', { + defaultMessage: 'Are you sure you want to discard your changes?', + }), + title: i18n.translate('xpack.streams.enrichment.processor.discardChanges.title', { + defaultMessage: 'Discard changes?', + }), + confirmButtonText: i18n.translate( + 'xpack.streams.enrichment.processor.discardChanges.confirmButtonText', + { defaultMessage: 'Discard' } + ), + cancelButtonText: i18n.translate( + 'xpack.streams.enrichment.processor.discardChanges.cancelButtonText', + { defaultMessage: 'Keep editing' } + ), +}; + +export const deleteProcessorPromptOptions: DiscardPromptOptions = { + message: i18n.translate('xpack.streams.enrichment.processor.deleteProcessor.message', { + defaultMessage: 'Deleting this processor will permanently impact the field configuration.', + }), + title: i18n.translate('xpack.streams.enrichment.processor.deleteProcessor.title', { + defaultMessage: 'Are you sure you want to delete this processor?', + }), + confirmButtonText: i18n.translate( + 'xpack.streams.enrichment.processor.deleteProcessor.confirmButtonText', + { defaultMessage: 'Delete processor' } + ), + cancelButtonText: i18n.translate( + 'xpack.streams.enrichment.processor.deleteProcessor.cancelButtonText', + { defaultMessage: 'Cancel' } + ), +}; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors/processor_metrics.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors/processor_metrics.tsx index edc211ddec06..f2fbd7a8e074 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors/processor_metrics.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors/processor_metrics.tsx @@ -20,7 +20,7 @@ import React from 'react'; import { i18n } from '@kbn/i18n'; import useToggle from 'react-use/lib/useToggle'; import { css } from '@emotion/react'; -import { ProcessorMetrics } from '../hooks/use_processing_simulator'; +import { ProcessorMetrics } from '../state_management/simulation_state_machine'; type ProcessorMetricBadgesProps = ProcessorMetrics; @@ -116,7 +116,9 @@ export const ProcessorErrors = ({ metrics }: { metrics: ProcessorMetrics }) => { const shouldDisplayErrorToggle = remainingCount > 0; const getCalloutProps = (type: ProcessorMetrics['errors'][number]['type']): EuiCallOutProps => { - const isWarningError = type === 'generic_processor_failure' && success_rate > 0; + const isWarningError = + type === 'non_additive_processor_failure' || + (type === 'generic_processor_failure' && success_rate > 0); return { color: isWarningError ? 'warning' : 'danger', diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors/processor_type_selector.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors/processor_type_selector.tsx index a0d436003aef..01e765ccfa84 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors/processor_type_selector.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors/processor_type_selector.tsx @@ -12,7 +12,7 @@ import { FormattedMessage } from '@kbn/i18n-react'; import { useController, useFormContext, useWatch } from 'react-hook-form'; import { ProcessorType } from '@kbn/streams-schema'; import { useKibana } from '../../../../hooks/use_kibana'; -import { getDefaultFormState } from '../utils'; +import { getDefaultFormStateByType } from '../utils'; import { ProcessorFormState } from '../types'; interface TAvailableProcessor { @@ -38,7 +38,7 @@ export const ProcessorTypeSelector = ({ const processorType = useWatch<{ type: ProcessorType }>({ name: 'type' }); const handleChange = (type: ProcessorType) => { - const formState = getDefaultFormState(type); + const formState = getDefaultFormStateByType(type); reset(formState); }; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors_list.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors_list.tsx index 5334fc95403e..35b19a1f5734 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors_list.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors_list.tsx @@ -10,20 +10,19 @@ import { EuiDraggable } from '@elastic/eui'; import { EditProcessorPanel, type EditProcessorPanelProps } from './processors'; export const DraggableProcessorListItem = ({ - processor, idx, ...props }: EditProcessorPanelProps & { idx: number }) => ( - {() => } + {() => } ); diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/simulation_playground.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/simulation_playground.tsx index bd38edb95efe..11dab5583283 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/simulation_playground.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/simulation_playground.tsx @@ -5,83 +5,56 @@ * 2.0. */ -import React, { useState } from 'react'; +import React from 'react'; import { i18n } from '@kbn/i18n'; import { EuiFlexItem, EuiSpacer, EuiTab, EuiTabs } from '@elastic/eui'; -import { IngestStreamGetResponse, isWiredStreamGetResponse } from '@kbn/streams-schema'; +import { isWiredStreamGetResponse } from '@kbn/streams-schema'; import { ProcessorOutcomePreview } from './processor_outcome_preview'; -import { TableColumn, UseProcessingSimulatorReturn } from './hooks/use_processing_simulator'; +import { + useStreamEnrichmentEvents, + useStreamsEnrichmentSelector, +} from './state_management/stream_enrichment_state_machine'; -interface SimulationPlaygroundProps { - definition: IngestStreamGetResponse; - columns: TableColumn[]; - isLoading: UseProcessingSimulatorReturn['isLoading']; - simulation: UseProcessingSimulatorReturn['simulation']; - filteredSamples: UseProcessingSimulatorReturn['filteredSamples']; - selectedDocsFilter: UseProcessingSimulatorReturn['selectedDocsFilter']; - setSelectedDocsFilter: UseProcessingSimulatorReturn['setSelectedDocsFilter']; - onRefreshSamples: UseProcessingSimulatorReturn['refreshSamples']; -} +export const SimulationPlayground = () => { + const isViewingDataPreview = useStreamsEnrichmentSelector((state) => + state.matches({ + ready: { enrichment: { displayingSimulation: 'viewDataPreview' } }, + }) + ); + const isViewingDetectedFields = useStreamsEnrichmentSelector((state) => + state.matches({ + ready: { enrichment: { displayingSimulation: 'viewDetectedFields' } }, + }) + ); + const canViewDetectedFields = useStreamsEnrichmentSelector((state) => + isWiredStreamGetResponse(state.context.definition) + ); -export const SimulationPlayground = (props: SimulationPlaygroundProps) => { - const { - definition, - columns, - isLoading, - simulation, - filteredSamples, - onRefreshSamples, - setSelectedDocsFilter, - selectedDocsFilter, - } = props; - - const tabs = { - dataPreview: { - name: i18n.translate( - 'xpack.streams.streamDetailView.managementTab.enrichment.simulationPlayground.dataPreview', - { defaultMessage: 'Data preview' } - ), - }, - ...(isWiredStreamGetResponse(definition) && { - detectedFields: { - name: i18n.translate( - 'xpack.streams.streamDetailView.managementTab.enrichment.simulationPlayground.detectedFields', - { defaultMessage: 'Detected fields' } - ), - }, - }), - } as const; - - const [selectedTabId, setSelectedTabId] = useState('dataPreview'); + const { viewSimulationPreviewData, viewSimulationDetectedFields } = useStreamEnrichmentEvents(); return ( <> - {Object.entries(tabs).map(([tabId, tab]) => ( - setSelectedTabId(tabId as keyof typeof tabs)} - > - {tab.name} + + {i18n.translate( + 'xpack.streams.streamDetailView.managementTab.enrichment.simulationPlayground.dataPreview', + { defaultMessage: 'Data preview' } + )} + + {canViewDetectedFields && ( + + {i18n.translate( + 'xpack.streams.streamDetailView.managementTab.enrichment.simulationPlayground.detectedFields', + { defaultMessage: 'Detected fields' } + )} - ))} + )} - {selectedTabId === 'dataPreview' && ( - - )} - {selectedTabId === 'detectedFields' && + {isViewingDataPreview && } + {isViewingDetectedFields && i18n.translate('xpack.streams.simulationPlayground.div.detectedFieldsLabel', { defaultMessage: 'WIP', })} diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/simulator_context.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/simulator_context.tsx deleted file mode 100644 index 57a9d9cf3388..000000000000 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/simulator_context.tsx +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -import React, { useMemo } from 'react'; -import { createContext } from 'react'; -import { IngestStreamGetResponse } from '@kbn/streams-schema'; -import { UseProcessingSimulatorReturn } from './hooks/use_processing_simulator'; - -export const context = createContext(undefined); - -export interface SimulatorContextValue extends UseProcessingSimulatorReturn { - definition: IngestStreamGetResponse; -} - -export function SimulatorContextProvider({ - processingSimulator, - definition, - - children, -}: { - processingSimulator: UseProcessingSimulatorReturn; - definition: IngestStreamGetResponse; - children: React.ReactNode; -}) { - const contextValue = useMemo(() => { - return { - definition, - ...processingSimulator, - }; - }, [definition, processingSimulator]); - return {children}; -} - -export function useSimulatorContext() { - const ctx = React.useContext(context); - if (!ctx) { - throw new Error( - 'useStreamsEnrichmentContext must be used within a StreamsEnrichmentContextProvider' - ); - } - return ctx; -} diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/processor_state_machine/index.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/processor_state_machine/index.ts new file mode 100644 index 000000000000..11bed7d7d2f2 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/processor_state_machine/index.ts @@ -0,0 +1,9 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export * from './processor_state_machine'; +export * from './types'; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/processor_state_machine/processor_state_machine.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/processor_state_machine/processor_state_machine.ts new file mode 100644 index 000000000000..e14ab48b60a7 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/processor_state_machine/processor_state_machine.ts @@ -0,0 +1,138 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +import { ActorRefFrom, assign, emit, forwardTo, sendTo, setup } from 'xstate5'; +import { isEqual } from 'lodash'; +import { ProcessorDefinition, getProcessorType } from '@kbn/streams-schema'; +import { ProcessorInput, ProcessorContext, ProcessorEvent, ProcessorEmittedEvent } from './types'; + +export type ProcessorActorRef = ActorRefFrom; + +export const processorMachine = setup({ + types: { + input: {} as ProcessorInput, + context: {} as ProcessorContext, + events: {} as ProcessorEvent, + emitted: {} as ProcessorEmittedEvent, + }, + actions: { + changeProcessor: assign(({ context }, params: { processor: ProcessorDefinition }) => ({ + processor: { + id: context.processor.id, + type: getProcessorType(params.processor), + ...params.processor, + }, + })), + resetToPrevious: assign(({ context }) => ({ + processor: context.previousProcessor, + })), + markAsUpdated: assign(({ context }) => ({ + previousProcessor: context.processor, + isUpdated: true, + })), + forwardEventToParent: forwardTo(({ context }) => context.parentRef), + forwardChangeEventToParent: sendTo( + ({ context }) => context.parentRef, + ({ context }) => ({ + type: 'processor.change', + id: context.processor.id, + }) + ), + notifyProcessorDelete: sendTo( + ({ context }) => context.parentRef, + ({ context }) => ({ + type: 'processor.delete', + id: context.processor.id, + }) + ), + emitChangesDiscarded: emit({ type: 'processor.changesDiscarded' }), + }, + guards: { + isDraft: ({ context }) => context.isNew, + hasEditingChanges: ({ context }) => !isEqual(context.previousProcessor, context.processor), + }, +}).createMachine({ + /** @xstate-layout N4IgpgJg5mDOIC5QAcBOB7AxnW7UDoBXAO1TnQBsA3SAYgG0AGAXURXVgEsAXT9YtiAAeiAMz4AbACYJAFgDsATkWN5jCfNETRUgDQgAnogCM8gBz4ArMbOjjsxossalAX1f60WHHiKly1HT0xqxIIMgcPHwCYSIIUrKW+IxSlimpEtrOorL6RgimFta2jLLKMlJSivLunhjYsLgEEKgAhgBm3PiQUcRQtF4NTfiw3K0wTKHsXLz8gnGVsviilopmxpWMxnYSeYiJ4hLV8kpSovLGpme14fU+zW2d3RC9-YP3+JitxNgUk4IRGbReaIRT4RSVUTrMpmI7yByWPYICSlfC2RSiHSMSzyKo1Dy3byNXwtDpdHq8PoDO7EgiYAAW3wmLABkVmMVAcQAtNtGOC7NYqpYpIxHCckVolkdZMZnBoUvJLGZLDd3rTPvx2pwoIQyBB8JwIBQwNSicMKf8woConNYohpBJwZopOszGZZEdFMYJal8IqUplTMUpMZVTThphNdrdZBnjw45S3uHfIRkBBWtwwJbpjaOcJ7VDwcYvfI5KkVhIzEiDpJjqdzpdcWGzb5I8QtTq9QmE5wqWqI99ftnwmzgXaEG7HbI7FD7NKId7DIgqnydJZrKVKhJ7GZm0NW1HO7GKT2+8nmmBjZnh9b2SCEJjjH715iZVpMW6kfC+TJZZdjJWZSKHuHxth2Mb6ieFK9kmLZ0oyfRZiyVqjranKIGYLr4KkLpmBiphQkcSJmFsfpKE4UjmCKGLuASxDoBAcAAuerJAmh+YIFylQWIksJQgotjGIwUJfksEJbDkKzFriOQgeqJBkLggQQKxub3jyJH4Lx2juuYdjCVWS4FBcyQqCGIYnMoSoqgS-Yko83CqXe448uY-J-kKIpivISKUfI+DFjKsjTmUlarHJwykk80F9E5Y7ocifKyBUiruvpsi2NWKgBWkbprKIGLSFIEUHu20Z6nF7FxDiFjVDorrup6i75ABUhaSKc4aBklQ2XUcEamVR76oaxqVXmcQnEkthKMKUINuKRkJGC4n2KoZhqMlmIlXSh4QQmY33pkYJaC6Xp4phOhIhIwp+tulgrDIWyndtA3gV2UEvImB3jhc-knXhGzVBdehGd+2F3f+gHKC9jFXpA30JVyFi2GkKzVPd8JQiD+S4kkXp4dkQmlIktGuEAA */ + id: 'processor', + context: ({ input }) => ({ + parentRef: input.parentRef, + previousProcessor: input.processor, + processor: input.processor, + isNew: input.isNew ?? false, + }), + initial: 'unresolved', + states: { + unresolved: { + always: [{ target: 'draft', guard: 'isDraft' }, { target: 'configured' }], + }, + draft: { + initial: 'editing', + states: { + editing: { + on: { + 'processor.stage': { + target: '#configured', + actions: [{ type: 'markAsUpdated' }, { type: 'forwardEventToParent' }], + }, + 'processor.cancel': { + target: '#deleted', + actions: [{ type: 'resetToPrevious' }], + }, + 'processor.change': { + actions: [ + { type: 'changeProcessor', params: ({ event }) => event }, + { type: 'forwardChangeEventToParent' }, + ], + }, + }, + }, + }, + }, + configured: { + id: 'configured', + initial: 'idle', + states: { + idle: { + on: { 'processor.edit': 'edit' }, + }, + edit: { + initial: 'editing', + states: { + editing: { + on: { + 'processor.update': { + guard: 'hasEditingChanges', + target: '#configured.idle', + actions: [{ type: 'markAsUpdated' }, { type: 'forwardEventToParent' }], + }, + 'processor.cancel': { + target: '#configured.idle', + actions: [ + { type: 'emitChangesDiscarded' }, + { type: 'resetToPrevious' }, + { type: 'forwardEventToParent' }, + ], + }, + 'processor.delete': '#deleted', + 'processor.change': { + actions: [ + { type: 'changeProcessor', params: ({ event }) => event }, + { type: 'forwardChangeEventToParent' }, + ], + }, + }, + }, + }, + }, + }, + }, + deleted: { + id: 'deleted', + type: 'final', + entry: [{ type: 'notifyProcessorDelete' }], + }, + }, +}); diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/processor_state_machine/types.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/processor_state_machine/types.ts new file mode 100644 index 000000000000..d6bb62889f17 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/processor_state_machine/types.ts @@ -0,0 +1,43 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { ProcessorDefinition } from '@kbn/streams-schema'; +import { ActorRef, Snapshot } from 'xstate5'; +import { ProcessorDefinitionWithUIAttributes } from '../../types'; + +export type ProcessorToParentEvent = + | { type: 'processor.change'; id: string } + | { type: 'processor.delete'; id: string } + | { type: 'processor.stage' }; + +export interface ProcessorInput { + parentRef: ProcessorParentActor; + processor: ProcessorDefinitionWithUIAttributes; + isNew?: boolean; +} + +export type ProcessorParentActor = ActorRef, ProcessorToParentEvent>; + +export interface ProcessorContext { + parentRef: ProcessorParentActor; + previousProcessor: ProcessorDefinitionWithUIAttributes; + processor: ProcessorDefinitionWithUIAttributes; + isNew: boolean; + isUpdated?: boolean; +} + +export type ProcessorEvent = + | { type: 'processor.cancel' } + | { type: 'processor.change'; processor: ProcessorDefinition } + | { type: 'processor.delete' } + | { type: 'processor.edit' } + | { type: 'processor.stage' } + | { type: 'processor.update' }; + +export interface ProcessorEmittedEvent { + type: 'processor.changesDiscarded'; +} diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/index.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/index.ts new file mode 100644 index 000000000000..2e33f97d5674 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/index.ts @@ -0,0 +1,11 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export * from './preview_docs_filter'; +export * from './simulation_state_machine'; +export * from './types'; +export * from './utils'; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/preview_docs_filter.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/preview_docs_filter.ts new file mode 100644 index 000000000000..f27ad9078283 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/preview_docs_filter.ts @@ -0,0 +1,34 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { i18n } from '@kbn/i18n'; + +export const previewDocsFilterOptions = { + outcome_filter_all: { + id: 'outcome_filter_all', + label: i18n.translate( + 'xpack.streams.streamDetailView.managementTab.enrichment.processor.outcomeControls.all', + { defaultMessage: 'All samples' } + ), + }, + outcome_filter_matched: { + id: 'outcome_filter_matched', + label: i18n.translate( + 'xpack.streams.streamDetailView.managementTab.enrichment.processor.outcomeControls.matched', + { defaultMessage: 'Matched' } + ), + }, + outcome_filter_unmatched: { + id: 'outcome_filter_unmatched', + label: i18n.translate( + 'xpack.streams.streamDetailView.managementTab.enrichment.processor.outcomeControls.unmatched', + { defaultMessage: 'Unmatched' } + ), + }, +} as const; + +export type PreviewDocsFilterOption = keyof typeof previewDocsFilterOptions; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/samples_fetcher_actor.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/samples_fetcher_actor.ts new file mode 100644 index 000000000000..9b4cee2ad2ce --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/samples_fetcher_actor.ts @@ -0,0 +1,55 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { i18n } from '@kbn/i18n'; +import { flattenObjectNestedLast } from '@kbn/object-utils'; +import { Condition, FlattenRecord } from '@kbn/streams-schema'; +import { fromPromise, ErrorActorEvent } from 'xstate5'; +import { errors as esErrors } from '@elastic/elasticsearch'; +import { DateRangeContext } from '../../../../../state_management/date_range_state_machine'; +import { SimulationMachineDeps } from './types'; + +export interface SamplesFetchInput { + condition?: Condition; + streamName: string; + absoluteTimeRange: DateRangeContext['absoluteTimeRange']; +} + +export function createSamplesFetchActor({ + streamsRepositoryClient, +}: Pick) { + return fromPromise(async ({ input, signal }) => { + const samplesBody = await streamsRepositoryClient.fetch('POST /api/streams/{name}/_sample', { + signal, + params: { + path: { name: input.streamName }, + body: { + if: input.condition, + start: input.absoluteTimeRange.start, + end: input.absoluteTimeRange.end, + size: 100, + }, + }, + }); + + return samplesBody.documents.map(flattenObjectNestedLast) as FlattenRecord[]; + }); +} + +export function createSamplesFetchFailureNofitier({ + toasts, +}: Pick) { + return (params: { event: unknown }) => { + const event = params.event as ErrorActorEvent; + toasts.addError(new Error(event.error.body.message), { + title: i18n.translate('xpack.streams.enrichment.simulation.samplesFetchError', { + defaultMessage: 'An issue occurred retrieving samples.', + }), + toastMessage: event.error.body.message, + }); + }; +} diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/simulation_runner_actor.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/simulation_runner_actor.ts new file mode 100644 index 000000000000..f2e691022805 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/simulation_runner_actor.ts @@ -0,0 +1,51 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { i18n } from '@kbn/i18n'; +import { FlattenRecord } from '@kbn/streams-schema'; +import { fromPromise, ErrorActorEvent } from 'xstate5'; +import { errors as esErrors } from '@elastic/elasticsearch'; +import { ProcessorDefinitionWithUIAttributes } from '../../types'; +import { processorConverter } from '../../utils'; +import { Simulation, SimulationMachineDeps } from './types'; + +export interface SimulationRunnerInput { + streamName: string; + documents: FlattenRecord[]; + processors: ProcessorDefinitionWithUIAttributes[]; +} + +export function createSimulationRunnerActor({ + streamsRepositoryClient, +}: Pick) { + return fromPromise(({ input, signal }) => + streamsRepositoryClient.fetch('POST /api/streams/{name}/processing/_simulate', { + signal, + params: { + path: { name: input.streamName }, + body: { + documents: input.documents, + processing: input.processors.map(processorConverter.toSimulateDefinition), + }, + }, + }) + ); +} + +export function createSimulationRunFailureNofitier({ + toasts, +}: Pick) { + return (params: { event: unknown }) => { + const event = params.event as ErrorActorEvent; + toasts.addError(new Error(event.error.body.message), { + title: i18n.translate('xpack.streams.enrichment.simulation.simulationRunError', { + defaultMessage: 'An issue occurred running the simulation.', + }), + toastMessage: event.error.body.message, + }); + }; +} diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/simulation_state_machine.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/simulation_state_machine.ts new file mode 100644 index 000000000000..c250e35ef2c4 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/simulation_state_machine.ts @@ -0,0 +1,280 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +import { ActorRefFrom, MachineImplementationsFrom, SnapshotFrom, assign, setup } from 'xstate5'; +import { getPlaceholderFor } from '@kbn/xstate-utils'; +import { FlattenRecord, isSchema, processorDefinitionSchema } from '@kbn/streams-schema'; +import { isEmpty, isEqual } from 'lodash'; +import { + dateRangeMachine, + createDateRangeMachineImplementations, +} from '../../../../../state_management/date_range_state_machine'; +import { ProcessorDefinitionWithUIAttributes } from '../../types'; +import { processorConverter } from '../../utils'; +import { + SimulationInput, + SimulationContext, + SimulationEvent, + Simulation, + SimulationMachineDeps, +} from './types'; +import { PreviewDocsFilterOption } from './preview_docs_filter'; +import { + createSamplesFetchActor, + createSamplesFetchFailureNofitier, +} from './samples_fetcher_actor'; +import { + createSimulationRunnerActor, + createSimulationRunFailureNofitier, +} from './simulation_runner_actor'; +import { filterSimulationDocuments, composeSamplingCondition } from './utils'; + +export type SimulationActorRef = ActorRefFrom; +export type SimulationActorSnapshot = SnapshotFrom; +export interface ProcessorEventParams { + processors: ProcessorDefinitionWithUIAttributes[]; +} + +const hasSamples = (samples: FlattenRecord[]) => !isEmpty(samples); + +const isValidProcessor = (processor: ProcessorDefinitionWithUIAttributes) => + isSchema(processorDefinitionSchema, processorConverter.toAPIDefinition(processor)); +const hasValidProcessors = (processors: ProcessorDefinitionWithUIAttributes[]) => + processors.every(isValidProcessor); + +export const simulationMachine = setup({ + types: { + input: {} as SimulationInput, + context: {} as SimulationContext, + events: {} as SimulationEvent, + }, + actors: { + fetchSamples: getPlaceholderFor(createSamplesFetchActor), + runSimulation: getPlaceholderFor(createSimulationRunnerActor), + dateRangeMachine: getPlaceholderFor(() => dateRangeMachine), + }, + actions: { + notifySamplesFetchFailure: getPlaceholderFor(createSamplesFetchFailureNofitier), + notifySimulationRunFailure: getPlaceholderFor(createSimulationRunFailureNofitier), + storeTimeUpdated: getPlaceholderFor(createSimulationRunFailureNofitier), + storePreviewDocsFilter: assign((_, params: { filter: PreviewDocsFilterOption }) => ({ + previewDocsFilter: params.filter, + })), + storeProcessors: assign((_, params: ProcessorEventParams) => ({ + processors: params.processors, + })), + storeSamples: assign((_, params: { samples: FlattenRecord[] }) => ({ + samples: params.samples, + })), + storeSimulation: assign((_, params: { simulation: Simulation | undefined }) => ({ + simulation: params.simulation, + })), + derivePreviewDocuments: assign(({ context }) => { + return { + previewDocuments: context.simulation + ? filterSimulationDocuments(context.simulation.documents, context.previewDocsFilter) + : context.samples, + }; + }), + deriveSamplingCondition: assign(({ context }) => ({ + samplingCondition: composeSamplingCondition(context.processors), + })), + resetSimulation: assign({ + processors: [], + simulation: undefined, + samplingCondition: composeSamplingCondition([]), + previewDocsFilter: 'outcome_filter_all', + }), + }, + delays: { + debounceTime: 800, + }, + guards: { + canSimulate: ({ context }, params: ProcessorEventParams) => + hasSamples(context.samples) && hasValidProcessors(params.processors), + hasProcessors: (_, params: ProcessorEventParams) => !isEmpty(params.processors), + hasSamples: ({ context }) => hasSamples(context.samples), + hasValidProcessors: (_, params: ProcessorEventParams) => hasValidProcessors(params.processors), + shouldRefetchSamples: ({ context }) => + Boolean( + context.samplingCondition && + !isEqual(context.samplingCondition, composeSamplingCondition(context.processors)) + ), + }, +}).createMachine({ + /** @xstate-layout N4IgpgJg5mDOIC5SwJYFsCuAbAhgFxQHsA7AYgnzACUdiYA6DABwrzAG0AGAXUVCcKoCJPiAAeiAIwB2AGwAWetICsnTgA5Js2cuUBOZQCYANCACeiALSTOh+rfUBmWdL3TpzzkYC+306kxcYTIA7HwiYnoAYwALWhgABQAnMAA3FDAAdwARQijYADEULDYkrl4kEAEhCNEJBFlHSXplFXVDPRl1ZVlNUwsES2VJR3oRw0dDeXUXR04ZX390MODSUKCI+hTYMDxy0WqUYLrEKdcx9T15ecNJPVlO6X6rQ3V6SddDdzd5K-llRyLEDrcIkUhMJJ5OCwQhJWD0ABU+0qh2OlXqkhmikc8kkhmU6hm+n08meg3xzU4-3aUy0kmmyiBINWEKhsBhSWitCiYCwyP4giOtXRpxGynoBhmhkMsipLlJ5isb2U13u8mk6qmRl0TOWGzBrJ57Nh0TidA4PAOgrRoHqhL07w6elpBL0GjJlkcDoBvWkcr0k0ahl1gVBZEN0JNEF5uwtFQFNREIoQ0oU9GmOlusr9KrJNkULlkmLdk2l6g0IZWEXBkKNHPo0awsfYknjVWtwttpzmdnkuNubk6KscZL7dkL8g+-3V0hllf1kRQxCFOCwKAAXkuoKR+e3E8QTggRo43r9NGOs6m87J6L0epxnZcFPIOvOw-Qlyu15u6DvW1b90PMVmnUdwvC8GxnSuMkXCUSCvE6B4mimN9ggbMAACNCAwYgoi3ABhM0YFgGs2XrWJ4jjAChSTLsEF+aQJXuTgNT0J9Og9HF7FVV4Az9RwCVkVDNmjLCcLwuhCMokixFgPBKHoHAADNSgAClE7DcLAAAVdAwAASjWPV3w08SCKIuBd1RTtxCkWwQJfRxpFAjwNUmPNOBvGQix0dUNHuV5hJIdCxNw8zpNIWT5LYRSVLAJJ1MwzSeV0tADKM0M0NMsLJIs2AWzbazaNso8WJvbR-leTRhlkExFRTXp7G0bQOmkcZXCEvxgWMtCsEIHAIC3ABlHA0CYJsSIgEgwA-YhUkIABrGbYFG8a4AKXZYnigBBKI8FhKyO2KjEVE4JrGnmRwvSuiYPLsVRJ1eB9XAEq4gsiPqBuG1aJtIeLIU5cb8CU2E0HoFaxomja8C2pJdv2spLRRI6D2TQx5jeSQeh6X4XxkBUBhse6qUmctB1e+R3sU9l4oIOghp6iIqDAABHDAUBSNLiDwEjDsA5N6SaehbhYr08SlOqBiLM7CU8kYri+Nr1CpnAaaSOmoAZzKmdZ9nObAbnef-ZH+bowWzvslVZVuFVMRgnRhZmOR3AZS6qaSHDl3pxmwSm4gZqXealvBn3iCoT2dr2g6kYTGjUbouQb1UVQ5g8IwVFkUdJiUUDdCcAE3Wmd3PeG0O-qSAH6CBvAQaSMHmSZiO4ajxHCpRw9nbGDx5kHbQvT0MkCUYrHrnkHR3C0QEgWIQho3gSoG+Kor45KvsHT0ZjWPYyROJPFpqSxhRug1Tgp6WbXgs-AhVw3LdqJtEqxRvOZBJUMeRnUAmlRvNr+-+XRMTlmkFTFAEAmz3xsvUQs9BP7qjxP8DeFIyQakUAhF2fxdAbyptlCSUApLmnnrHB+doRhKF6P8ekWhuh23qpILG6YaqT0QqfZ0VNPqDXpj9OAEDjpSF6M0CYk41StC9EYD01wmq2FlAGB4rw+yUy6ovSIqsdjq1LhfMOusOZgC5jzHhK8MQPAdNoDebhpTox6E8eqXxRi3BGM5U+E8R7F2IF7TWod9FAVsGdAk-9nSunlIPboYw5jZjmE5ekytfDeCAA */ + id: 'simulation', + context: ({ input, self, spawn }) => ({ + dateRangeRef: spawn('dateRangeMachine', { + id: 'dateRange', + input: { + parentRef: self, + }, + }), + previewDocsFilter: 'outcome_filter_all', + previewDocuments: [], + processors: input.processors, + samples: [], + samplingCondition: composeSamplingCondition(input.processors), + streamName: input.streamName, + }), + initial: 'initializing', + on: { + 'dateRange.update': '.loadingSamples', + 'simulation.changePreviewDocsFilter': { + actions: [ + { type: 'storePreviewDocsFilter', params: ({ event }) => event }, + { type: 'derivePreviewDocuments' }, + ], + }, + 'simulation.reset': { + target: '.idle', + actions: [{ type: 'resetSimulation' }, { type: 'derivePreviewDocuments' }], + }, + // Handle adding/reordering processors + 'processors.*': { + target: '.assertingSimulationRequirements', + actions: [{ type: 'storeProcessors', params: ({ event }) => event }], + }, + 'processor.cancel': { + target: '.assertingSimulationRequirements', + actions: [{ type: 'storeProcessors', params: ({ event }) => event }], + }, + 'processor.change': { + target: '.debouncingChanges', + actions: [{ type: 'storeProcessors', params: ({ event }) => event }], + }, + 'processor.delete': [ + { + guard: { + type: 'hasProcessors', + params: ({ event }) => ({ processors: event.processors }), + }, + target: '.assertingSimulationRequirements', + actions: [{ type: 'storeProcessors', params: ({ event }) => event }], + }, + { + target: '.idle', + actions: [{ type: 'resetSimulation' }, { type: 'derivePreviewDocuments' }], + }, + ], + }, + states: { + initializing: { + always: [ + { + guard: { + type: 'hasProcessors', + params: ({ context }) => ({ processors: context.processors }), + }, + target: 'loadingSamples', + }, + { target: 'idle' }, + ], + }, + + idle: {}, + + debouncingChanges: { + on: { + 'processor.change': { + target: 'debouncingChanges', + actions: [{ type: 'storeProcessors', params: ({ event }) => event }], + description: 'Re-enter debouncing state and reinitialize the delayed processing.', + reenter: true, + }, + }, + after: { + debounceTime: [ + { + guard: 'shouldRefetchSamples', + target: 'loadingSamples', + actions: [{ type: 'deriveSamplingCondition' }], + }, + { target: 'assertingSimulationRequirements' }, + ], + }, + }, + + loadingSamples: { + invoke: { + id: 'samplesFetcherActor', + src: 'fetchSamples', + input: ({ context }) => ({ + condition: context.samplingCondition, + streamName: context.streamName, + absoluteTimeRange: context.dateRangeRef.getSnapshot().context.absoluteTimeRange, + }), + onDone: { + target: 'assertingSimulationRequirements', + actions: [ + { type: 'storeSamples', params: ({ event }) => ({ samples: event.output }) }, + { type: 'derivePreviewDocuments' }, + ], + }, + onError: { + target: 'idle', + actions: [ + { type: 'storeSamples', params: () => ({ samples: [] }) }, + { type: 'notifySamplesFetchFailure' }, + ], + }, + }, + }, + + assertingSimulationRequirements: { + always: [ + { + guard: { + type: 'canSimulate', + params: ({ context }) => ({ processors: context.processors }), + }, + target: 'runningSimulation', + }, + { target: 'idle' }, + ], + }, + + runningSimulation: { + invoke: { + id: 'simulationRunnerActor', + src: 'runSimulation', + input: ({ context }) => ({ + streamName: context.streamName, + documents: context.samples, + processors: context.processors, + }), + onDone: { + target: 'idle', + actions: [ + { type: 'storeSimulation', params: ({ event }) => ({ simulation: event.output }) }, + { type: 'derivePreviewDocuments' }, + ], + }, + onError: { + target: 'idle', + actions: [{ type: 'notifySimulationRunFailure' }], + }, + }, + }, + }, +}); + +export const createSimulationMachineImplementations = ({ + data, + streamsRepositoryClient, + toasts, +}: SimulationMachineDeps): MachineImplementationsFrom => ({ + actors: { + fetchSamples: createSamplesFetchActor({ streamsRepositoryClient }), + runSimulation: createSimulationRunnerActor({ streamsRepositoryClient }), + dateRangeMachine: dateRangeMachine.provide(createDateRangeMachineImplementations({ data })), + }, + actions: { + notifySamplesFetchFailure: createSamplesFetchFailureNofitier({ toasts }), + notifySimulationRunFailure: createSimulationRunFailureNofitier({ toasts }), + }, +}); diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/types.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/types.ts new file mode 100644 index 000000000000..1aedbd31b363 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/types.ts @@ -0,0 +1,53 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { Condition, FlattenRecord } from '@kbn/streams-schema'; +import { APIReturnType, StreamsRepositoryClient } from '@kbn/streams-plugin/public/api'; +import { IToasts } from '@kbn/core/public'; +import { DataPublicPluginStart } from '@kbn/data-plugin/public'; +import { + DateRangeToParentEvent, + DateRangeActorRef, +} from '../../../../../state_management/date_range_state_machine'; +import { ProcessorDefinitionWithUIAttributes } from '../../types'; +import { PreviewDocsFilterOption } from './preview_docs_filter'; + +export type Simulation = APIReturnType<'POST /api/streams/{name}/processing/_simulate'>; + +export interface SimulationMachineDeps { + data: DataPublicPluginStart; + streamsRepositoryClient: StreamsRepositoryClient; + toasts: IToasts; +} + +export type ProcessorMetrics = + Simulation['processors_metrics'][keyof Simulation['processors_metrics']]; + +export interface SimulationInput { + processors: ProcessorDefinitionWithUIAttributes[]; + streamName: string; +} + +export type SimulationEvent = + | DateRangeToParentEvent + | { type: 'simulation.changePreviewDocsFilter'; filter: PreviewDocsFilterOption } + | { type: 'simulation.reset' } + | { type: 'processors.add'; processors: ProcessorDefinitionWithUIAttributes[] } + | { type: 'processor.cancel'; processors: ProcessorDefinitionWithUIAttributes[] } + | { type: 'processor.change'; processors: ProcessorDefinitionWithUIAttributes[] } + | { type: 'processor.delete'; processors: ProcessorDefinitionWithUIAttributes[] }; + +export interface SimulationContext { + dateRangeRef: DateRangeActorRef; + previewDocsFilter: PreviewDocsFilterOption; + previewDocuments: FlattenRecord[]; + processors: ProcessorDefinitionWithUIAttributes[]; + samples: FlattenRecord[]; + samplingCondition?: Condition; + simulation?: Simulation; + streamName: string; +} diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/utils.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/utils.ts new file mode 100644 index 000000000000..964261c568b3 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/utils.ts @@ -0,0 +1,69 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { Condition, UnaryOperator, getProcessorConfig } from '@kbn/streams-schema'; +import { isEmpty, uniq } from 'lodash'; +import { ALWAYS_CONDITION } from '../../../../../util/condition'; +import { ProcessorDefinitionWithUIAttributes, DetectedField } from '../../types'; +import { PreviewDocsFilterOption } from './preview_docs_filter'; +import { Simulation } from './types'; + +export function composeSamplingCondition( + processors: ProcessorDefinitionWithUIAttributes[] +): Condition | undefined { + if (isEmpty(processors)) { + return undefined; + } + + const uniqueFields = uniq(getSourceFields(processors)); + + if (isEmpty(uniqueFields)) { + return ALWAYS_CONDITION; + } + + const conditions = uniqueFields.map((field) => ({ + field, + operator: 'exists' as UnaryOperator, + })); + + return { or: conditions }; +} + +export function getSourceFields(processors: ProcessorDefinitionWithUIAttributes[]): string[] { + return processors.map((processor) => getProcessorConfig(processor).field.trim()).filter(Boolean); +} + +export function getTableColumns( + processors: ProcessorDefinitionWithUIAttributes[], + fields: DetectedField[], + filter: PreviewDocsFilterOption +) { + const uniqueProcessorsFields = uniq(getSourceFields(processors)); + + if (filter === 'outcome_filter_unmatched') { + return uniqueProcessorsFields; + } + + const uniqueDetectedFields = uniq(fields.map((field) => field.name)); + + return uniq([...uniqueProcessorsFields, ...uniqueDetectedFields]); +} + +export function filterSimulationDocuments( + documents: Simulation['documents'], + filter: PreviewDocsFilterOption +) { + switch (filter) { + case 'outcome_filter_matched': + return documents.filter((doc) => doc.status === 'parsed').map((doc) => doc.value); + case 'outcome_filter_unmatched': + return documents.filter((doc) => doc.status !== 'parsed').map((doc) => doc.value); + case 'outcome_filter_all': + default: + return documents.map((doc) => doc.value); + } +} diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/index.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/index.ts new file mode 100644 index 000000000000..c9d28a5e5cd3 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/index.ts @@ -0,0 +1,9 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export * from './use_stream_enrichment'; +export * from './types'; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts new file mode 100644 index 000000000000..f814dc958d83 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts @@ -0,0 +1,354 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +import { + MachineImplementationsFrom, + assign, + forwardTo, + not, + setup, + sendTo, + stopChild, + and, + ActorRefFrom, +} from 'xstate5'; +import { getPlaceholderFor } from '@kbn/xstate-utils'; +import { + IngestStreamGetResponse, + isRootStreamDefinition, + isWiredStreamGetResponse, +} from '@kbn/streams-schema'; +import { htmlIdGenerator } from '@elastic/eui'; +import { + StreamEnrichmentContext, + StreamEnrichmentEvent, + StreamEnrichmentInput, + StreamEnrichmentServiceDependencies, +} from './types'; +import { processorConverter } from '../../utils'; +import { + createUpsertStreamActor, + createUpsertStreamFailureNofitier, + createUpsertStreamSuccessNofitier, +} from './upsert_stream_actor'; + +import { ProcessorDefinitionWithUIAttributes } from '../../types'; +import { + simulationMachine, + createSimulationMachineImplementations, +} from '../simulation_state_machine'; +import { processorMachine, ProcessorActorRef } from '../processor_state_machine'; + +const createId = htmlIdGenerator(); + +export type StreamEnrichmentActorRef = ActorRefFrom; + +export const streamEnrichmentMachine = setup({ + types: { + input: {} as StreamEnrichmentInput, + context: {} as StreamEnrichmentContext, + events: {} as StreamEnrichmentEvent, + }, + actors: { + upsertStream: getPlaceholderFor(createUpsertStreamActor), + processorMachine: getPlaceholderFor(() => processorMachine), + simulationMachine: getPlaceholderFor(() => simulationMachine), + }, + actions: { + spawnSimulationMachine: assign(({ context, spawn }) => ({ + simulatorRef: + context.simulatorRef || + spawn('simulationMachine', { + id: 'simulator', + input: { + processors: getStagedProcessors(context), + streamName: context.definition.stream.name, + }, + }), + })), + notifyUpsertStreamSuccess: getPlaceholderFor(createUpsertStreamSuccessNofitier), + notifyUpsertStreamFailure: getPlaceholderFor(createUpsertStreamFailureNofitier), + refreshDefinition: () => {}, + storeDefinition: assign((_, params: { definition: IngestStreamGetResponse }) => ({ + definition: params.definition, + })), + stopProcessors: ({ context }) => context.processorsRefs.forEach(stopChild), + setupProcessors: assign(({ self, spawn }, params: { definition: IngestStreamGetResponse }) => { + const processorsRefs = params.definition.stream.ingest.processing.map((proc) => { + const processor = processorConverter.toUIDefinition(proc); + return spawn('processorMachine', { + id: processor.id, + input: { + parentRef: self, + processor, + }, + }); + }); + + return { + initialProcessorsRefs: processorsRefs, + processorsRefs, + }; + }), + addProcessor: assign( + ( + { context, spawn, self }, + { processor }: { processor: ProcessorDefinitionWithUIAttributes } + ) => { + const id = createId(); + return { + processorsRefs: context.processorsRefs.concat( + spawn('processorMachine', { + id, + input: { + parentRef: self, + processor: { ...processor, id }, + isNew: true, + }, + }) + ), + }; + } + ), + stopProcessor: stopChild((_, params: { id: string }) => params.id), + deleteProcessor: assign(({ context }, params: { id: string }) => ({ + processorsRefs: context.processorsRefs.filter((proc) => proc.id !== params.id), + })), + reorderProcessors: assign((_, params: { processorsRefs: ProcessorActorRef[] }) => ({ + processorsRefs: params.processorsRefs, + })), + reassignProcessors: assign(({ context }) => ({ + processorsRefs: [...context.processorsRefs], + })), + forwardProcessorsEventToSimulator: sendTo( + 'simulator', + ({ context }, params: { type: StreamEnrichmentEvent['type'] }) => ({ + type: params.type, + processors: getStagedProcessors(context), + }) + ), + sendResetEventToSimulator: sendTo('simulator', { type: 'simulation.reset' }), + }, + guards: { + hasMultipleProcessors: ({ context }) => context.processorsRefs.length > 1, + hasStagedChanges: ({ context }) => { + const { initialProcessorsRefs, processorsRefs } = context; + return ( + // Deleted processors + initialProcessorsRefs.length !== processorsRefs.length || + // New/updated processors + processorsRefs.some((processorRef) => { + const state = processorRef.getSnapshot(); + return state.matches('configured') && state.context.isUpdated; + }) || + // Processor order changed + processorsRefs.some( + (processorRef, pos) => initialProcessorsRefs[pos]?.id !== processorRef.id + ) + ); + }, + hasPendingDraft: ({ context }) => + Boolean(context.processorsRefs.find((p) => p.getSnapshot().matches('draft'))), + '!hasPendingDraft': not('hasPendingDraft'), + canUpdateStream: and(['hasStagedChanges', '!hasPendingDraft']), + isStagedProcessor: ({ context }, params: { id: string }) => { + const processorRef = context.processorsRefs.find((p) => p.id === params.id); + + if (!processorRef) return false; + return processorRef.getSnapshot().context.isNew; + }, + isRootStream: ({ context }) => isRootStreamDefinition(context.definition.stream), + isWiredStream: ({ context }) => isWiredStreamGetResponse(context.definition), + }, +}).createMachine({ + /** @xstate-layout N4IgpgJg5mDOIC5RgHYCcCWBjAFgZQBc0wBDAWwDoMUMCMSAbDAL2qgGIBtABgF1FQABwD2sWhmEoBIAB6IAtAA4ATAE4K3AGzdlizQBYAjPoCsi7voA0IAJ4KAzIoomT3cwHYdixfferFAL4B1qiYuITE5FQ0dIwsbFyG-EggImJ0ktJyCErGzpqGqoaa7u7KmkWG1nYIyiYUytwmFX7mjZqK+kEh6Nj4RKSUkRA27LADUcRYYBgAbpA8yUKi4pkp2fJ13BSObib6Ksr6+o3u1QrHFO6Kqi6q+h0PBcrd4L3hE0OkIxTjkZQYCAMMBjT4UYiwMAERbSNKrKTrRDKPL6biqVTKTE6AyKExnWyIQz2PImZH7Y76Rz2VSvUJ9CKDcHfGy-MGA4Gg-4UACugggJAIYBhKThGQRoA23mUV3cRJMGMaWiK5wQxl0FBOJnsakMWmJD1p736XOGLL+jN5-LoKA4EEkYGis2EAGsHbzIWgCAzyABBLAEYRoYXLdIScWyBQuexXezcNyqdz6VSaeymFVE1TR-RlZSy3GGYp4w1hY2M02srmWgUJMBoNCBiiCBgCgBmgco7trXs+foDQb4sJWYqyDgLFEzufs2k0zWuePTqM044O2nMqh01y6wTeJe9XxIPzpuDIqAIFAgGFgTZINjYAAV69NYLBA7B2IJH3AX2hYBQDxBg1SIcwxHHJOm2bhSn2TxjmUex7BMFUp3cDVzCnApiR8SCTGLekwXLI8cBPFAzwvK9m1vG0H2EJ9vzfD8aK-V8mUDCBa0A0UQMRHJjiXLUmmpA5KQqQx8RqMwnG4QozDjbUzGKXCPhNZkKEI4jSMva9KKgajaNfd9P2fBs2OBQUOOAtYJQcfYdmg1wkzjbhqSQmzZOudxmiaAxDBw7dCL3JkDxZNTT3PTSKPvQy6IMxijLQVkSBgczQ0siMcmubZ-F8DzikaSlFBVeVo2gjpCk8dzAj8o0AoIo11LC8ibzYPAMDIblmzFGK9J-CgACpkvhUDUSueUigqfY0SkzQVXRPjvFlPF-BMYwtx6Xd8JUkKSIarTmta9rq0kLqmPi-qBxFCzw2yYaPPRYpblRdciRVTpoxW1c-EMNQ0UU0tJk2urQrI3abRatqOrDChZgwMAAHcABEBRIB8wGhuGxn2iHJChmGEahMB-UgAAxGGGAgWABuHbj5BcaVdSwtFTHKE4ZtuVDdQQ5oVFXX6aoBkt6uBiLQcxw6UBxuHEYIZHiDR2GMfBsWKFwEgbTAFG5fhmjYBJhhBX7JYgJSq6FB8fQKG8WNCng65iXsF6imcXMikcUlmkKXmNqC1TAe2oWmpFxWxQlvHBUJiASbAMm3zEIPIc1pGNdxymuKsnJ9mlMabj8eD7hudMtEMHZ3G1ODY2ze5fO3FBhDY+AUn8z5B2N0D5F1NQrgqOCvrlRRjBVNvtQaG5mljdcEPsT2uWocQ4lYG1m8G6n-CL9wu+JZEEL7+2CR47ZzDcRmNzqeDfLWvDlKCxeqbT+QEycS2pMzXwfGe3fdSXNyXALOpF0nqr1qXx+Oacg19U5pTvrGGUcoFRxhEumOCKEEylGxK0MwU8ywqRAQCIEYAwGpQ2JoOC44iGzTRCoL6Vh35yQ1KUdyGdFDuAwf9b22CeR8mrAvC6LdqYGHULKDyWoDCOFzAVd+Lh6hJlRFJbMahkyrR3BfTB3stoEHwSbHImZ95YWti-O2Kpyh8ScmvNe64mb-3PkpZRh5fYaUatpXSJ164hiXrfAokibglGTGibUjDCo6AtlzLQTkfK3DPooqxLCbECyBuFAOUAwYHRvkbVxECpQWx0c-W2b8ageRQhuPwiYiidANAApRUTgq2J2sLBJotg4J2lknOG6jW6lCXPBFa2ZZIVGmrvdEHiOYuA6OUNwzD9zRL6ILOJ2lElY3FprfG4dI7RxadTTE5tn5FUMHoTEahWYDKckM7mWgxlMhfAweYEAABKwhhDdn+KstxRxaGlFEo8PQLNd7yBOBoVwfh3ZTgsH4IIQQgA */ + id: 'enrichStream', + context: ({ input }) => ({ + definition: input.definition, + initialProcessorsRefs: [], + processorsRefs: [], + }), + initial: 'initializing', + states: { + initializing: { + always: [ + { + target: 'resolvedRootStream', + guard: 'isRootStream', + }, + { target: 'ready' }, + ], + }, + ready: { + id: 'ready', + type: 'parallel', + entry: [ + { type: 'stopProcessors' }, + { + type: 'setupProcessors', + params: ({ context }) => ({ definition: context.definition }), + }, + ], + on: { + 'stream.received': { + target: '#ready', + actions: [{ type: 'storeDefinition', params: ({ event }) => event }], + reenter: true, + }, + }, + states: { + stream: { + initial: 'idle', + states: { + idle: { + on: { + 'stream.reset': { + guard: 'hasStagedChanges', + target: '#ready', + actions: [{ type: 'sendResetEventToSimulator' }], + reenter: true, + }, + 'stream.update': { + guard: 'canUpdateStream', + actions: [{ type: 'sendResetEventToSimulator' }], + target: 'updating', + }, + }, + }, + updating: { + invoke: { + id: 'upsertStreamActor', + src: 'upsertStream', + input: ({ context }) => ({ + definition: context.definition, + processors: context.processorsRefs + .map((proc) => proc.getSnapshot()) + .filter((proc) => proc.matches('configured')) + .map((proc) => proc.context.processor), + fields: undefined, // TODO: implementing in follow-up PR + }), + onDone: { + target: 'idle', + actions: [{ type: 'notifyUpsertStreamSuccess' }, { type: 'refreshDefinition' }], + }, + onError: { + target: 'idle', + actions: [{ type: 'notifyUpsertStreamFailure' }], + }, + }, + }, + }, + }, + enrichment: { + type: 'parallel', + states: { + displayingProcessors: { + on: { + 'processors.add': { + guard: '!hasPendingDraft', + actions: [{ type: 'addProcessor', params: ({ event }) => event }], + }, + 'processors.reorder': { + guard: 'hasMultipleProcessors', + actions: [{ type: 'reorderProcessors', params: ({ event }) => event }], + }, + 'processor.delete': { + actions: [ + { type: 'stopProcessor', params: ({ event }) => event }, + { type: 'deleteProcessor', params: ({ event }) => event }, + ], + }, + 'processor.stage': { + actions: [{ type: 'reassignProcessors' }], + }, + 'processor.update': { + actions: [{ type: 'reassignProcessors' }], + }, + }, + }, + displayingSimulation: { + entry: [{ type: 'spawnSimulationMachine' }], + initial: 'viewDataPreview', + on: { + 'processor.change': { + guard: { type: 'isStagedProcessor', params: ({ event }) => event }, + actions: [ + { type: 'forwardProcessorsEventToSimulator', params: ({ event }) => event }, + ], + }, + 'processor.*': { + actions: [ + { type: 'forwardProcessorsEventToSimulator', params: ({ event }) => event }, + ], + }, + 'processors.*': { + actions: [ + { type: 'forwardProcessorsEventToSimulator', params: ({ event }) => event }, + ], + }, + }, + states: { + viewDataPreview: { + on: { + 'simulation.viewDetectedFields': 'viewDetectedFields', + 'simulation.changePreviewDocsFilter': { + actions: [forwardTo('simulator')], + }, + }, + }, + viewDetectedFields: { + on: { + 'simulation.viewDataPreview': 'viewDataPreview', + }, + }, + }, + }, + }, + }, + }, + }, + resolvedRootStream: { + type: 'final', + }, + }, +}); + +export const createStreamEnrichmentMachineImplementations = ({ + refreshDefinition, + streamsRepositoryClient, + core, + data, +}: StreamEnrichmentServiceDependencies): MachineImplementationsFrom< + typeof streamEnrichmentMachine +> => ({ + actors: { + upsertStream: createUpsertStreamActor({ streamsRepositoryClient }), + processorMachine, + simulationMachine: simulationMachine.provide( + createSimulationMachineImplementations({ + data, + streamsRepositoryClient, + toasts: core.notifications.toasts, + }) + ), + }, + actions: { + refreshDefinition, + notifyUpsertStreamSuccess: createUpsertStreamSuccessNofitier({ + toasts: core.notifications.toasts, + }), + notifyUpsertStreamFailure: createUpsertStreamFailureNofitier({ + toasts: core.notifications.toasts, + }), + }, +}); + +function getStagedProcessors(context: StreamEnrichmentContext) { + return context.processorsRefs + .map((proc) => proc.getSnapshot()) + .filter((proc) => proc.context.isNew) + .map((proc) => proc.context.processor); +} diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/types.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/types.ts new file mode 100644 index 000000000000..c335d4e63820 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/types.ts @@ -0,0 +1,43 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { CoreStart } from '@kbn/core/public'; +import { StreamsRepositoryClient } from '@kbn/streams-plugin/public/api'; +import { IngestStreamGetResponse } from '@kbn/streams-schema'; +import { DataPublicPluginStart } from '@kbn/data-plugin/public'; +import { ProcessorDefinitionWithUIAttributes } from '../../types'; +import { ProcessorActorRef, ProcessorToParentEvent } from '../processor_state_machine'; +import { PreviewDocsFilterOption, SimulationActorRef } from '../simulation_state_machine'; + +export interface StreamEnrichmentServiceDependencies { + refreshDefinition: () => void; + streamsRepositoryClient: StreamsRepositoryClient; + core: CoreStart; + data: DataPublicPluginStart; +} + +export interface StreamEnrichmentInput { + definition: IngestStreamGetResponse; +} + +export interface StreamEnrichmentContext { + definition: IngestStreamGetResponse; + initialProcessorsRefs: ProcessorActorRef[]; + processorsRefs: ProcessorActorRef[]; + simulatorRef?: SimulationActorRef; +} + +export type StreamEnrichmentEvent = + | ProcessorToParentEvent + | { type: 'stream.received'; definition: IngestStreamGetResponse } + | { type: 'stream.reset' } + | { type: 'stream.update' } + | { type: 'simulation.viewDataPreview' } + | { type: 'simulation.viewDetectedFields' } + | { type: 'simulation.changePreviewDocsFilter'; filter: PreviewDocsFilterOption } + | { type: 'processors.add'; processor: ProcessorDefinitionWithUIAttributes } + | { type: 'processors.reorder'; processorsRefs: ProcessorActorRef[] }; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/upsert_stream_actor.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/upsert_stream_actor.ts new file mode 100644 index 000000000000..057ff7a596d2 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/upsert_stream_actor.ts @@ -0,0 +1,82 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { + FieldDefinition, + IngestStreamGetResponse, + isWiredStreamGetResponse, +} from '@kbn/streams-schema'; +import { ErrorActorEvent, fromPromise } from 'xstate5'; +import { errors as esErrors } from '@elastic/elasticsearch'; +import { APIReturnType } from '@kbn/streams-plugin/public/api'; +import { IToasts } from '@kbn/core/public'; +import { i18n } from '@kbn/i18n'; +import { StreamEnrichmentServiceDependencies } from './types'; +import { processorConverter } from '../../utils'; +import { ProcessorDefinitionWithUIAttributes } from '../../types'; + +export type UpsertStreamResponse = APIReturnType<'PUT /api/streams/{name}/_ingest'>; + +export interface UpsertStreamInput { + definition: IngestStreamGetResponse; + processors: ProcessorDefinitionWithUIAttributes[]; + fields?: FieldDefinition; +} + +export function createUpsertStreamActor({ + streamsRepositoryClient, +}: Pick) { + return fromPromise(({ input, signal }) => { + return streamsRepositoryClient.fetch(`PUT /api/streams/{name}/_ingest`, { + signal, + params: { + path: { + name: input.definition.stream.name, + }, + body: isWiredStreamGetResponse(input.definition) + ? { + ingest: { + ...input.definition.stream.ingest, + processing: input.processors.map(processorConverter.toAPIDefinition), + ...(input.fields && { + wired: { ...input.definition.stream.ingest.wired, fields: input.fields }, + }), + }, + } + : { + ingest: { + ...input.definition.stream.ingest, + processing: input.processors.map(processorConverter.toAPIDefinition), + }, + }, + }, + }); + }); +} + +export const createUpsertStreamSuccessNofitier = + ({ toasts }: { toasts: IToasts }) => + () => { + toasts.addSuccess( + i18n.translate('xpack.streams.streamDetailView.managementTab.enrichment.saveChangesSuccess', { + defaultMessage: "Stream's processors updated", + }) + ); + }; + +export const createUpsertStreamFailureNofitier = + ({ toasts }: { toasts: IToasts }) => + (params: { event: unknown }) => { + const event = params.event as ErrorActorEvent; + toasts.addError(new Error(event.error.body.message), { + title: i18n.translate( + 'xpack.streams.streamDetailView.managementTab.enrichment.saveChangesError', + { defaultMessage: "An issue occurred saving processors' changes." } + ), + toastMessage: event.error.body.message, + }); + }; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/use_stream_enrichment.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/use_stream_enrichment.tsx new file mode 100644 index 000000000000..45452d251a5b --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/use_stream_enrichment.tsx @@ -0,0 +1,105 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import React, { useEffect, useMemo } from 'react'; +import { createActorContext, useSelector } from '@xstate5/react'; +import { createConsoleInspector } from '@kbn/xstate-utils'; +import { + streamEnrichmentMachine, + createStreamEnrichmentMachineImplementations, +} from './stream_enrichment_state_machine'; +import { StreamEnrichmentInput, StreamEnrichmentServiceDependencies } from './types'; +import { ProcessorDefinitionWithUIAttributes } from '../../types'; +import { ProcessorActorRef } from '../processor_state_machine'; +import { PreviewDocsFilterOption, SimulationActorSnapshot } from '../simulation_state_machine'; + +const consoleInspector = createConsoleInspector(); + +const StreamEnrichmentContext = createActorContext(streamEnrichmentMachine); + +export const useStreamsEnrichmentSelector = StreamEnrichmentContext.useSelector; + +export type StreamEnrichmentEvents = ReturnType; + +export const useStreamEnrichmentEvents = () => { + const service = StreamEnrichmentContext.useActorRef(); + + return useMemo( + () => ({ + addProcessor: (processor: ProcessorDefinitionWithUIAttributes) => { + service.send({ type: 'processors.add', processor }); + }, + reorderProcessors: (processorsRefs: ProcessorActorRef[]) => { + service.send({ type: 'processors.reorder', processorsRefs }); + }, + resetChanges: () => { + service.send({ type: 'stream.reset' }); + }, + saveChanges: () => { + service.send({ type: 'stream.update' }); + }, + viewSimulationPreviewData: () => { + service.send({ type: 'simulation.viewDataPreview' }); + }, + viewSimulationDetectedFields: () => { + service.send({ type: 'simulation.viewDetectedFields' }); + }, + changePreviewDocsFilter: (filter: PreviewDocsFilterOption) => { + service.send({ type: 'simulation.changePreviewDocsFilter', filter }); + }, + }), + [service] + ); +}; + +export const StreamEnrichmentContextProvider = ({ + children, + definition, + ...deps +}: React.PropsWithChildren) => { + return ( + + {children} + + ); +}; + +const ListenForDefinitionChanges = ({ + children, + definition, +}: React.PropsWithChildren) => { + const service = StreamEnrichmentContext.useActorRef(); + + useEffect(() => { + service.send({ type: 'stream.received', definition }); + }, [definition, service]); + + return children; +}; + +export const useSimulatorRef = () => { + return useStreamsEnrichmentSelector((state) => state.context.simulatorRef); +}; + +export const useSimulatorSelector = (selector: (snapshot: SimulationActorSnapshot) => T): T => { + const simulationRef = useSimulatorRef(); + + if (!simulationRef) { + throw new Error('useSimulatorSelector must be used within a StreamEnrichmentContextProvider'); + } + + return useSelector(simulationRef, selector); +}; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/types.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/types.ts index 471dba5495d3..deedaa77e0e4 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/types.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/types.ts @@ -16,7 +16,6 @@ import { export type WithUIAttributes = T & { id: string; type: ProcessorTypeOf; - status: 'draft' | 'saved' | 'updated'; }; export type ProcessorDefinitionWithUIAttributes = WithUIAttributes; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/utils.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/utils.ts index e4817fb9d9f6..6ce2ce2c0a0f 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/utils.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/utils.ts @@ -48,11 +48,13 @@ const defaultProcessorFormStateByType: Record grok: defaultGrokProcessorFormState, }; -export const getDefaultFormState = ( - type: ProcessorType, +export const getDefaultFormStateByType = (type: ProcessorType) => + defaultProcessorFormStateByType[type]; + +export const getFormStateFrom = ( processor?: ProcessorDefinitionWithUIAttributes ): ProcessorFormState => { - if (!processor) return defaultProcessorFormStateByType[type]; + if (!processor) return defaultGrokProcessorFormState; if (isGrokProcessor(processor)) { const { grok } = processor; @@ -73,7 +75,7 @@ export const getDefaultFormState = ( }); } - throw new Error(`Default state not found for unsupported processor type: ${type}`); + throw new Error(`Form state for processor type "${processor.type}" is not implemented.`); }; export const convertFormStateToProcessor = (formState: ProcessorFormState): ProcessorDefinition => { @@ -124,25 +126,22 @@ export const isDissectProcessor = createProcessorGuardByType('dissect'); const createId = htmlIdGenerator(); const toUIDefinition = ( - processor: TProcessorDefinition, - uiAttributes: Partial, 'status'>> = {} + processor: TProcessorDefinition ): ProcessorDefinitionWithUIAttributes => ({ id: createId(), - status: 'saved', type: getProcessorType(processor), - ...uiAttributes, ...processor, }); const toAPIDefinition = (processor: ProcessorDefinitionWithUIAttributes): ProcessorDefinition => { - const { id, status, type, ...processorConfig } = processor; + const { id, type, ...processorConfig } = processor; return processorConfig; }; const toSimulateDefinition = ( processor: ProcessorDefinitionWithUIAttributes ): ProcessorDefinitionWithId => { - const { status, type, ...processorConfig } = processor; + const { type, ...processorConfig } = processor; return processorConfig; }; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_management/index.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_management/index.tsx index 48926e726060..42336d0955cb 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_management/index.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_management/index.tsx @@ -12,11 +12,9 @@ import { ClassicStreamDetailManagement } from './classic'; export function StreamDetailManagement({ definition, refreshDefinition, - isLoadingDefinition, }: { definition?: IngestStreamGetResponse; refreshDefinition: () => void; - isLoadingDefinition: boolean; }) { if (!definition) { return null; @@ -24,11 +22,7 @@ export function StreamDetailManagement({ if (isWiredStreamGetResponse(definition)) { return ( - + ); } diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_management/wired.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_management/wired.tsx index 5613a6e697d4..8e9046efa4e5 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_management/wired.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_management/wired.tsx @@ -24,11 +24,9 @@ function isValidManagementSubTab(value: string): value is ManagementSubTabs { export function WiredStreamDetailManagement({ definition, refreshDefinition, - isLoadingDefinition, }: { definition?: WiredStreamGetResponse; refreshDefinition: () => void; - isLoadingDefinition: boolean; }) { const { path: { key, subtab }, @@ -53,11 +51,7 @@ export function WiredStreamDetailManagement({ }, schemaEditor: { content: ( - + ), label: i18n.translate('xpack.streams.streamDetailView.schemaEditorTab', { defaultMessage: 'Schema editor', diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_schema_editor/index.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_schema_editor/index.tsx index 50c88f4ae018..8a13a0fde8a1 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_schema_editor/index.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_schema_editor/index.tsx @@ -6,13 +6,13 @@ */ import React from 'react'; import { WiredStreamGetResponse, isRootStreamDefinition } from '@kbn/streams-schema'; +import { useStreamDetail } from '../../../hooks/use_stream_detail'; import { SchemaEditor } from '../schema_editor'; import { useSchemaFields } from '../schema_editor/hooks/use_schema_fields'; interface SchemaEditorProps { definition?: WiredStreamGetResponse; refreshDefinition: () => void; - isLoadingDefinition: boolean; } export function StreamDetailSchemaEditor(props: SchemaEditorProps) { @@ -20,11 +20,9 @@ export function StreamDetailSchemaEditor(props: SchemaEditorProps) { return ; } -const Content = ({ - definition, - refreshDefinition, - isLoadingDefinition, -}: Required) => { +const Content = ({ definition, refreshDefinition }: Required) => { + const { loading } = useStreamDetail(); + const { fields, isLoadingUnmappedFields, refreshFields, unmapField, updateField } = useSchemaFields({ definition, @@ -34,7 +32,7 @@ const Content = ({ return ( { - return streamsRepositoryClient - .fetch('GET /api/streams/{name}', { - signal, - params: { - path: { - name: key, - }, - }, - }) - .then((response) => { - if (isWiredStreamGetResponse(response)) { - return { - dashboards: response.dashboards, - inherited_fields: response.inherited_fields, - elasticsearch_assets: [], - effective_lifecycle: response.effective_lifecycle, - name: key, - stream: { - ...response.stream, - }, - }; - } - - if (isUnwiredStreamGetResponse(response)) { - return { - dashboards: response.dashboards, - elasticsearch_assets: response.elasticsearch_assets, - inherited_fields: {}, - effective_lifecycle: response.effective_lifecycle, - name: key, - data_stream_exists: response.data_stream_exists, - stream: { - ...response.stream, - }, - }; - } - throw new Error('Stream detail only supports IngestStreams.'); - }); - }, - [streamsRepositoryClient, key] + return ( + + + ); +} + +export function StreamDetailViewContent({ name, tab }: { name: string; tab: string }) { + const { definition, refresh } = useStreamDetail(); const entity = { - id: key, - displayName: key, + id: name, + displayName: name, }; const tabs: EntityViewTab[] = [ { name: 'overview', - content: , + content: , label: i18n.translate('xpack.streams.streamDetailView.overviewTab', { defaultMessage: 'Overview', }), }, { name: 'dashboards', - content: , + content: , label: i18n.translate('xpack.streams.streamDetailView.dashboardsTab', { defaultMessage: 'Dashboards', }), }, { name: 'management', - content: ( - - ), + content: , label: i18n.translate('xpack.streams.streamDetailView.managementTab', { defaultMessage: 'Management', }), @@ -118,7 +66,7 @@ export function StreamDetailView() { ); diff --git a/x-pack/platform/plugins/shared/streams_app/public/hooks/use_discard_confirm.ts b/x-pack/platform/plugins/shared/streams_app/public/hooks/use_discard_confirm.ts index 21ddf1e5ee3a..d33148fbf6c5 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/hooks/use_discard_confirm.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/hooks/use_discard_confirm.ts @@ -9,6 +9,10 @@ import { i18n } from '@kbn/i18n'; import { OverlayModalConfirmOptions } from '@kbn/core/public'; import { useKibana } from './use_kibana'; +export interface DiscardPromptOptions extends OverlayModalConfirmOptions { + message: string; +} + const defaultMessage = i18n.translate('xpack.streams.cancelModal.message', { defaultMessage: 'Are you sure you want to discard your changes?', }); diff --git a/x-pack/platform/plugins/shared/streams_app/public/hooks/use_stream_detail.tsx b/x-pack/platform/plugins/shared/streams_app/public/hooks/use_stream_detail.tsx new file mode 100644 index 000000000000..ba251cacfbae --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/hooks/use_stream_detail.tsx @@ -0,0 +1,75 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import React from 'react'; +import { StreamsRepositoryClient } from '@kbn/streams-plugin/public/api'; +import { + IngestStreamGetResponse, + isWiredStreamGetResponse, + isUnwiredStreamGetResponse, +} from '@kbn/streams-schema'; +import { useStreamsAppFetch } from './use_streams_app_fetch'; + +export interface StreamDetailContextProviderProps { + name: string; + streamsRepositoryClient: StreamsRepositoryClient; +} + +export interface StreamDetailContextValue { + definition?: IngestStreamGetResponse; + loading: boolean; + refresh: () => void; +} + +const StreamDetailContext = React.createContext(undefined); + +export function StreamDetailContextProvider({ + name, + streamsRepositoryClient, + children, +}: React.PropsWithChildren) { + const { + value: definition, + loading, + refresh, + } = useStreamsAppFetch( + async ({ signal }) => { + return streamsRepositoryClient + .fetch('GET /api/streams/{name}', { + signal, + params: { + path: { + name, + }, + }, + }) + .then((response) => { + if (isWiredStreamGetResponse(response) || isUnwiredStreamGetResponse(response)) { + return response; + } + + throw new Error('Stream detail only supports IngestStreams.'); + }); + }, + [streamsRepositoryClient, name] + ); + + const context = React.useMemo( + () => ({ definition, loading, refresh }), + [definition, loading, refresh] + ); + + return {children}; +} + +export function useStreamDetail() { + const ctx = React.useContext(StreamDetailContext); + if (!ctx) { + throw new Error('useStreamDetail must be used within a StreamDetailContextProvider'); + } + return ctx; +} diff --git a/x-pack/platform/plugins/shared/streams_app/public/state_management/date_range_state_machine/date_range_state_machine.ts b/x-pack/platform/plugins/shared/streams_app/public/state_management/date_range_state_machine/date_range_state_machine.ts new file mode 100644 index 000000000000..51bb6b5d4f5e --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/state_management/date_range_state_machine/date_range_state_machine.ts @@ -0,0 +1,106 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { + MachineImplementationsFrom, + assertEvent, + fromObservable, + enqueueActions, + setup, + assign, + ActorRefFrom, +} from 'xstate5'; +import type { DataPublicPluginStart, TimefilterContract } from '@kbn/data-plugin/public'; +import { getPlaceholderFor } from '@kbn/xstate-utils'; +import { DateRangeContext, DateRangeEvent, DateRangeInput } from './types'; + +export type DateRangeActorRef = ActorRefFrom; + +export const dateRangeMachine = setup({ + types: { + context: {} as DateRangeContext, + events: {} as DateRangeEvent, + input: {} as DateRangeInput, + }, + actors: { + subscribeTimeUpdates: getPlaceholderFor(createTimeUpdatesActor), + }, + actions: { + setTimeUpdates: () => { + throw new Error('Not implemented'); + }, + storeTimeUpdates: () => { + throw new Error('Not implemented'); + }, + notifyDateRangeUpdate: enqueueActions(({ enqueue, context }) => { + if (context.parentRef) { + enqueue.sendTo(context.parentRef, { type: 'dateRange.update' }); + } + }), + }, +}).createMachine({ + /** @xstate-layout N4IgpgJg5mDOIC5QQIYBcwCUUDsYGJUNs8wA6AVwAciwBtABgF1FQqB7WASzS-Z1YgAHogBMANgCsZACwBmAOySANCACeiGQEYGZBQE5R+8QA4lAX3OraJAjdwwyAJzAAzF7AAWjFkhAduXn5BEQQAWjkZMn1jBn1FFXVEOX0FMjktcRkGcVFJSysQHHYIOEF7UkEAnj4BP1CtBVUNBFE5cTITSX15cQY5SS1JXPyCoA */ + id: 'dateRange', + context: ({ input }) => ({ + parentRef: input.parentRef, + timeRange: { + from: '', + to: '', + }, + absoluteTimeRange: { + start: 0, + end: 0, + }, + }), + entry: 'storeTimeUpdates', + invoke: { + id: 'dateRangeSubscriptionActor', + src: 'subscribeTimeUpdates', + onSnapshot: { + actions: [{ type: 'storeTimeUpdates' }, { type: 'notifyDateRangeUpdate' }], + }, + }, + on: { + 'dateRange.update': { + actions: [{ type: 'setTimeUpdates' }], + }, + 'dateRange.refresh': { + actions: [{ type: 'storeTimeUpdates' }, { type: 'notifyDateRangeUpdate' }], + }, + }, +}); + +export const createDateRangeMachineImplementations = ({ + data, +}: { + data: DataPublicPluginStart; +}): MachineImplementationsFrom => ({ + actors: { + subscribeTimeUpdates: createTimeUpdatesActor({ data }), + }, + actions: { + setTimeUpdates: ({ event }: { event: DateRangeEvent }) => { + assertEvent(event, 'dateRange.update'); + data.query.timefilter.timefilter.setTime(event.range); + }, + storeTimeUpdates: assign(() => getTimeContextFromService(data.query.timefilter.timefilter)), + }, +}); + +function createTimeUpdatesActor({ data }: { data: DataPublicPluginStart }) { + return fromObservable(() => data.query.timefilter.timefilter.getTimeUpdate$()); +} + +function getTimeContextFromService(timefilter: TimefilterContract) { + return { + timeRange: timefilter.getTime(), + absoluteTimeRange: { + start: new Date(timefilter.getAbsoluteTime().from).getTime(), + end: new Date(timefilter.getAbsoluteTime().to).getTime(), + }, + }; +} diff --git a/x-pack/platform/plugins/shared/streams_app/public/state_management/date_range_state_machine/index.ts b/x-pack/platform/plugins/shared/streams_app/public/state_management/date_range_state_machine/index.ts new file mode 100644 index 000000000000..b4fd3c09b9c3 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/state_management/date_range_state_machine/index.ts @@ -0,0 +1,9 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export * from './date_range_state_machine'; +export * from './types'; diff --git a/x-pack/platform/plugins/shared/streams_app/public/state_management/date_range_state_machine/types.ts b/x-pack/platform/plugins/shared/streams_app/public/state_management/date_range_state_machine/types.ts new file mode 100644 index 000000000000..3e7d2e07ea94 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/state_management/date_range_state_machine/types.ts @@ -0,0 +1,32 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { TimeRange } from '@kbn/es-query'; +import { ActorRef, Snapshot } from 'xstate5'; + +export interface DateRangeToParentEvent { + type: 'dateRange.update'; +} + +export type DateRangeParentActor = ActorRef, DateRangeToParentEvent>; + +export interface DateRangeContext { + parentRef?: DateRangeParentActor; + timeRange: TimeRange; + absoluteTimeRange: { + start?: number; + end?: number; + }; +} + +export interface DateRangeInput { + parentRef?: DateRangeParentActor; +} + +export type DateRangeEvent = + | { type: 'dateRange.refresh' } + | { type: 'dateRange.update'; range: TimeRange }; diff --git a/x-pack/platform/plugins/shared/streams_app/tsconfig.json b/x-pack/platform/plugins/shared/streams_app/tsconfig.json index f0c287d627d5..26f45235bf1b 100644 --- a/x-pack/platform/plugins/shared/streams_app/tsconfig.json +++ b/x-pack/platform/plugins/shared/streams_app/tsconfig.json @@ -58,5 +58,6 @@ "@kbn/traced-es-client", "@kbn/licensing-plugin", "@kbn/datemath", + "@kbn/xstate-utils", ] }