From d0c62a20e992675fbb45612b99f43f6c971a0e1a Mon Sep 17 00:00:00 2001 From: Marco Antonio Ghiani Date: Fri, 7 Mar 2025 12:34:30 +0100 Subject: [PATCH] =?UTF-8?q?[Streams=20=F0=9F=8C=8A]=20Enrichment=20state?= =?UTF-8?q?=20management=20improvements=20(#211686)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## 📓 Summary Closes https://github.com/elastic/streams-program/issues/102 Closes https://github.com/elastic/streams-program/issues/159 This re-work of the enrichment state management introduces XState as state library to prepare scaling the enrichment part for more processors and improve performance reducing unnecessary side effects. ## 🤓 Reviewers note **There is a lot to digest on this PR, I'm open to any suggestion and I left some notes around to guide the review. This is also far from perfect as there is margin for other minor DX improvements for consuming the state machines, but it will all come in follow-up work after we resolve prioritized work such as integrating the Schema Editor.** Most of the changes on this PR are about the state management for the stream enrichment, but it touches also some other areas to integrate the event-based flow. ### Stream enrichment machine This machine handles the complexity around updating/promoting/deleting processors, and the available simulation states. It's a root level machine that spawns and manages its children machine, one for the **simulation** behaviour and one for each **processor** instantiated. Screenshot 2025-02-27 at 17 10 03 ### Simulation machine This machine handle the flow around sampling -> simulating, handling debouncing and determining once a simulation can run or should refresh. It also spawn a child date range machine to react to the observable time changes and reloads. It also derives all the required table configurations (columns, filters, documents) centralizing the parsing and reducing the cases for re-computing, since we don't rely anymore on the previous live processors copy. Screenshot 2025-02-27 at 17 33 40 ### Processor machine A processor can be in different states depending on the changes, not this tracks each of them independently and send events to the parent machine to react accordingly. It provide a boost in performance compared to the previous approach, as we don't have to rerender the whole page tree since the changes are encapsulated in the machine state. Screenshot 2025-03-04 at 11 34 01 --------- Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com> --- .../src/get_placeholder_for.ts | 15 + .../shared/kbn-xstate-utils/src/index.ts | 1 + .../categorize_logs_service.ts | 2 +- .../logs-overview/src/utils/xstate5_utils.ts | 13 - .../public/components/asset_image/index.tsx | 11 +- .../management_bottom_bar/index.tsx | 39 +- .../hooks/use_definition.ts | 221 ----------- .../hooks/use_processing_simulator.ts | 309 --------------- .../stream_detail_enrichment/page_content.tsx | 362 +++++++----------- .../processor_outcome_preview.tsx | 176 ++++----- .../processors/grok/grok_ai_suggestions.tsx | 27 +- .../processors/index.tsx | 255 ++++++------ .../processors/processor_metrics.tsx | 6 +- .../processors/processor_type_selector.tsx | 4 +- .../processors_list.tsx | 5 +- .../simulation_playground.tsx | 99 ++--- .../simulator_context.tsx | 46 --- .../processor_state_machine/index.ts | 9 + .../processor_state_machine.ts | 138 +++++++ .../processor_state_machine/types.ts | 43 +++ .../simulation_state_machine/index.ts | 11 + .../preview_docs_filter.ts | 34 ++ .../samples_fetcher_actor.ts | 55 +++ .../simulation_runner_actor.ts | 51 +++ .../simulation_state_machine.ts | 280 ++++++++++++++ .../simulation_state_machine/types.ts | 53 +++ .../simulation_state_machine/utils.ts | 69 ++++ .../stream_enrichment_state_machine/index.ts | 9 + .../stream_enrichment_state_machine.ts | 354 +++++++++++++++++ .../stream_enrichment_state_machine/types.ts | 43 +++ .../upsert_stream_actor.ts | 82 ++++ .../use_stream_enrichment.tsx | 105 +++++ .../stream_detail_enrichment/types.ts | 1 - .../stream_detail_enrichment/utils.ts | 19 +- .../stream_detail_management/index.tsx | 8 +- .../stream_detail_management/wired.tsx | 8 +- .../stream_detail_schema_editor/index.tsx | 12 +- .../components/stream_detail_view/index.tsx | 88 +---- .../public/hooks/use_discard_confirm.ts | 4 + .../public/hooks/use_stream_detail.tsx | 75 ++++ .../date_range_state_machine.ts | 106 +++++ .../date_range_state_machine/index.ts | 9 + .../date_range_state_machine/types.ts | 32 ++ .../plugins/shared/streams_app/tsconfig.json | 1 + 44 files changed, 2046 insertions(+), 1244 deletions(-) create mode 100644 src/platform/packages/shared/kbn-xstate-utils/src/get_placeholder_for.ts delete mode 100644 x-pack/platform/packages/shared/logs-overview/src/utils/xstate5_utils.ts delete mode 100644 x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/hooks/use_definition.ts delete mode 100644 x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/hooks/use_processing_simulator.ts delete mode 100644 x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/simulator_context.tsx create mode 100644 x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/processor_state_machine/index.ts create mode 100644 x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/processor_state_machine/processor_state_machine.ts create mode 100644 x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/processor_state_machine/types.ts create mode 100644 x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/index.ts create mode 100644 x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/preview_docs_filter.ts create mode 100644 x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/samples_fetcher_actor.ts create mode 100644 x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/simulation_runner_actor.ts create mode 100644 x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/simulation_state_machine.ts create mode 100644 x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/types.ts create mode 100644 x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/utils.ts create mode 100644 x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/index.ts create mode 100644 x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts create mode 100644 x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/types.ts create mode 100644 x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/upsert_stream_actor.ts create mode 100644 x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/use_stream_enrichment.tsx create mode 100644 x-pack/platform/plugins/shared/streams_app/public/hooks/use_stream_detail.tsx create mode 100644 x-pack/platform/plugins/shared/streams_app/public/state_management/date_range_state_machine/date_range_state_machine.ts create mode 100644 x-pack/platform/plugins/shared/streams_app/public/state_management/date_range_state_machine/index.ts create mode 100644 x-pack/platform/plugins/shared/streams_app/public/state_management/date_range_state_machine/types.ts diff --git a/src/platform/packages/shared/kbn-xstate-utils/src/get_placeholder_for.ts b/src/platform/packages/shared/kbn-xstate-utils/src/get_placeholder_for.ts new file mode 100644 index 000000000000..bcbcc765fbbf --- /dev/null +++ b/src/platform/packages/shared/kbn-xstate-utils/src/get_placeholder_for.ts @@ -0,0 +1,15 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +export const getPlaceholderFor = any>( + _implementationFactory: ImplementationFactory +): ReturnType => + (() => { + throw new Error('Not implemented'); + }) as ReturnType; diff --git a/src/platform/packages/shared/kbn-xstate-utils/src/index.ts b/src/platform/packages/shared/kbn-xstate-utils/src/index.ts index 3edf83e8a32c..2be37e739949 100644 --- a/src/platform/packages/shared/kbn-xstate-utils/src/index.ts +++ b/src/platform/packages/shared/kbn-xstate-utils/src/index.ts @@ -9,6 +9,7 @@ export * from './actions'; export * from './dev_tools'; +export * from './get_placeholder_for'; export * from './console_inspector'; export * from './notification_channel'; export * from './types'; diff --git a/x-pack/platform/packages/shared/logs-overview/src/services/categorize_logs_service/categorize_logs_service.ts b/x-pack/platform/packages/shared/logs-overview/src/services/categorize_logs_service/categorize_logs_service.ts index deeb758d2d73..c14fcc81e94b 100644 --- a/x-pack/platform/packages/shared/logs-overview/src/services/categorize_logs_service/categorize_logs_service.ts +++ b/x-pack/platform/packages/shared/logs-overview/src/services/categorize_logs_service/categorize_logs_service.ts @@ -6,8 +6,8 @@ */ import { MachineImplementationsFrom, assign, setup } from 'xstate5'; +import { getPlaceholderFor } from '@kbn/xstate-utils'; import { LogCategory } from '../../types'; -import { getPlaceholderFor } from '../../utils/xstate5_utils'; import { categorizeDocuments } from './categorize_documents'; import { countDocuments } from './count_documents'; import { CategorizeLogsServiceDependencies, LogCategorizationParams } from './types'; diff --git a/x-pack/platform/packages/shared/logs-overview/src/utils/xstate5_utils.ts b/x-pack/platform/packages/shared/logs-overview/src/utils/xstate5_utils.ts deleted file mode 100644 index 3df0bf4ea398..000000000000 --- a/x-pack/platform/packages/shared/logs-overview/src/utils/xstate5_utils.ts +++ /dev/null @@ -1,13 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -export const getPlaceholderFor = any>( - implementationFactory: ImplementationFactory -): ReturnType => - (() => { - throw new Error('Not implemented'); - }) as ReturnType; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/asset_image/index.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/asset_image/index.tsx index 4a77974bdca4..93887b1be3ee 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/asset_image/index.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/asset_image/index.tsx @@ -38,9 +38,18 @@ export function AssetImage({ type = 'welcome', ...props }: AssetImageProps) { const [imageSrc, setImageSrc] = useState(); useEffect(() => { + let isMounted = true; const dynamicImageImport = colorMode === 'LIGHT' ? light() : dark(); - dynamicImageImport.then((module) => setImageSrc(module.default)); + dynamicImageImport.then((module) => { + if (isMounted) { + setImageSrc(module.default); + } + }); + + return () => { + isMounted = false; + }; }, [colorMode, dark, light]); return imageSrc ? : null; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/management_bottom_bar/index.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/management_bottom_bar/index.tsx index 70671b09d4c9..b484c4f720f1 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/management_bottom_bar/index.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/management_bottom_bar/index.tsx @@ -6,13 +6,12 @@ */ import React from 'react'; -import { EuiButton, EuiButtonEmpty, EuiFlexGroup, EuiToolTip, EuiToolTipProps } from '@elastic/eui'; +import { EuiButton, EuiButtonEmpty, EuiFlexGroup } from '@elastic/eui'; import { i18n } from '@kbn/i18n'; import { useDiscardConfirm } from '../../../hooks/use_discard_confirm'; interface ManagementBottomBarProps { confirmButtonText?: string; - confirmTooltip?: Partial; disabled?: boolean; isLoading?: boolean; onCancel: () => void; @@ -21,7 +20,6 @@ interface ManagementBottomBarProps { export function ManagementBottomBar({ confirmButtonText = defaultConfirmButtonText, - confirmTooltip, disabled = false, isLoading = false, onCancel, @@ -34,31 +32,11 @@ export function ManagementBottomBar({ cancelButtonText: keepEditingLabel, }); - const confirmButtonContent = ( - - {confirmButtonText} - - ); - - const confirmButton = confirmTooltip ? ( - {confirmButtonContent} - ) : ( - confirmButtonContent - ); - return ( - {confirmButton} + + {confirmButtonText} + ); } diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/hooks/use_definition.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/hooks/use_definition.ts deleted file mode 100644 index caab0adf002e..000000000000 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/hooks/use_definition.ts +++ /dev/null @@ -1,221 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -import { useState, useMemo, useEffect, useRef, useCallback } from 'react'; -import { i18n } from '@kbn/i18n'; -import { useAbortController, useBoolean } from '@kbn/react-hooks'; -import { - IngestStreamGetResponse, - isWiredStreamGetResponse, - FieldDefinition, - WiredStreamGetResponse, - IngestUpsertRequest, - ProcessorDefinition, - getProcessorType, -} from '@kbn/streams-schema'; -import { DetectedField, ProcessorDefinitionWithUIAttributes } from '../types'; -import { useKibana } from '../../../../hooks/use_kibana'; -import { processorConverter } from '../utils'; - -export interface UseDefinitionReturn { - processors: ProcessorDefinitionWithUIAttributes[]; - hasChanges: boolean; - isSavingChanges: boolean; - addProcessor: (newProcessor: ProcessorDefinition, newFields?: DetectedField[]) => void; - updateProcessor: ( - id: string, - processor: ProcessorDefinition, - status?: ProcessorDefinitionWithUIAttributes['status'] - ) => void; - deleteProcessor: (id: string) => void; - reorderProcessors: (processors: ProcessorDefinitionWithUIAttributes[]) => void; - saveChanges: () => Promise; - setProcessors: (processors: ProcessorDefinitionWithUIAttributes[]) => void; - resetChanges: () => void; -} - -export const useDefinition = ( - definition: IngestStreamGetResponse, - refreshDefinition: () => void -): UseDefinitionReturn => { - const { core, dependencies } = useKibana(); - - const { toasts } = core.notifications; - const { processing: existingProcessorDefinitions } = definition.stream.ingest; - const { streamsRepositoryClient } = dependencies.start.streams; - - const abortController = useAbortController(); - const [isSavingChanges, { on: startsSaving, off: endsSaving }] = useBoolean(); - - const [processors, setProcessors] = useState(() => - createProcessorsList(existingProcessorDefinitions) - ); - - const initialProcessors = useRef(processors); - - const [fields, setFields] = useState(() => - isWiredStreamGetResponse(definition) ? definition.stream.ingest.wired.fields : {} - ); - - const nextProcessorDefinitions = useMemo( - () => processors.map(processorConverter.toAPIDefinition), - [processors] - ); - - useEffect(() => { - // Reset processors when definition refreshes - const resetProcessors = createProcessorsList(definition.stream.ingest.processing); - setProcessors(resetProcessors); - initialProcessors.current = resetProcessors; - }, [definition]); - - const hasChanges = useMemo( - () => - processors.length !== initialProcessors.current.length || // Processor count changed, a processor might be deleted - processors.some((proc) => proc.status === 'draft' || proc.status === 'updated') || // New or updated processors - hasOrderChanged(processors, initialProcessors.current), // Processor order changed - [processors] - ); - - const addProcessor = useCallback( - (newProcessor: ProcessorDefinition, newFields?: DetectedField[]) => { - setProcessors((prevProcs) => - prevProcs.concat(processorConverter.toUIDefinition(newProcessor, { status: 'draft' })) - ); - - if (isWiredStreamGetResponse(definition) && newFields) { - setFields((currentFields) => mergeFields(definition, currentFields, newFields)); - } - }, - [definition] - ); - - const updateProcessor = useCallback( - ( - id: string, - processorUpdate: ProcessorDefinition, - status: ProcessorDefinitionWithUIAttributes['status'] = 'updated' - ) => { - setProcessors((prevProcs) => - prevProcs.map((proc) => - proc.id === id - ? { - ...processorUpdate, - id, - type: getProcessorType(processorUpdate), - status, - } - : proc - ) - ); - }, - [] - ); - - const reorderProcessors = setProcessors; - - const deleteProcessor = useCallback((id: string) => { - setProcessors((prevProcs) => prevProcs.filter((proc) => proc.id !== id)); - }, []); - - const resetChanges = () => { - const resetProcessors = createProcessorsList(existingProcessorDefinitions); - setProcessors(resetProcessors); - initialProcessors.current = resetProcessors; - setFields(isWiredStreamGetResponse(definition) ? definition.stream.ingest.wired.fields : {}); - }; - - const saveChanges = async () => { - startsSaving(); - try { - await streamsRepositoryClient.fetch(`PUT /api/streams/{name}/_ingest`, { - signal: abortController.signal, - params: { - path: { - name: definition.stream.name, - }, - body: { - ingest: { - ...definition.stream.ingest, - processing: nextProcessorDefinitions, - ...(isWiredStreamGetResponse(definition) && { - wired: { ...definition.stream.ingest.wired, fields }, - }), - }, - } as IngestUpsertRequest, - }, - }); - - toasts.addSuccess( - i18n.translate( - 'xpack.streams.streamDetailView.managementTab.enrichment.saveChangesSuccess', - { defaultMessage: "Stream's processors updated" } - ) - ); - - refreshDefinition(); - } catch (error) { - toasts.addError(new Error(error.body.message), { - title: i18n.translate( - 'xpack.streams.streamDetailView.managementTab.enrichment.saveChangesError', - { defaultMessage: "An issue occurred saving processors' changes." } - ), - toastMessage: error.body.message, - }); - } finally { - endsSaving(); - } - }; - - return { - // Values - processors, - // Actions - addProcessor, - updateProcessor, - deleteProcessor, - reorderProcessors, - resetChanges, - saveChanges, - setProcessors, - // Flags - hasChanges, - isSavingChanges, - }; -}; - -const createProcessorsList = (processors: ProcessorDefinition[]) => { - return processors.map((processor) => processorConverter.toUIDefinition(processor)); -}; - -const hasOrderChanged = ( - processors: ProcessorDefinitionWithUIAttributes[], - initialProcessors: ProcessorDefinitionWithUIAttributes[] -) => { - return processors.some((processor, index) => processor.id !== initialProcessors[index].id); -}; - -const mergeFields = ( - definition: WiredStreamGetResponse, - currentFields: FieldDefinition, - newFields: DetectedField[] -) => { - return { - ...definition.stream.ingest.wired.fields, - ...newFields.reduce((acc, field) => { - // Add only new fields and ignore unmapped ones - if ( - !(field.name in currentFields) && - !(field.name in definition.inherited_fields) && - field.type !== undefined - ) { - acc[field.name] = { type: field.type }; - } - return acc; - }, {} as FieldDefinition), - }; -}; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/hooks/use_processing_simulator.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/hooks/use_processing_simulator.ts deleted file mode 100644 index 0f219f92e858..000000000000 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/hooks/use_processing_simulator.ts +++ /dev/null @@ -1,309 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -import { useEffect, useMemo, useRef, useState } from 'react'; -import { debounce, isEmpty, isEqual, uniq, uniqBy } from 'lodash'; -import { - IngestStreamGetResponse, - getProcessorConfig, - UnaryOperator, - Condition, - processorDefinitionSchema, - isSchema, - FlattenRecord, -} from '@kbn/streams-schema'; -import { IHttpFetchError, ResponseErrorBody } from '@kbn/core/public'; -import { APIReturnType } from '@kbn/streams-plugin/public/api'; -import { i18n } from '@kbn/i18n'; -import { flattenObjectNestedLast } from '@kbn/object-utils'; -import { useStreamsAppFetch } from '../../../../hooks/use_streams_app_fetch'; -import { useKibana } from '../../../../hooks/use_kibana'; -import { DetectedField, ProcessorDefinitionWithUIAttributes } from '../types'; -import { processorConverter } from '../utils'; - -export type Simulation = APIReturnType<'POST /api/streams/{name}/processing/_simulate'>; -export type ProcessorMetrics = - Simulation['processors_metrics'][keyof Simulation['processors_metrics']]; - -export interface TableColumn { - name: string; - origin: 'processor' | 'detected'; -} - -export const docsFilterOptions = { - outcome_filter_all: { - id: 'outcome_filter_all', - label: i18n.translate( - 'xpack.streams.streamDetailView.managementTab.enrichment.processor.outcomeControls.all', - { defaultMessage: 'All samples' } - ), - }, - outcome_filter_matched: { - id: 'outcome_filter_matched', - label: i18n.translate( - 'xpack.streams.streamDetailView.managementTab.enrichment.processor.outcomeControls.matched', - { defaultMessage: 'Matched' } - ), - }, - outcome_filter_unmatched: { - id: 'outcome_filter_unmatched', - label: i18n.translate( - 'xpack.streams.streamDetailView.managementTab.enrichment.processor.outcomeControls.unmatched', - { defaultMessage: 'Unmatched' } - ), - }, -} as const; - -export type DocsFilterOption = keyof typeof docsFilterOptions; - -export interface UseProcessingSimulatorProps { - definition: IngestStreamGetResponse; - processors: ProcessorDefinitionWithUIAttributes[]; -} - -export interface UseProcessingSimulatorReturn { - hasLiveChanges: boolean; - error?: IHttpFetchError; - isLoading: boolean; - samples: FlattenRecord[]; - filteredSamples: FlattenRecord[]; - simulation?: Simulation | null; - tableColumns: TableColumn[]; - refreshSamples: () => void; - watchProcessor: ( - processor: ProcessorDefinitionWithUIAttributes | { id: string; deleteIfExists: true } - ) => void; - refreshSimulation: () => void; - selectedDocsFilter: DocsFilterOption; - setSelectedDocsFilter: (filter: DocsFilterOption) => void; -} - -export const useProcessingSimulator = ({ - definition, - processors, -}: UseProcessingSimulatorProps): UseProcessingSimulatorReturn => { - const { dependencies } = useKibana(); - const { - data, - streams: { streamsRepositoryClient }, - } = dependencies.start; - - const { - absoluteTimeRange: { start, end }, - } = data.query.timefilter.timefilter.useTimefilter(); - - const draftProcessors = useMemo( - () => processors.filter((processor) => processor.status === 'draft'), - [processors] - ); - - const [liveDraftProcessors, setLiveDraftProcessors] = useState(draftProcessors); - - useEffect(() => { - setLiveDraftProcessors((prevLiveProcessors) => { - const inProgressDraft = prevLiveProcessors.find((proc) => proc.id === 'draft'); - return inProgressDraft ? [...draftProcessors, inProgressDraft] : draftProcessors; - }); - }, [draftProcessors]); - - const watchProcessor = useMemo( - () => - debounce( - (processor: ProcessorDefinitionWithUIAttributes | { id: string; deleteIfExists: true }) => { - if ('deleteIfExists' in processor) { - return setLiveDraftProcessors((prevLiveDraftProcessors) => - prevLiveDraftProcessors.filter((proc) => proc.id !== processor.id) - ); - } - - if (processor.status === 'draft') { - setLiveDraftProcessors((prevLiveDraftProcessors) => { - const newLiveDraftProcessors = prevLiveDraftProcessors.slice(); - - const existingIndex = prevLiveDraftProcessors.findIndex( - (proc) => proc.id === processor.id - ); - - if (existingIndex !== -1) { - newLiveDraftProcessors[existingIndex] = processor; - } else { - newLiveDraftProcessors.push(processor); - } - - return newLiveDraftProcessors; - }); - } - }, - 800 - ), - [] - ); - - const memoizedSamplingCondition = useRef(); - - const samplingCondition = useMemo(() => { - const newSamplingCondition = composeSamplingCondition(liveDraftProcessors); - if (isEqual(newSamplingCondition, memoizedSamplingCondition.current)) { - return memoizedSamplingCondition.current; - } - memoizedSamplingCondition.current = newSamplingCondition; - return newSamplingCondition; - }, [liveDraftProcessors]); - - const { - loading: isLoadingSamples, - value: sampleDocs, - refresh: refreshSamples, - } = useStreamsAppFetch( - async ({ signal }) => { - if (!definition) { - return []; - } - - const samplesBody = await streamsRepositoryClient.fetch('POST /api/streams/{name}/_sample', { - signal, - params: { - path: { name: definition.stream.name }, - body: { - if: samplingCondition, - start: start?.valueOf(), - end: end?.valueOf(), - size: 100, - }, - }, - }); - - return samplesBody.documents.map((doc) => flattenObjectNestedLast(doc)) as FlattenRecord[]; - }, - [definition, streamsRepositoryClient, start, end, samplingCondition], - { disableToastOnError: true } - ); - - const { - loading: isLoadingSimulation, - value: simulation, - error: simulationError, - refresh: refreshSimulation, - } = useStreamsAppFetch( - ({ signal }): Promise => { - if (!definition || isEmpty(sampleDocs) || isEmpty(liveDraftProcessors)) { - // This is a hack to avoid losing the previous value of the simulation once the conditions are not met. The state management refactor will fix this. - return Promise.resolve(simulation!); - } - - const processing = liveDraftProcessors.map(processorConverter.toAPIDefinition); - - const hasValidProcessors = processing.every((processor) => - isSchema(processorDefinitionSchema, processor) - ); - - // Each processor should meet the minimum schema requirements to run the simulation - if (!hasValidProcessors) { - // This is a hack to avoid losing the previous value of the simulation once the conditions are not met. The state management refactor will fix this. - return Promise.resolve(simulation!); - } - - return streamsRepositoryClient.fetch('POST /api/streams/{name}/processing/_simulate', { - signal, - params: { - path: { name: definition.stream.name }, - body: { - documents: sampleDocs, - processing: liveDraftProcessors.map(processorConverter.toSimulateDefinition), - }, - }, - }); - }, - [definition, sampleDocs, liveDraftProcessors, streamsRepositoryClient], - { disableToastOnError: true } - ); - - const tableColumns = useMemo(() => { - // If there is an error, we only want the source fields - const detectedFields = simulationError ? [] : simulation?.detected_fields ?? []; - - return getTableColumns(liveDraftProcessors, detectedFields); - }, [liveDraftProcessors, simulation, simulationError]); - - const hasLiveChanges = !isEmpty(liveDraftProcessors); - - const [selectedDocsFilter, setSelectedDocsFilter] = - useState('outcome_filter_all'); - - const filteredSamples = useMemo(() => { - if (!simulation?.documents) { - return sampleDocs?.map((doc) => flattenObjectNestedLast(doc)) as FlattenRecord[]; - } - - const filterDocuments = (filter: DocsFilterOption) => { - switch (filter) { - case 'outcome_filter_matched': - return simulation.documents.filter((doc) => doc.status === 'parsed'); - case 'outcome_filter_unmatched': - return simulation.documents.filter((doc) => doc.status !== 'parsed'); - case 'outcome_filter_all': - default: - return simulation.documents; - } - }; - - return filterDocuments(selectedDocsFilter).map((doc) => doc.value); - }, [sampleDocs, simulation?.documents, selectedDocsFilter]); - - return { - hasLiveChanges, - isLoading: isLoadingSamples || isLoadingSimulation, - error: simulationError as IHttpFetchError | undefined, - refreshSamples, - simulation, - samples: sampleDocs ?? [], - filteredSamples: filteredSamples ?? [], - tableColumns, - watchProcessor, - refreshSimulation, - selectedDocsFilter, - setSelectedDocsFilter, - }; -}; - -const composeSamplingCondition = ( - processors: ProcessorDefinitionWithUIAttributes[] -): Condition | undefined => { - if (isEmpty(processors)) { - return undefined; - } - - const uniqueFields = uniq(getSourceFields(processors)); - - const conditions = uniqueFields.map((field) => ({ - field, - operator: 'exists' as UnaryOperator, - })); - - return { or: conditions }; -}; - -const getSourceFields = (processors: ProcessorDefinitionWithUIAttributes[]): string[] => { - return processors.map((processor) => getProcessorConfig(processor).field); -}; - -const getTableColumns = ( - processors: ProcessorDefinitionWithUIAttributes[], - fields: DetectedField[] -) => { - const uniqueProcessorsFields = getSourceFields(processors).map((name) => ({ - name, - origin: 'processor', - })); - - const uniqueDetectedFields = fields.map((field) => ({ - name: field.name, - origin: 'detected', - })); - - return uniqBy([...uniqueProcessorsFields, ...uniqueDetectedFields], 'name') as TableColumn[]; -}; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/page_content.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/page_content.tsx index 3f8d67440945..2eaaa1ca604c 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/page_content.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/page_content.tsx @@ -18,23 +18,22 @@ import { useEuiTheme, } from '@elastic/eui'; import { i18n } from '@kbn/i18n'; -import { IngestStreamGetResponse, isRootStreamDefinition } from '@kbn/streams-schema'; +import { IngestStreamGetResponse } from '@kbn/streams-schema'; import { useUnsavedChangesPrompt } from '@kbn/unsaved-changes-prompt'; import { css } from '@emotion/react'; import { isEmpty } from 'lodash'; -import { UseDefinitionReturn, useDefinition } from './hooks/use_definition'; import { useKibana } from '../../../hooks/use_kibana'; -import { RootStreamEmptyPrompt } from './root_stream_empty_prompt'; import { DraggableProcessorListItem } from './processors_list'; import { SortableList } from './sortable_list'; import { ManagementBottomBar } from '../management_bottom_bar'; import { AddProcessorPanel } from './processors'; import { SimulationPlayground } from './simulation_playground'; import { - UseProcessingSimulatorReturn, - useProcessingSimulator, -} from './hooks/use_processing_simulator'; -import { SimulatorContextProvider } from './simulator_context'; + StreamEnrichmentContextProvider, + useSimulatorSelector, + useStreamEnrichmentEvents, + useStreamsEnrichmentSelector, +} from './state_management/stream_enrichment_state_machine'; const MemoSimulationPlayground = React.memo(SimulationPlayground); @@ -43,238 +42,171 @@ interface StreamDetailEnrichmentContentProps { refreshDefinition: () => void; } -export function StreamDetailEnrichmentContent({ - definition, - refreshDefinition, -}: StreamDetailEnrichmentContentProps) { +export function StreamDetailEnrichmentContent(props: StreamDetailEnrichmentContentProps) { + const { core, dependencies } = useKibana(); + const { + data, + streams: { streamsRepositoryClient }, + } = dependencies.start; + + return ( + + + + ); +} + +export function StreamDetailEnrichmentContentImpl() { const { appParams, core } = useKibana(); - const { - processors, - addProcessor, - updateProcessor, - deleteProcessor, - resetChanges, - saveChanges, - reorderProcessors, - hasChanges, - isSavingChanges, - } = useDefinition(definition, refreshDefinition); + const { resetChanges, saveChanges } = useStreamEnrichmentEvents(); - const processingSimulator = useProcessingSimulator({ definition, processors }); - - const { - hasLiveChanges, - isLoading, - refreshSamples, - filteredSamples, - simulation, - tableColumns, - watchProcessor, - selectedDocsFilter, - setSelectedDocsFilter, - } = processingSimulator; + const hasChanges = useStreamsEnrichmentSelector((state) => state.can({ type: 'stream.update' })); + const isSavingChanges = useStreamsEnrichmentSelector((state) => + state.matches({ ready: { stream: 'updating' } }) + ); useUnsavedChangesPrompt({ - hasUnsavedChanges: hasChanges || hasLiveChanges, + hasUnsavedChanges: hasChanges, history: appParams.history, http: core.http, navigateToUrl: core.application.navigateToUrl, openConfirm: core.overlays.openConfirm, }); - if (isRootStreamDefinition(definition.stream)) { - return ; - } - - const isNonAdditiveSimulation = simulation && simulation.is_non_additive_simulation; - const isSubmitDisabled = Boolean(!hasChanges || isNonAdditiveSimulation); - - const confirmTooltip = isNonAdditiveSimulation - ? { - title: i18n.translate( - 'xpack.streams.streamDetailView.managementTab.enrichment.nonAdditiveProcessorsTooltip.title', - { defaultMessage: 'Non additive simulation detected' } - ), - content: i18n.translate( - 'xpack.streams.streamDetailView.managementTab.enrichment.nonAdditiveProcessorsTooltip.content', - { - defaultMessage: - 'We currently prevent adding processors that change/remove existing data. Please update your processor configurations to continue.', - } - ), - } - : undefined; - return ( - - - - - - - - + + + + + + ); } -interface ProcessorsEditorProps { - definition: IngestStreamGetResponse; - processors: UseDefinitionReturn['processors']; - onAddProcessor: UseDefinitionReturn['addProcessor']; - onDeleteProcessor: UseDefinitionReturn['deleteProcessor']; - onReorderProcessor: UseDefinitionReturn['reorderProcessors']; - onUpdateProcessor: UseDefinitionReturn['updateProcessor']; - onWatchProcessor: UseProcessingSimulatorReturn['watchProcessor']; - simulation: UseProcessingSimulatorReturn['simulation']; -} +const ProcessorsEditor = React.memo(() => { + const { euiTheme } = useEuiTheme(); -const ProcessorsEditor = React.memo( - ({ - definition, - processors, - onAddProcessor, - onDeleteProcessor, - onReorderProcessor, - onUpdateProcessor, - onWatchProcessor, - simulation, - }: ProcessorsEditorProps) => { - const { euiTheme } = useEuiTheme(); + const { reorderProcessors } = useStreamEnrichmentEvents(); - const handlerItemDrag: DragDropContextProps['onDragEnd'] = ({ source, destination }) => { - if (source && destination) { - const items = euiDragDropReorder(processors, source.index, destination.index); - onReorderProcessor(items); - } - }; + const processorsRefs = useStreamsEnrichmentSelector((state) => + state.context.processorsRefs.filter((processorRef) => + processorRef.getSnapshot().matches('configured') + ) + ); - const hasProcessors = !isEmpty(processors); + const simulationSnapshot = useSimulatorSelector((s) => s); - return ( - <> - - -

- {i18n.translate( - 'xpack.streams.streamDetailView.managementTab.enrichment.headingTitle', - { - defaultMessage: 'Processors for field extraction', - } - )} -

-
- + const handlerItemDrag: DragDropContextProps['onDragEnd'] = ({ source, destination }) => { + if (source && destination) { + const items = euiDragDropReorder(processorsRefs, source.index, destination.index); + reorderProcessors(items); + } + }; + + const hasProcessors = !isEmpty(processorsRefs); + + return ( + <> + + +

{i18n.translate( - 'xpack.streams.streamDetailView.managementTab.enrichment.headingSubtitle', + 'xpack.streams.streamDetailView.managementTab.enrichment.headingTitle', { - defaultMessage: - 'Drag and drop existing processors to update their execution order.', + defaultMessage: 'Processors for field extraction', } )} - - - - {hasProcessors && ( - - {processors.map((processor, idx) => ( - - ))} - +

+
+ + {i18n.translate( + 'xpack.streams.streamDetailView.managementTab.enrichment.headingSubtitle', + { + defaultMessage: 'Drag and drop existing processors to update their execution order.', + } )} - -
- - ); - } -); +
+
+ + {hasProcessors && ( + + {processorsRefs.map((processorRef, idx) => ( + + ))} + + )} + + + + ); +}); const verticalFlexCss = css` display: flex; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processor_outcome_preview.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processor_outcome_preview.tsx index f3c7b22c8ef3..d21752045221 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processor_outcome_preview.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processor_outcome_preview.tsx @@ -16,121 +16,74 @@ import { EuiProgress, } from '@elastic/eui'; import { i18n } from '@kbn/i18n'; -import { TimeRange } from '@kbn/es-query'; -import { isEmpty } from 'lodash'; -import { SampleDocument } from '@kbn/streams-schema'; -import { useKibana } from '../../../hooks/use_kibana'; +import { useSelector } from '@xstate5/react'; +import { isEmpty, isEqual } from 'lodash'; import { StreamsAppSearchBar, StreamsAppSearchBarProps } from '../../streams_app_search_bar'; import { PreviewTable } from '../preview_table'; -import { - DocsFilterOption, - TableColumn, - UseProcessingSimulatorReturn, - docsFilterOptions, -} from './hooks/use_processing_simulator'; import { AssetImage } from '../../asset_image'; +import { + useSimulatorSelector, + useStreamEnrichmentEvents, +} from './state_management/stream_enrichment_state_machine'; +import { + PreviewDocsFilterOption, + getTableColumns, + previewDocsFilterOptions, +} from './state_management/simulation_state_machine'; -interface ProcessorOutcomePreviewProps { - columns: TableColumn[]; - isLoading: UseProcessingSimulatorReturn['isLoading']; - simulation: UseProcessingSimulatorReturn['simulation']; - filteredSamples: UseProcessingSimulatorReturn['samples']; - onRefreshSamples: UseProcessingSimulatorReturn['refreshSamples']; - selectedDocsFilter: UseProcessingSimulatorReturn['selectedDocsFilter']; - setSelectedDocsFilter: UseProcessingSimulatorReturn['setSelectedDocsFilter']; -} - -export const ProcessorOutcomePreview = ({ - columns, - isLoading, - simulation, - filteredSamples, - onRefreshSamples, - selectedDocsFilter, - setSelectedDocsFilter, -}: ProcessorOutcomePreviewProps) => { - const { dependencies } = useKibana(); - const { data } = dependencies.start; - - const { timeRange, setTimeRange } = data.query.timefilter.timefilter.useTimefilter(); - - const tableColumns = useMemo(() => { - switch (selectedDocsFilter) { - case 'outcome_filter_unmatched': - return columns - .filter((column) => column.origin === 'processor') - .map((column) => column.name); - case 'outcome_filter_matched': - case 'outcome_filter_all': - default: - return columns.map((column) => column.name); - } - }, [columns, selectedDocsFilter]); - - const simulationFailureRate = simulation - ? simulation?.failure_rate + simulation?.skipped_rate - : undefined; - const simulationSuccessRate = simulation?.success_rate; +export const ProcessorOutcomePreview = () => { + const isLoading = useSimulatorSelector( + (state) => + state.matches('debouncingChanges') || + state.matches('loadingSamples') || + state.matches('runningSimulation') + ); return ( <> - + - + {isLoading && } ); }; -interface OutcomeControlsProps { - docsFilter: DocsFilterOption; - timeRange: TimeRange; - onDocsFilterChange: (filter: DocsFilterOption) => void; - onTimeRangeChange: (timeRange: TimeRange) => void; - onTimeRangeRefresh: () => void; - simulationFailureRate?: number; - simulationSuccessRate?: number; -} +const OutcomeControls = () => { + const { changePreviewDocsFilter } = useStreamEnrichmentEvents(); + + const previewDocsFilter = useSimulatorSelector((state) => state.context.previewDocsFilter); + const simulationFailureRate = useSimulatorSelector((state) => + state.context.simulation + ? state.context.simulation.failure_rate + state.context.simulation.skipped_rate + : undefined + ); + const simulationSuccessRate = useSimulatorSelector( + (state) => state.context.simulation?.success_rate + ); + + const dateRangeRef = useSimulatorSelector((state) => state.context.dateRangeRef); + const timeRange = useSelector(dateRangeRef, (state) => state.context.timeRange); + const handleRefresh = () => dateRangeRef.send({ type: 'dateRange.refresh' }); -const OutcomeControls = ({ - docsFilter, - timeRange, - onDocsFilterChange, - onTimeRangeChange, - onTimeRangeRefresh, - simulationFailureRate, - simulationSuccessRate, -}: OutcomeControlsProps) => { const handleQuerySubmit: StreamsAppSearchBarProps['onQuerySubmit'] = ( { dateRange }, isUpdate ) => { if (!isUpdate) { - return onTimeRangeRefresh(); + return handleRefresh(); } if (dateRange) { - onTimeRangeChange({ - from: dateRange.from, - to: dateRange?.to, - mode: dateRange.mode, - }); + dateRangeRef.send({ type: 'dateRange.update', range: dateRange }); } }; - const getFilterButtonPropsFor = (filterId: DocsFilterOption) => ({ - hasActiveFilters: docsFilter === filterId, - onClick: () => onDocsFilterChange(filterId), + const getFilterButtonPropsFor = (filter: PreviewDocsFilterOption) => ({ + hasActiveFilters: previewDocsFilter === filter, + onClick: () => changePreviewDocsFilter(filter), }); return ( @@ -141,45 +94,60 @@ const OutcomeControls = ({ { defaultMessage: 'Filter for all, matching or unmatching previewed documents.' } )} > - - {docsFilterOptions.outcome_filter_all.label} + + {previewDocsFilterOptions.outcome_filter_all.label} - {docsFilterOptions.outcome_filter_matched.label} + {previewDocsFilterOptions.outcome_filter_matched.label} - {docsFilterOptions.outcome_filter_unmatched.label} + {previewDocsFilterOptions.outcome_filter_unmatched.label} ); }; -interface OutcomePreviewTableProps { - documents: SampleDocument[]; - columns: string[]; -} +const MemoPreviewTable = React.memo(PreviewTable, (prevProps, nextProps) => { + // Need to specify the props to compare since the columns might be the same even if the useMemo call returns a new array + return ( + prevProps.documents === nextProps.documents && + isEqual(prevProps.displayColumns, nextProps.displayColumns) + ); +}); -const OutcomePreviewTable = ({ documents, columns }: OutcomePreviewTableProps) => { - if (isEmpty(documents)) { +const OutcomePreviewTable = () => { + const processors = useSimulatorSelector((state) => state.context.processors); + const detectedFields = useSimulatorSelector((state) => state.context.simulation?.detected_fields); + const previewDocsFilter = useSimulatorSelector((state) => state.context.previewDocsFilter); + const previewDocuments = useSimulatorSelector((state) => state.context.previewDocuments); + + const previewColumns = useMemo( + () => getTableColumns(processors, detectedFields ?? [], previewDocsFilter), + [detectedFields, previewDocsFilter, processors] + ); + + if (!previewDocuments || isEmpty(previewDocuments)) { return ( ; + return ; }; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors/grok/grok_ai_suggestions.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors/grok/grok_ai_suggestions.tsx index 6a61317c01c6..ab5fd8579f92 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors/grok/grok_ai_suggestions.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors/grok/grok_ai_suggestions.tsx @@ -29,10 +29,10 @@ import type { FindActionResult } from '@kbn/actions-plugin/server'; import { UseGenAIConnectorsResult } from '@kbn/observability-ai-assistant-plugin/public/hooks/use_genai_connectors'; import { useAbortController, useBoolean } from '@kbn/react-hooks'; import useObservable from 'react-use/lib/useObservable'; +import { useStreamDetail } from '../../../../../hooks/use_stream_detail'; import { useKibana } from '../../../../../hooks/use_kibana'; import { GrokFormState, ProcessorFormState } from '../../types'; -import { UseProcessingSimulatorReturn } from '../../hooks/use_processing_simulator'; -import { useSimulatorContext } from '../../simulator_context'; +import { useSimulatorSelector } from '../../state_management/stream_enrichment_state_machine'; const RefreshButton = ({ generatePatterns, @@ -148,12 +148,10 @@ function useAiEnabled() { } function InnerGrokAiSuggestions({ - refreshSimulation, - filteredSamples, + previewDocuments, definition, }: { - refreshSimulation: UseProcessingSimulatorReturn['refreshSimulation']; - filteredSamples: FlattenRecord[]; + previewDocuments: FlattenRecord[]; definition: IngestStreamGetResponse; }) { const { dependencies } = useKibana(); @@ -193,7 +191,7 @@ function InnerGrokAiSuggestions({ body: { field: fieldValue, connectorId: currentConnector, - samples: filteredSamples, + samples: previewDocuments, }, }, }) @@ -210,7 +208,7 @@ function InnerGrokAiSuggestions({ currentConnector, definition.stream.name, fieldValue, - filteredSamples, + previewDocuments, streamsRepositoryClient, ]); @@ -225,11 +223,11 @@ function InnerGrokAiSuggestions({ const hasValidField = useMemo(() => { return Boolean( currentFieldName && - filteredSamples.some( + previewDocuments.some( (sample) => sample[currentFieldName] && typeof sample[currentFieldName] === 'string' ) ); - }, [filteredSamples, currentFieldName]); + }, [previewDocuments, currentFieldName]); const filteredSuggestions = suggestions?.patterns .map((pattern, i) => ({ @@ -304,7 +302,6 @@ function InnerGrokAiSuggestions({ { value: suggestion.pattern }, ]); } - refreshSimulation(); }} data-test-subj="streamsAppGrokAiSuggestionsButton" iconType="plusInCircle" @@ -360,7 +357,8 @@ export function GrokAiSuggestions() { core: { http }, } = useKibana(); const { enabled: isAiEnabled, couldBeEnabled } = useAiEnabled(); - const props = useSimulatorContext(); + const { definition } = useStreamDetail(); + const previewDocuments = useSimulatorSelector((state) => state.context.previewDocuments); if (!isAiEnabled && couldBeEnabled) { return ( @@ -390,8 +388,9 @@ export function GrokAiSuggestions() { ); } - if (!isAiEnabled) { + if (!isAiEnabled || !definition) { return null; } - return ; + + return ; } diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors/index.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors/index.tsx index dce409d449a6..46f977776003 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors/index.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors/index.tsx @@ -20,94 +20,84 @@ import { EuiText, EuiBadge, } from '@elastic/eui'; +import { useSelector } from '@xstate5/react'; import { i18n } from '@kbn/i18n'; -import { ProcessorType, IngestStreamGetResponse } from '@kbn/streams-schema'; -import { isEmpty, isEqual } from 'lodash'; -import React, { useEffect, useMemo, useState } from 'react'; +import { isEmpty } from 'lodash'; +import React, { useEffect, useMemo } from 'react'; import { useForm, SubmitHandler, FormProvider, useWatch } from 'react-hook-form'; import { css } from '@emotion/react'; -import { useBoolean } from '@kbn/react-hooks'; +import { DiscardPromptOptions, useDiscardConfirm } from '../../../../hooks/use_discard_confirm'; import { DissectProcessorForm } from './dissect'; import { GrokProcessorForm } from './grok'; import { ProcessorTypeSelector } from './processor_type_selector'; import { ProcessorFormState, ProcessorDefinitionWithUIAttributes } from '../types'; import { - getDefaultFormState, + getFormStateFrom, convertFormStateToProcessor, isGrokProcessor, isDissectProcessor, + getDefaultFormStateByType, } from '../utils'; -import { useDiscardConfirm } from '../../../../hooks/use_discard_confirm'; -import { ProcessorMetrics, UseProcessingSimulatorReturn } from '../hooks/use_processing_simulator'; import { ProcessorErrors, ProcessorMetricBadges } from './processor_metrics'; -import { UseDefinitionReturn } from '../hooks/use_definition'; +import { + useStreamEnrichmentEvents, + useStreamsEnrichmentSelector, + useSimulatorSelector, + StreamEnrichmentContext, +} from '../state_management/stream_enrichment_state_machine'; +import { ProcessorMetrics } from '../state_management/simulation_state_machine'; -export interface ProcessorPanelProps { - definition: IngestStreamGetResponse; - processorMetrics?: ProcessorMetrics; - onWatchProcessor: UseProcessingSimulatorReturn['watchProcessor']; -} - -export interface AddProcessorPanelProps extends ProcessorPanelProps { - isInitiallyOpen?: boolean; - onAddProcessor: UseDefinitionReturn['addProcessor']; -} - -export interface EditProcessorPanelProps extends ProcessorPanelProps { - processor: ProcessorDefinitionWithUIAttributes; - onDeleteProcessor: UseDefinitionReturn['deleteProcessor']; - onUpdateProcessor: UseDefinitionReturn['updateProcessor']; -} - -export function AddProcessorPanel({ - onAddProcessor, - onWatchProcessor, - processorMetrics, -}: AddProcessorPanelProps) { +export function AddProcessorPanel() { const { euiTheme } = useEuiTheme(); - const [hasChanges, setHasChanges] = useState(false); - const [isOpen, { on: openPanel, off: closePanel }] = useBoolean(false); + const { addProcessor } = useStreamEnrichmentEvents(); - const defaultValues = useMemo(() => getDefaultFormState('grok'), []); + const processorRef = useStreamsEnrichmentSelector((state) => + state.context.processorsRefs.find((p) => p.getSnapshot().matches('draft')) + ); + const processorMetrics = useSimulatorSelector( + (state) => processorRef && state.context.simulation?.processors_metrics[processorRef.id] + ); + + const isOpen = Boolean(processorRef); + + const defaultValues = useMemo(() => getDefaultFormStateByType('grok'), []); const methods = useForm({ defaultValues, mode: 'onChange' }); const type = useWatch({ control: methods.control, name: 'type' }); useEffect(() => { - if (isOpen) { + if (!processorRef) { + methods.reset(defaultValues); + } + }, [defaultValues, methods, processorRef]); + + useEffect(() => { + if (processorRef) { const { unsubscribe } = methods.watch((value) => { - const draftProcessor = createDraftProcessorFromForm(value as ProcessorFormState); - onWatchProcessor(draftProcessor); - setHasChanges(!isEqual(defaultValues, value)); + const processor = convertFormStateToProcessor(value as ProcessorFormState); + processorRef.send({ type: 'processor.change', processor }); }); + return () => unsubscribe(); } - }, [defaultValues, isOpen, methods, onWatchProcessor]); + }, [methods, processorRef]); - const handleSubmit: SubmitHandler = async (data) => { - const processingDefinition = convertFormStateToProcessor(data); + const handleCancel = useDiscardConfirm( + () => processorRef?.send({ type: 'processor.cancel' }), + discardChangesPromptOptions + ); - onWatchProcessor({ id: 'draft', deleteIfExists: true }); - onAddProcessor(processingDefinition, data.detected_fields); - closePanel(); - }; - - const handleCancel = () => { - closePanel(); - methods.reset(); - onWatchProcessor({ id: 'draft', deleteIfExists: true }); + const handleSubmit: SubmitHandler = async () => { + processorRef?.send({ type: 'processor.stage' }); }; const handleOpen = () => { const draftProcessor = createDraftProcessorFromForm(defaultValues); - onWatchProcessor(draftProcessor); - openPanel(); + addProcessor(draftProcessor); }; - const confirmDiscardAndClose = useDiscardConfirm(handleCancel); - const buttonContent = isOpen ? ( i18n.translate( 'xpack.streams.streamDetailView.managementTab.enrichment.processorPanel.addingProcessor', @@ -146,7 +136,7 @@ export function AddProcessorPanel({ {i18n.translate( @@ -195,74 +185,74 @@ const createDraftProcessorFromForm = ( return { id: 'draft', - status: 'draft', type: formState.type, ...processingDefinition, }; }; -export function EditProcessorPanel({ - onDeleteProcessor, - onUpdateProcessor, - onWatchProcessor, - processor, - processorMetrics, -}: EditProcessorPanelProps) { - const { euiTheme } = useEuiTheme(); +export interface EditProcessorPanelProps { + processorRef: StreamEnrichmentContext['processorsRefs'][number]; + processorMetrics?: ProcessorMetrics; +} - const [hasChanges, setHasChanges] = useState(false); - const [isOpen, { on: openPanel, off: closePanel }] = useBoolean(); +export function EditProcessorPanel({ processorRef, processorMetrics }: EditProcessorPanelProps) { + const { euiTheme } = useEuiTheme(); + const state = useSelector(processorRef, (s) => s); + const previousProcessor = state.context.previousProcessor; + const processor = state.context.processor; const processorDescription = getProcessorDescription(processor); - const isDraft = processor.status === 'draft'; - const isUnsaved = isDraft || processor.status === 'updated'; + const isOpen = state.matches({ configured: 'edit' }); + const isNew = state.context.isNew; + const isUnsaved = isNew || state.context.isUpdated; - const defaultValues = useMemo(() => getDefaultFormState(processor.type, processor), [processor]); + const defaultValues = useMemo(() => getFormStateFrom(processor), [processor]); - const methods = useForm({ defaultValues, mode: 'onChange' }); + const methods = useForm({ + defaultValues, + mode: 'onChange', + }); const type = useWatch({ control: methods.control, name: 'type' }); useEffect(() => { const { unsubscribe } = methods.watch((value) => { const processingDefinition = convertFormStateToProcessor(value as ProcessorFormState); - onWatchProcessor({ - id: processor.id, - status: processor.status, - type: value.type as ProcessorType, - ...processingDefinition, + processorRef.send({ + type: 'processor.change', + processor: processingDefinition, }); - setHasChanges(!isEqual(defaultValues, value)); }); return () => unsubscribe(); - }, [defaultValues, methods, onWatchProcessor, processor.id, processor.status]); + }, [methods, processorRef]); - const handleSubmit: SubmitHandler = (data) => { - const processorDefinition = convertFormStateToProcessor(data); + useEffect(() => { + const subscription = processorRef.on('processor.changesDiscarded', () => { + methods.reset(getFormStateFrom(previousProcessor)); + }); - onUpdateProcessor(processor.id, processorDefinition, isDraft ? 'draft' : 'updated'); - closePanel(); + return () => subscription.unsubscribe(); + }, [methods, previousProcessor, processorRef]); + + const handleCancel = useDiscardConfirm( + () => processorRef?.send({ type: 'processor.cancel' }), + discardChangesPromptOptions + ); + + const handleProcessorDelete = useDiscardConfirm( + () => processorRef?.send({ type: 'processor.delete' }), + deleteProcessorPromptOptions + ); + + const handleSubmit: SubmitHandler = () => { + processorRef.send({ type: 'processor.update' }); }; - const handleProcessorDelete = () => { - onDeleteProcessor(processor.id); - closePanel(); + const handleOpen = () => { + processorRef.send({ type: 'processor.edit' }); }; - const handleCancel = () => { - methods.reset(); - closePanel(); - }; - - const confirmDiscardAndClose = useDiscardConfirm(handleCancel); - const confirmDeletionAndClose = useDiscardConfirm(handleProcessorDelete, { - title: deleteProcessorTitle, - message: deleteProcessorMessage, - confirmButtonText: deleteProcessorLabel, - cancelButtonText: deleteProcessorCancelLabel, - }); - const buttonContent = isOpen ? ( {processor.type.toUpperCase()} ) : ( @@ -278,7 +268,7 @@ export function EditProcessorPanel({ return ( {i18n.translate( @@ -321,7 +311,7 @@ export function EditProcessorPanel({ size="s" fill onClick={methods.handleSubmit(handleSubmit)} - disabled={!methods.formState.isValid} + disabled={!methods.formState.isValid || !state.can({ type: 'processor.update' })} > {i18n.translate( 'xpack.streams.streamDetailView.managementTab.enrichment.processorPanel.confirmEditProcessor', @@ -342,7 +332,7 @@ export function EditProcessorPanel({ )} {type === 'grok' && } {type === 'dissect' && } - - - {deleteProcessorLabel} - + + + {deleteProcessorLabel} + {processorMetrics && !isEmpty(processorMetrics.errors) && ( )} @@ -397,21 +387,6 @@ const deleteProcessorLabel = i18n.translate( { defaultMessage: 'Delete processor' } ); -const deleteProcessorCancelLabel = i18n.translate( - 'xpack.streams.streamDetailView.managementTab.enrichment.deleteProcessorCancelLabel', - { defaultMessage: 'Cancel' } -); - -const deleteProcessorTitle = i18n.translate( - 'xpack.streams.streamDetailView.managementTab.enrichment.deleteProcessorTitle', - { defaultMessage: 'Are you sure you want to delete this processor?' } -); - -const deleteProcessorMessage = i18n.translate( - 'xpack.streams.streamDetailView.managementTab.enrichment.deleteProcessorMessage', - { defaultMessage: 'Deleting this processor will permanently impact the field configuration.' } -); - const getProcessorDescription = (processor: ProcessorDefinitionWithUIAttributes) => { if (isGrokProcessor(processor)) { return processor.grok.patterns.join(' • '); @@ -421,3 +396,37 @@ const getProcessorDescription = (processor: ProcessorDefinitionWithUIAttributes) return ''; }; + +export const discardChangesPromptOptions: DiscardPromptOptions = { + message: i18n.translate('xpack.streams.enrichment.processor.discardChanges.message', { + defaultMessage: 'Are you sure you want to discard your changes?', + }), + title: i18n.translate('xpack.streams.enrichment.processor.discardChanges.title', { + defaultMessage: 'Discard changes?', + }), + confirmButtonText: i18n.translate( + 'xpack.streams.enrichment.processor.discardChanges.confirmButtonText', + { defaultMessage: 'Discard' } + ), + cancelButtonText: i18n.translate( + 'xpack.streams.enrichment.processor.discardChanges.cancelButtonText', + { defaultMessage: 'Keep editing' } + ), +}; + +export const deleteProcessorPromptOptions: DiscardPromptOptions = { + message: i18n.translate('xpack.streams.enrichment.processor.deleteProcessor.message', { + defaultMessage: 'Deleting this processor will permanently impact the field configuration.', + }), + title: i18n.translate('xpack.streams.enrichment.processor.deleteProcessor.title', { + defaultMessage: 'Are you sure you want to delete this processor?', + }), + confirmButtonText: i18n.translate( + 'xpack.streams.enrichment.processor.deleteProcessor.confirmButtonText', + { defaultMessage: 'Delete processor' } + ), + cancelButtonText: i18n.translate( + 'xpack.streams.enrichment.processor.deleteProcessor.cancelButtonText', + { defaultMessage: 'Cancel' } + ), +}; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors/processor_metrics.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors/processor_metrics.tsx index edc211ddec06..f2fbd7a8e074 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors/processor_metrics.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors/processor_metrics.tsx @@ -20,7 +20,7 @@ import React from 'react'; import { i18n } from '@kbn/i18n'; import useToggle from 'react-use/lib/useToggle'; import { css } from '@emotion/react'; -import { ProcessorMetrics } from '../hooks/use_processing_simulator'; +import { ProcessorMetrics } from '../state_management/simulation_state_machine'; type ProcessorMetricBadgesProps = ProcessorMetrics; @@ -116,7 +116,9 @@ export const ProcessorErrors = ({ metrics }: { metrics: ProcessorMetrics }) => { const shouldDisplayErrorToggle = remainingCount > 0; const getCalloutProps = (type: ProcessorMetrics['errors'][number]['type']): EuiCallOutProps => { - const isWarningError = type === 'generic_processor_failure' && success_rate > 0; + const isWarningError = + type === 'non_additive_processor_failure' || + (type === 'generic_processor_failure' && success_rate > 0); return { color: isWarningError ? 'warning' : 'danger', diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors/processor_type_selector.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors/processor_type_selector.tsx index a0d436003aef..01e765ccfa84 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors/processor_type_selector.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors/processor_type_selector.tsx @@ -12,7 +12,7 @@ import { FormattedMessage } from '@kbn/i18n-react'; import { useController, useFormContext, useWatch } from 'react-hook-form'; import { ProcessorType } from '@kbn/streams-schema'; import { useKibana } from '../../../../hooks/use_kibana'; -import { getDefaultFormState } from '../utils'; +import { getDefaultFormStateByType } from '../utils'; import { ProcessorFormState } from '../types'; interface TAvailableProcessor { @@ -38,7 +38,7 @@ export const ProcessorTypeSelector = ({ const processorType = useWatch<{ type: ProcessorType }>({ name: 'type' }); const handleChange = (type: ProcessorType) => { - const formState = getDefaultFormState(type); + const formState = getDefaultFormStateByType(type); reset(formState); }; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors_list.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors_list.tsx index 5334fc95403e..35b19a1f5734 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors_list.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors_list.tsx @@ -10,20 +10,19 @@ import { EuiDraggable } from '@elastic/eui'; import { EditProcessorPanel, type EditProcessorPanelProps } from './processors'; export const DraggableProcessorListItem = ({ - processor, idx, ...props }: EditProcessorPanelProps & { idx: number }) => ( - {() => } + {() => } ); diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/simulation_playground.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/simulation_playground.tsx index bd38edb95efe..11dab5583283 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/simulation_playground.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/simulation_playground.tsx @@ -5,83 +5,56 @@ * 2.0. */ -import React, { useState } from 'react'; +import React from 'react'; import { i18n } from '@kbn/i18n'; import { EuiFlexItem, EuiSpacer, EuiTab, EuiTabs } from '@elastic/eui'; -import { IngestStreamGetResponse, isWiredStreamGetResponse } from '@kbn/streams-schema'; +import { isWiredStreamGetResponse } from '@kbn/streams-schema'; import { ProcessorOutcomePreview } from './processor_outcome_preview'; -import { TableColumn, UseProcessingSimulatorReturn } from './hooks/use_processing_simulator'; +import { + useStreamEnrichmentEvents, + useStreamsEnrichmentSelector, +} from './state_management/stream_enrichment_state_machine'; -interface SimulationPlaygroundProps { - definition: IngestStreamGetResponse; - columns: TableColumn[]; - isLoading: UseProcessingSimulatorReturn['isLoading']; - simulation: UseProcessingSimulatorReturn['simulation']; - filteredSamples: UseProcessingSimulatorReturn['filteredSamples']; - selectedDocsFilter: UseProcessingSimulatorReturn['selectedDocsFilter']; - setSelectedDocsFilter: UseProcessingSimulatorReturn['setSelectedDocsFilter']; - onRefreshSamples: UseProcessingSimulatorReturn['refreshSamples']; -} +export const SimulationPlayground = () => { + const isViewingDataPreview = useStreamsEnrichmentSelector((state) => + state.matches({ + ready: { enrichment: { displayingSimulation: 'viewDataPreview' } }, + }) + ); + const isViewingDetectedFields = useStreamsEnrichmentSelector((state) => + state.matches({ + ready: { enrichment: { displayingSimulation: 'viewDetectedFields' } }, + }) + ); + const canViewDetectedFields = useStreamsEnrichmentSelector((state) => + isWiredStreamGetResponse(state.context.definition) + ); -export const SimulationPlayground = (props: SimulationPlaygroundProps) => { - const { - definition, - columns, - isLoading, - simulation, - filteredSamples, - onRefreshSamples, - setSelectedDocsFilter, - selectedDocsFilter, - } = props; - - const tabs = { - dataPreview: { - name: i18n.translate( - 'xpack.streams.streamDetailView.managementTab.enrichment.simulationPlayground.dataPreview', - { defaultMessage: 'Data preview' } - ), - }, - ...(isWiredStreamGetResponse(definition) && { - detectedFields: { - name: i18n.translate( - 'xpack.streams.streamDetailView.managementTab.enrichment.simulationPlayground.detectedFields', - { defaultMessage: 'Detected fields' } - ), - }, - }), - } as const; - - const [selectedTabId, setSelectedTabId] = useState('dataPreview'); + const { viewSimulationPreviewData, viewSimulationDetectedFields } = useStreamEnrichmentEvents(); return ( <> - {Object.entries(tabs).map(([tabId, tab]) => ( - setSelectedTabId(tabId as keyof typeof tabs)} - > - {tab.name} + + {i18n.translate( + 'xpack.streams.streamDetailView.managementTab.enrichment.simulationPlayground.dataPreview', + { defaultMessage: 'Data preview' } + )} + + {canViewDetectedFields && ( + + {i18n.translate( + 'xpack.streams.streamDetailView.managementTab.enrichment.simulationPlayground.detectedFields', + { defaultMessage: 'Detected fields' } + )} - ))} + )} - {selectedTabId === 'dataPreview' && ( - - )} - {selectedTabId === 'detectedFields' && + {isViewingDataPreview && } + {isViewingDetectedFields && i18n.translate('xpack.streams.simulationPlayground.div.detectedFieldsLabel', { defaultMessage: 'WIP', })} diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/simulator_context.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/simulator_context.tsx deleted file mode 100644 index 57a9d9cf3388..000000000000 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/simulator_context.tsx +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -import React, { useMemo } from 'react'; -import { createContext } from 'react'; -import { IngestStreamGetResponse } from '@kbn/streams-schema'; -import { UseProcessingSimulatorReturn } from './hooks/use_processing_simulator'; - -export const context = createContext(undefined); - -export interface SimulatorContextValue extends UseProcessingSimulatorReturn { - definition: IngestStreamGetResponse; -} - -export function SimulatorContextProvider({ - processingSimulator, - definition, - - children, -}: { - processingSimulator: UseProcessingSimulatorReturn; - definition: IngestStreamGetResponse; - children: React.ReactNode; -}) { - const contextValue = useMemo(() => { - return { - definition, - ...processingSimulator, - }; - }, [definition, processingSimulator]); - return {children}; -} - -export function useSimulatorContext() { - const ctx = React.useContext(context); - if (!ctx) { - throw new Error( - 'useStreamsEnrichmentContext must be used within a StreamsEnrichmentContextProvider' - ); - } - return ctx; -} diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/processor_state_machine/index.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/processor_state_machine/index.ts new file mode 100644 index 000000000000..11bed7d7d2f2 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/processor_state_machine/index.ts @@ -0,0 +1,9 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export * from './processor_state_machine'; +export * from './types'; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/processor_state_machine/processor_state_machine.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/processor_state_machine/processor_state_machine.ts new file mode 100644 index 000000000000..e14ab48b60a7 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/processor_state_machine/processor_state_machine.ts @@ -0,0 +1,138 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +import { ActorRefFrom, assign, emit, forwardTo, sendTo, setup } from 'xstate5'; +import { isEqual } from 'lodash'; +import { ProcessorDefinition, getProcessorType } from '@kbn/streams-schema'; +import { ProcessorInput, ProcessorContext, ProcessorEvent, ProcessorEmittedEvent } from './types'; + +export type ProcessorActorRef = ActorRefFrom; + +export const processorMachine = setup({ + types: { + input: {} as ProcessorInput, + context: {} as ProcessorContext, + events: {} as ProcessorEvent, + emitted: {} as ProcessorEmittedEvent, + }, + actions: { + changeProcessor: assign(({ context }, params: { processor: ProcessorDefinition }) => ({ + processor: { + id: context.processor.id, + type: getProcessorType(params.processor), + ...params.processor, + }, + })), + resetToPrevious: assign(({ context }) => ({ + processor: context.previousProcessor, + })), + markAsUpdated: assign(({ context }) => ({ + previousProcessor: context.processor, + isUpdated: true, + })), + forwardEventToParent: forwardTo(({ context }) => context.parentRef), + forwardChangeEventToParent: sendTo( + ({ context }) => context.parentRef, + ({ context }) => ({ + type: 'processor.change', + id: context.processor.id, + }) + ), + notifyProcessorDelete: sendTo( + ({ context }) => context.parentRef, + ({ context }) => ({ + type: 'processor.delete', + id: context.processor.id, + }) + ), + emitChangesDiscarded: emit({ type: 'processor.changesDiscarded' }), + }, + guards: { + isDraft: ({ context }) => context.isNew, + hasEditingChanges: ({ context }) => !isEqual(context.previousProcessor, context.processor), + }, +}).createMachine({ + /** @xstate-layout N4IgpgJg5mDOIC5QAcBOB7AxnW7UDoBXAO1TnQBsA3SAYgG0AGAXURXVgEsAXT9YtiAAeiAMz4AbACYJAFgDsATkWN5jCfNETRUgDQgAnogCM8gBz4ArMbOjjsxossalAX1f60WHHiKly1HT0xqxIIMgcPHwCYSIIUrKW+IxSlimpEtrOorL6RgimFta2jLLKMlJSivLunhjYsLgEEKgAhgBm3PiQUcRQtF4NTfiw3K0wTKHsXLz8gnGVsviilopmxpWMxnYSeYiJ4hLV8kpSovLGpme14fU+zW2d3RC9-YP3+JitxNgUk4IRGbReaIRT4RSVUTrMpmI7yByWPYICSlfC2RSiHSMSzyKo1Dy3byNXwtDpdHq8PoDO7EgiYAAW3wmLABkVmMVAcQAtNtGOC7NYqpYpIxHCckVolkdZMZnBoUvJLGZLDd3rTPvx2pwoIQyBB8JwIBQwNSicMKf8woConNYohpBJwZopOszGZZEdFMYJal8IqUplTMUpMZVTThphNdrdZBnjw45S3uHfIRkBBWtwwJbpjaOcJ7VDwcYvfI5KkVhIzEiDpJjqdzpdcWGzb5I8QtTq9QmE5wqWqI99ftnwmzgXaEG7HbI7FD7NKId7DIgqnydJZrKVKhJ7GZm0NW1HO7GKT2+8nmmBjZnh9b2SCEJjjH715iZVpMW6kfC+TJZZdjJWZSKHuHxth2Mb6ieFK9kmLZ0oyfRZiyVqjranKIGYLr4KkLpmBiphQkcSJmFsfpKE4UjmCKGLuASxDoBAcAAuerJAmh+YIFylQWIksJQgotjGIwUJfksEJbDkKzFriOQgeqJBkLggQQKxub3jyJH4Lx2juuYdjCVWS4FBcyQqCGIYnMoSoqgS-Yko83CqXe448uY-J-kKIpivISKUfI+DFjKsjTmUlarHJwykk80F9E5Y7ocifKyBUiruvpsi2NWKgBWkbprKIGLSFIEUHu20Z6nF7FxDiFjVDorrup6i75ABUhaSKc4aBklQ2XUcEamVR76oaxqVXmcQnEkthKMKUINuKRkJGC4n2KoZhqMlmIlXSh4QQmY33pkYJaC6Xp4phOhIhIwp+tulgrDIWyndtA3gV2UEvImB3jhc-knXhGzVBdehGd+2F3f+gHKC9jFXpA30JVyFi2GkKzVPd8JQiD+S4kkXp4dkQmlIktGuEAA */ + id: 'processor', + context: ({ input }) => ({ + parentRef: input.parentRef, + previousProcessor: input.processor, + processor: input.processor, + isNew: input.isNew ?? false, + }), + initial: 'unresolved', + states: { + unresolved: { + always: [{ target: 'draft', guard: 'isDraft' }, { target: 'configured' }], + }, + draft: { + initial: 'editing', + states: { + editing: { + on: { + 'processor.stage': { + target: '#configured', + actions: [{ type: 'markAsUpdated' }, { type: 'forwardEventToParent' }], + }, + 'processor.cancel': { + target: '#deleted', + actions: [{ type: 'resetToPrevious' }], + }, + 'processor.change': { + actions: [ + { type: 'changeProcessor', params: ({ event }) => event }, + { type: 'forwardChangeEventToParent' }, + ], + }, + }, + }, + }, + }, + configured: { + id: 'configured', + initial: 'idle', + states: { + idle: { + on: { 'processor.edit': 'edit' }, + }, + edit: { + initial: 'editing', + states: { + editing: { + on: { + 'processor.update': { + guard: 'hasEditingChanges', + target: '#configured.idle', + actions: [{ type: 'markAsUpdated' }, { type: 'forwardEventToParent' }], + }, + 'processor.cancel': { + target: '#configured.idle', + actions: [ + { type: 'emitChangesDiscarded' }, + { type: 'resetToPrevious' }, + { type: 'forwardEventToParent' }, + ], + }, + 'processor.delete': '#deleted', + 'processor.change': { + actions: [ + { type: 'changeProcessor', params: ({ event }) => event }, + { type: 'forwardChangeEventToParent' }, + ], + }, + }, + }, + }, + }, + }, + }, + deleted: { + id: 'deleted', + type: 'final', + entry: [{ type: 'notifyProcessorDelete' }], + }, + }, +}); diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/processor_state_machine/types.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/processor_state_machine/types.ts new file mode 100644 index 000000000000..d6bb62889f17 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/processor_state_machine/types.ts @@ -0,0 +1,43 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { ProcessorDefinition } from '@kbn/streams-schema'; +import { ActorRef, Snapshot } from 'xstate5'; +import { ProcessorDefinitionWithUIAttributes } from '../../types'; + +export type ProcessorToParentEvent = + | { type: 'processor.change'; id: string } + | { type: 'processor.delete'; id: string } + | { type: 'processor.stage' }; + +export interface ProcessorInput { + parentRef: ProcessorParentActor; + processor: ProcessorDefinitionWithUIAttributes; + isNew?: boolean; +} + +export type ProcessorParentActor = ActorRef, ProcessorToParentEvent>; + +export interface ProcessorContext { + parentRef: ProcessorParentActor; + previousProcessor: ProcessorDefinitionWithUIAttributes; + processor: ProcessorDefinitionWithUIAttributes; + isNew: boolean; + isUpdated?: boolean; +} + +export type ProcessorEvent = + | { type: 'processor.cancel' } + | { type: 'processor.change'; processor: ProcessorDefinition } + | { type: 'processor.delete' } + | { type: 'processor.edit' } + | { type: 'processor.stage' } + | { type: 'processor.update' }; + +export interface ProcessorEmittedEvent { + type: 'processor.changesDiscarded'; +} diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/index.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/index.ts new file mode 100644 index 000000000000..2e33f97d5674 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/index.ts @@ -0,0 +1,11 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export * from './preview_docs_filter'; +export * from './simulation_state_machine'; +export * from './types'; +export * from './utils'; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/preview_docs_filter.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/preview_docs_filter.ts new file mode 100644 index 000000000000..f27ad9078283 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/preview_docs_filter.ts @@ -0,0 +1,34 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { i18n } from '@kbn/i18n'; + +export const previewDocsFilterOptions = { + outcome_filter_all: { + id: 'outcome_filter_all', + label: i18n.translate( + 'xpack.streams.streamDetailView.managementTab.enrichment.processor.outcomeControls.all', + { defaultMessage: 'All samples' } + ), + }, + outcome_filter_matched: { + id: 'outcome_filter_matched', + label: i18n.translate( + 'xpack.streams.streamDetailView.managementTab.enrichment.processor.outcomeControls.matched', + { defaultMessage: 'Matched' } + ), + }, + outcome_filter_unmatched: { + id: 'outcome_filter_unmatched', + label: i18n.translate( + 'xpack.streams.streamDetailView.managementTab.enrichment.processor.outcomeControls.unmatched', + { defaultMessage: 'Unmatched' } + ), + }, +} as const; + +export type PreviewDocsFilterOption = keyof typeof previewDocsFilterOptions; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/samples_fetcher_actor.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/samples_fetcher_actor.ts new file mode 100644 index 000000000000..9b4cee2ad2ce --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/samples_fetcher_actor.ts @@ -0,0 +1,55 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { i18n } from '@kbn/i18n'; +import { flattenObjectNestedLast } from '@kbn/object-utils'; +import { Condition, FlattenRecord } from '@kbn/streams-schema'; +import { fromPromise, ErrorActorEvent } from 'xstate5'; +import { errors as esErrors } from '@elastic/elasticsearch'; +import { DateRangeContext } from '../../../../../state_management/date_range_state_machine'; +import { SimulationMachineDeps } from './types'; + +export interface SamplesFetchInput { + condition?: Condition; + streamName: string; + absoluteTimeRange: DateRangeContext['absoluteTimeRange']; +} + +export function createSamplesFetchActor({ + streamsRepositoryClient, +}: Pick) { + return fromPromise(async ({ input, signal }) => { + const samplesBody = await streamsRepositoryClient.fetch('POST /api/streams/{name}/_sample', { + signal, + params: { + path: { name: input.streamName }, + body: { + if: input.condition, + start: input.absoluteTimeRange.start, + end: input.absoluteTimeRange.end, + size: 100, + }, + }, + }); + + return samplesBody.documents.map(flattenObjectNestedLast) as FlattenRecord[]; + }); +} + +export function createSamplesFetchFailureNofitier({ + toasts, +}: Pick) { + return (params: { event: unknown }) => { + const event = params.event as ErrorActorEvent; + toasts.addError(new Error(event.error.body.message), { + title: i18n.translate('xpack.streams.enrichment.simulation.samplesFetchError', { + defaultMessage: 'An issue occurred retrieving samples.', + }), + toastMessage: event.error.body.message, + }); + }; +} diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/simulation_runner_actor.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/simulation_runner_actor.ts new file mode 100644 index 000000000000..f2e691022805 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/simulation_runner_actor.ts @@ -0,0 +1,51 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { i18n } from '@kbn/i18n'; +import { FlattenRecord } from '@kbn/streams-schema'; +import { fromPromise, ErrorActorEvent } from 'xstate5'; +import { errors as esErrors } from '@elastic/elasticsearch'; +import { ProcessorDefinitionWithUIAttributes } from '../../types'; +import { processorConverter } from '../../utils'; +import { Simulation, SimulationMachineDeps } from './types'; + +export interface SimulationRunnerInput { + streamName: string; + documents: FlattenRecord[]; + processors: ProcessorDefinitionWithUIAttributes[]; +} + +export function createSimulationRunnerActor({ + streamsRepositoryClient, +}: Pick) { + return fromPromise(({ input, signal }) => + streamsRepositoryClient.fetch('POST /api/streams/{name}/processing/_simulate', { + signal, + params: { + path: { name: input.streamName }, + body: { + documents: input.documents, + processing: input.processors.map(processorConverter.toSimulateDefinition), + }, + }, + }) + ); +} + +export function createSimulationRunFailureNofitier({ + toasts, +}: Pick) { + return (params: { event: unknown }) => { + const event = params.event as ErrorActorEvent; + toasts.addError(new Error(event.error.body.message), { + title: i18n.translate('xpack.streams.enrichment.simulation.simulationRunError', { + defaultMessage: 'An issue occurred running the simulation.', + }), + toastMessage: event.error.body.message, + }); + }; +} diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/simulation_state_machine.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/simulation_state_machine.ts new file mode 100644 index 000000000000..c250e35ef2c4 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/simulation_state_machine.ts @@ -0,0 +1,280 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +import { ActorRefFrom, MachineImplementationsFrom, SnapshotFrom, assign, setup } from 'xstate5'; +import { getPlaceholderFor } from '@kbn/xstate-utils'; +import { FlattenRecord, isSchema, processorDefinitionSchema } from '@kbn/streams-schema'; +import { isEmpty, isEqual } from 'lodash'; +import { + dateRangeMachine, + createDateRangeMachineImplementations, +} from '../../../../../state_management/date_range_state_machine'; +import { ProcessorDefinitionWithUIAttributes } from '../../types'; +import { processorConverter } from '../../utils'; +import { + SimulationInput, + SimulationContext, + SimulationEvent, + Simulation, + SimulationMachineDeps, +} from './types'; +import { PreviewDocsFilterOption } from './preview_docs_filter'; +import { + createSamplesFetchActor, + createSamplesFetchFailureNofitier, +} from './samples_fetcher_actor'; +import { + createSimulationRunnerActor, + createSimulationRunFailureNofitier, +} from './simulation_runner_actor'; +import { filterSimulationDocuments, composeSamplingCondition } from './utils'; + +export type SimulationActorRef = ActorRefFrom; +export type SimulationActorSnapshot = SnapshotFrom; +export interface ProcessorEventParams { + processors: ProcessorDefinitionWithUIAttributes[]; +} + +const hasSamples = (samples: FlattenRecord[]) => !isEmpty(samples); + +const isValidProcessor = (processor: ProcessorDefinitionWithUIAttributes) => + isSchema(processorDefinitionSchema, processorConverter.toAPIDefinition(processor)); +const hasValidProcessors = (processors: ProcessorDefinitionWithUIAttributes[]) => + processors.every(isValidProcessor); + +export const simulationMachine = setup({ + types: { + input: {} as SimulationInput, + context: {} as SimulationContext, + events: {} as SimulationEvent, + }, + actors: { + fetchSamples: getPlaceholderFor(createSamplesFetchActor), + runSimulation: getPlaceholderFor(createSimulationRunnerActor), + dateRangeMachine: getPlaceholderFor(() => dateRangeMachine), + }, + actions: { + notifySamplesFetchFailure: getPlaceholderFor(createSamplesFetchFailureNofitier), + notifySimulationRunFailure: getPlaceholderFor(createSimulationRunFailureNofitier), + storeTimeUpdated: getPlaceholderFor(createSimulationRunFailureNofitier), + storePreviewDocsFilter: assign((_, params: { filter: PreviewDocsFilterOption }) => ({ + previewDocsFilter: params.filter, + })), + storeProcessors: assign((_, params: ProcessorEventParams) => ({ + processors: params.processors, + })), + storeSamples: assign((_, params: { samples: FlattenRecord[] }) => ({ + samples: params.samples, + })), + storeSimulation: assign((_, params: { simulation: Simulation | undefined }) => ({ + simulation: params.simulation, + })), + derivePreviewDocuments: assign(({ context }) => { + return { + previewDocuments: context.simulation + ? filterSimulationDocuments(context.simulation.documents, context.previewDocsFilter) + : context.samples, + }; + }), + deriveSamplingCondition: assign(({ context }) => ({ + samplingCondition: composeSamplingCondition(context.processors), + })), + resetSimulation: assign({ + processors: [], + simulation: undefined, + samplingCondition: composeSamplingCondition([]), + previewDocsFilter: 'outcome_filter_all', + }), + }, + delays: { + debounceTime: 800, + }, + guards: { + canSimulate: ({ context }, params: ProcessorEventParams) => + hasSamples(context.samples) && hasValidProcessors(params.processors), + hasProcessors: (_, params: ProcessorEventParams) => !isEmpty(params.processors), + hasSamples: ({ context }) => hasSamples(context.samples), + hasValidProcessors: (_, params: ProcessorEventParams) => hasValidProcessors(params.processors), + shouldRefetchSamples: ({ context }) => + Boolean( + context.samplingCondition && + !isEqual(context.samplingCondition, composeSamplingCondition(context.processors)) + ), + }, +}).createMachine({ + /** @xstate-layout N4IgpgJg5mDOIC5SwJYFsCuAbAhgFxQHsA7AYgnzACUdiYA6DABwrzAG0AGAXUVCcKoCJPiAAeiAIwB2AGwAWetICsnTgA5Js2cuUBOZQCYANCACeiALSTOh+rfUBmWdL3TpzzkYC+306kxcYTIA7HwiYnoAYwALWhgABQAnMAA3FDAAdwARQijYADEULDYkrl4kEAEhCNEJBFlHSXplFXVDPRl1ZVlNUwsES2VJR3oRw0dDeXUXR04ZX390MODSUKCI+hTYMDxy0WqUYLrEKdcx9T15ecNJPVlO6X6rQ3V6SddDdzd5K-llRyLEDrcIkUhMJJ5OCwQhJWD0ABU+0qh2OlXqkhmikc8kkhmU6hm+n08meg3xzU4-3aUy0kmmyiBINWEKhsBhSWitCiYCwyP4giOtXRpxGynoBhmhkMsipLlJ5isb2U13u8mk6qmRl0TOWGzBrJ57Nh0TidA4PAOgrRoHqhL07w6elpBL0GjJlkcDoBvWkcr0k0ahl1gVBZEN0JNEF5uwtFQFNREIoQ0oU9GmOlusr9KrJNkULlkmLdk2l6g0IZWEXBkKNHPo0awsfYknjVWtwttpzmdnkuNubk6KscZL7dkL8g+-3V0hllf1kRQxCFOCwKAAXkuoKR+e3E8QTggRo43r9NGOs6m87J6L0epxnZcFPIOvOw-Qlyu15u6DvW1b90PMVmnUdwvC8GxnSuMkXCUSCvE6B4mimN9ggbMAACNCAwYgoi3ABhM0YFgGs2XrWJ4jjAChSTLsEF+aQJXuTgNT0J9Og9HF7FVV4Az9RwCVkVDNmjLCcLwuhCMokixFgPBKHoHAADNSgAClE7DcLAAAVdAwAASjWPV3w08SCKIuBd1RTtxCkWwQJfRxpFAjwNUmPNOBvGQix0dUNHuV5hJIdCxNw8zpNIWT5LYRSVLAJJ1MwzSeV0tADKM0M0NMsLJIs2AWzbazaNso8WJvbR-leTRhlkExFRTXp7G0bQOmkcZXCEvxgWMtCsEIHAIC3ABlHA0CYJsSIgEgwA-YhUkIABrGbYFG8a4AKXZYnigBBKI8FhKyO2KjEVE4JrGnmRwvSuiYPLsVRJ1eB9XAEq4gsiPqBuG1aJtIeLIU5cb8CU2E0HoFaxomja8C2pJdv2spLRRI6D2TQx5jeSQeh6X4XxkBUBhse6qUmctB1e+R3sU9l4oIOghp6iIqDAABHDAUBSNLiDwEjDsA5N6SaehbhYr08SlOqBiLM7CU8kYri+Nr1CpnAaaSOmoAZzKmdZ9nObAbnef-ZH+bowWzvslVZVuFVMRgnRhZmOR3AZS6qaSHDl3pxmwSm4gZqXealvBn3iCoT2dr2g6kYTGjUbouQb1UVQ5g8IwVFkUdJiUUDdCcAE3Wmd3PeG0O-qSAH6CBvAQaSMHmSZiO4ajxHCpRw9nbGDx5kHbQvT0MkCUYrHrnkHR3C0QEgWIQho3gSoG+Kor45KvsHT0ZjWPYyROJPFpqSxhRug1Tgp6WbXgs-AhVw3LdqJtEqxRvOZBJUMeRnUAmlRvNr+-+XRMTlmkFTFAEAmz3xsvUQs9BP7qjxP8DeFIyQakUAhF2fxdAbyptlCSUApLmnnrHB+doRhKF6P8ekWhuh23qpILG6YaqT0QqfZ0VNPqDXpj9OAEDjpSF6M0CYk41StC9EYD01wmq2FlAGB4rw+yUy6ovSIqsdjq1LhfMOusOZgC5jzHhK8MQPAdNoDebhpTox6E8eqXxRi3BGM5U+E8R7F2IF7TWod9FAVsGdAk-9nSunlIPboYw5jZjmE5ekytfDeCAA */ + id: 'simulation', + context: ({ input, self, spawn }) => ({ + dateRangeRef: spawn('dateRangeMachine', { + id: 'dateRange', + input: { + parentRef: self, + }, + }), + previewDocsFilter: 'outcome_filter_all', + previewDocuments: [], + processors: input.processors, + samples: [], + samplingCondition: composeSamplingCondition(input.processors), + streamName: input.streamName, + }), + initial: 'initializing', + on: { + 'dateRange.update': '.loadingSamples', + 'simulation.changePreviewDocsFilter': { + actions: [ + { type: 'storePreviewDocsFilter', params: ({ event }) => event }, + { type: 'derivePreviewDocuments' }, + ], + }, + 'simulation.reset': { + target: '.idle', + actions: [{ type: 'resetSimulation' }, { type: 'derivePreviewDocuments' }], + }, + // Handle adding/reordering processors + 'processors.*': { + target: '.assertingSimulationRequirements', + actions: [{ type: 'storeProcessors', params: ({ event }) => event }], + }, + 'processor.cancel': { + target: '.assertingSimulationRequirements', + actions: [{ type: 'storeProcessors', params: ({ event }) => event }], + }, + 'processor.change': { + target: '.debouncingChanges', + actions: [{ type: 'storeProcessors', params: ({ event }) => event }], + }, + 'processor.delete': [ + { + guard: { + type: 'hasProcessors', + params: ({ event }) => ({ processors: event.processors }), + }, + target: '.assertingSimulationRequirements', + actions: [{ type: 'storeProcessors', params: ({ event }) => event }], + }, + { + target: '.idle', + actions: [{ type: 'resetSimulation' }, { type: 'derivePreviewDocuments' }], + }, + ], + }, + states: { + initializing: { + always: [ + { + guard: { + type: 'hasProcessors', + params: ({ context }) => ({ processors: context.processors }), + }, + target: 'loadingSamples', + }, + { target: 'idle' }, + ], + }, + + idle: {}, + + debouncingChanges: { + on: { + 'processor.change': { + target: 'debouncingChanges', + actions: [{ type: 'storeProcessors', params: ({ event }) => event }], + description: 'Re-enter debouncing state and reinitialize the delayed processing.', + reenter: true, + }, + }, + after: { + debounceTime: [ + { + guard: 'shouldRefetchSamples', + target: 'loadingSamples', + actions: [{ type: 'deriveSamplingCondition' }], + }, + { target: 'assertingSimulationRequirements' }, + ], + }, + }, + + loadingSamples: { + invoke: { + id: 'samplesFetcherActor', + src: 'fetchSamples', + input: ({ context }) => ({ + condition: context.samplingCondition, + streamName: context.streamName, + absoluteTimeRange: context.dateRangeRef.getSnapshot().context.absoluteTimeRange, + }), + onDone: { + target: 'assertingSimulationRequirements', + actions: [ + { type: 'storeSamples', params: ({ event }) => ({ samples: event.output }) }, + { type: 'derivePreviewDocuments' }, + ], + }, + onError: { + target: 'idle', + actions: [ + { type: 'storeSamples', params: () => ({ samples: [] }) }, + { type: 'notifySamplesFetchFailure' }, + ], + }, + }, + }, + + assertingSimulationRequirements: { + always: [ + { + guard: { + type: 'canSimulate', + params: ({ context }) => ({ processors: context.processors }), + }, + target: 'runningSimulation', + }, + { target: 'idle' }, + ], + }, + + runningSimulation: { + invoke: { + id: 'simulationRunnerActor', + src: 'runSimulation', + input: ({ context }) => ({ + streamName: context.streamName, + documents: context.samples, + processors: context.processors, + }), + onDone: { + target: 'idle', + actions: [ + { type: 'storeSimulation', params: ({ event }) => ({ simulation: event.output }) }, + { type: 'derivePreviewDocuments' }, + ], + }, + onError: { + target: 'idle', + actions: [{ type: 'notifySimulationRunFailure' }], + }, + }, + }, + }, +}); + +export const createSimulationMachineImplementations = ({ + data, + streamsRepositoryClient, + toasts, +}: SimulationMachineDeps): MachineImplementationsFrom => ({ + actors: { + fetchSamples: createSamplesFetchActor({ streamsRepositoryClient }), + runSimulation: createSimulationRunnerActor({ streamsRepositoryClient }), + dateRangeMachine: dateRangeMachine.provide(createDateRangeMachineImplementations({ data })), + }, + actions: { + notifySamplesFetchFailure: createSamplesFetchFailureNofitier({ toasts }), + notifySimulationRunFailure: createSimulationRunFailureNofitier({ toasts }), + }, +}); diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/types.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/types.ts new file mode 100644 index 000000000000..1aedbd31b363 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/types.ts @@ -0,0 +1,53 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { Condition, FlattenRecord } from '@kbn/streams-schema'; +import { APIReturnType, StreamsRepositoryClient } from '@kbn/streams-plugin/public/api'; +import { IToasts } from '@kbn/core/public'; +import { DataPublicPluginStart } from '@kbn/data-plugin/public'; +import { + DateRangeToParentEvent, + DateRangeActorRef, +} from '../../../../../state_management/date_range_state_machine'; +import { ProcessorDefinitionWithUIAttributes } from '../../types'; +import { PreviewDocsFilterOption } from './preview_docs_filter'; + +export type Simulation = APIReturnType<'POST /api/streams/{name}/processing/_simulate'>; + +export interface SimulationMachineDeps { + data: DataPublicPluginStart; + streamsRepositoryClient: StreamsRepositoryClient; + toasts: IToasts; +} + +export type ProcessorMetrics = + Simulation['processors_metrics'][keyof Simulation['processors_metrics']]; + +export interface SimulationInput { + processors: ProcessorDefinitionWithUIAttributes[]; + streamName: string; +} + +export type SimulationEvent = + | DateRangeToParentEvent + | { type: 'simulation.changePreviewDocsFilter'; filter: PreviewDocsFilterOption } + | { type: 'simulation.reset' } + | { type: 'processors.add'; processors: ProcessorDefinitionWithUIAttributes[] } + | { type: 'processor.cancel'; processors: ProcessorDefinitionWithUIAttributes[] } + | { type: 'processor.change'; processors: ProcessorDefinitionWithUIAttributes[] } + | { type: 'processor.delete'; processors: ProcessorDefinitionWithUIAttributes[] }; + +export interface SimulationContext { + dateRangeRef: DateRangeActorRef; + previewDocsFilter: PreviewDocsFilterOption; + previewDocuments: FlattenRecord[]; + processors: ProcessorDefinitionWithUIAttributes[]; + samples: FlattenRecord[]; + samplingCondition?: Condition; + simulation?: Simulation; + streamName: string; +} diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/utils.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/utils.ts new file mode 100644 index 000000000000..964261c568b3 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/utils.ts @@ -0,0 +1,69 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { Condition, UnaryOperator, getProcessorConfig } from '@kbn/streams-schema'; +import { isEmpty, uniq } from 'lodash'; +import { ALWAYS_CONDITION } from '../../../../../util/condition'; +import { ProcessorDefinitionWithUIAttributes, DetectedField } from '../../types'; +import { PreviewDocsFilterOption } from './preview_docs_filter'; +import { Simulation } from './types'; + +export function composeSamplingCondition( + processors: ProcessorDefinitionWithUIAttributes[] +): Condition | undefined { + if (isEmpty(processors)) { + return undefined; + } + + const uniqueFields = uniq(getSourceFields(processors)); + + if (isEmpty(uniqueFields)) { + return ALWAYS_CONDITION; + } + + const conditions = uniqueFields.map((field) => ({ + field, + operator: 'exists' as UnaryOperator, + })); + + return { or: conditions }; +} + +export function getSourceFields(processors: ProcessorDefinitionWithUIAttributes[]): string[] { + return processors.map((processor) => getProcessorConfig(processor).field.trim()).filter(Boolean); +} + +export function getTableColumns( + processors: ProcessorDefinitionWithUIAttributes[], + fields: DetectedField[], + filter: PreviewDocsFilterOption +) { + const uniqueProcessorsFields = uniq(getSourceFields(processors)); + + if (filter === 'outcome_filter_unmatched') { + return uniqueProcessorsFields; + } + + const uniqueDetectedFields = uniq(fields.map((field) => field.name)); + + return uniq([...uniqueProcessorsFields, ...uniqueDetectedFields]); +} + +export function filterSimulationDocuments( + documents: Simulation['documents'], + filter: PreviewDocsFilterOption +) { + switch (filter) { + case 'outcome_filter_matched': + return documents.filter((doc) => doc.status === 'parsed').map((doc) => doc.value); + case 'outcome_filter_unmatched': + return documents.filter((doc) => doc.status !== 'parsed').map((doc) => doc.value); + case 'outcome_filter_all': + default: + return documents.map((doc) => doc.value); + } +} diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/index.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/index.ts new file mode 100644 index 000000000000..c9d28a5e5cd3 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/index.ts @@ -0,0 +1,9 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export * from './use_stream_enrichment'; +export * from './types'; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts new file mode 100644 index 000000000000..f814dc958d83 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts @@ -0,0 +1,354 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +import { + MachineImplementationsFrom, + assign, + forwardTo, + not, + setup, + sendTo, + stopChild, + and, + ActorRefFrom, +} from 'xstate5'; +import { getPlaceholderFor } from '@kbn/xstate-utils'; +import { + IngestStreamGetResponse, + isRootStreamDefinition, + isWiredStreamGetResponse, +} from '@kbn/streams-schema'; +import { htmlIdGenerator } from '@elastic/eui'; +import { + StreamEnrichmentContext, + StreamEnrichmentEvent, + StreamEnrichmentInput, + StreamEnrichmentServiceDependencies, +} from './types'; +import { processorConverter } from '../../utils'; +import { + createUpsertStreamActor, + createUpsertStreamFailureNofitier, + createUpsertStreamSuccessNofitier, +} from './upsert_stream_actor'; + +import { ProcessorDefinitionWithUIAttributes } from '../../types'; +import { + simulationMachine, + createSimulationMachineImplementations, +} from '../simulation_state_machine'; +import { processorMachine, ProcessorActorRef } from '../processor_state_machine'; + +const createId = htmlIdGenerator(); + +export type StreamEnrichmentActorRef = ActorRefFrom; + +export const streamEnrichmentMachine = setup({ + types: { + input: {} as StreamEnrichmentInput, + context: {} as StreamEnrichmentContext, + events: {} as StreamEnrichmentEvent, + }, + actors: { + upsertStream: getPlaceholderFor(createUpsertStreamActor), + processorMachine: getPlaceholderFor(() => processorMachine), + simulationMachine: getPlaceholderFor(() => simulationMachine), + }, + actions: { + spawnSimulationMachine: assign(({ context, spawn }) => ({ + simulatorRef: + context.simulatorRef || + spawn('simulationMachine', { + id: 'simulator', + input: { + processors: getStagedProcessors(context), + streamName: context.definition.stream.name, + }, + }), + })), + notifyUpsertStreamSuccess: getPlaceholderFor(createUpsertStreamSuccessNofitier), + notifyUpsertStreamFailure: getPlaceholderFor(createUpsertStreamFailureNofitier), + refreshDefinition: () => {}, + storeDefinition: assign((_, params: { definition: IngestStreamGetResponse }) => ({ + definition: params.definition, + })), + stopProcessors: ({ context }) => context.processorsRefs.forEach(stopChild), + setupProcessors: assign(({ self, spawn }, params: { definition: IngestStreamGetResponse }) => { + const processorsRefs = params.definition.stream.ingest.processing.map((proc) => { + const processor = processorConverter.toUIDefinition(proc); + return spawn('processorMachine', { + id: processor.id, + input: { + parentRef: self, + processor, + }, + }); + }); + + return { + initialProcessorsRefs: processorsRefs, + processorsRefs, + }; + }), + addProcessor: assign( + ( + { context, spawn, self }, + { processor }: { processor: ProcessorDefinitionWithUIAttributes } + ) => { + const id = createId(); + return { + processorsRefs: context.processorsRefs.concat( + spawn('processorMachine', { + id, + input: { + parentRef: self, + processor: { ...processor, id }, + isNew: true, + }, + }) + ), + }; + } + ), + stopProcessor: stopChild((_, params: { id: string }) => params.id), + deleteProcessor: assign(({ context }, params: { id: string }) => ({ + processorsRefs: context.processorsRefs.filter((proc) => proc.id !== params.id), + })), + reorderProcessors: assign((_, params: { processorsRefs: ProcessorActorRef[] }) => ({ + processorsRefs: params.processorsRefs, + })), + reassignProcessors: assign(({ context }) => ({ + processorsRefs: [...context.processorsRefs], + })), + forwardProcessorsEventToSimulator: sendTo( + 'simulator', + ({ context }, params: { type: StreamEnrichmentEvent['type'] }) => ({ + type: params.type, + processors: getStagedProcessors(context), + }) + ), + sendResetEventToSimulator: sendTo('simulator', { type: 'simulation.reset' }), + }, + guards: { + hasMultipleProcessors: ({ context }) => context.processorsRefs.length > 1, + hasStagedChanges: ({ context }) => { + const { initialProcessorsRefs, processorsRefs } = context; + return ( + // Deleted processors + initialProcessorsRefs.length !== processorsRefs.length || + // New/updated processors + processorsRefs.some((processorRef) => { + const state = processorRef.getSnapshot(); + return state.matches('configured') && state.context.isUpdated; + }) || + // Processor order changed + processorsRefs.some( + (processorRef, pos) => initialProcessorsRefs[pos]?.id !== processorRef.id + ) + ); + }, + hasPendingDraft: ({ context }) => + Boolean(context.processorsRefs.find((p) => p.getSnapshot().matches('draft'))), + '!hasPendingDraft': not('hasPendingDraft'), + canUpdateStream: and(['hasStagedChanges', '!hasPendingDraft']), + isStagedProcessor: ({ context }, params: { id: string }) => { + const processorRef = context.processorsRefs.find((p) => p.id === params.id); + + if (!processorRef) return false; + return processorRef.getSnapshot().context.isNew; + }, + isRootStream: ({ context }) => isRootStreamDefinition(context.definition.stream), + isWiredStream: ({ context }) => isWiredStreamGetResponse(context.definition), + }, +}).createMachine({ + /** @xstate-layout N4IgpgJg5mDOIC5RgHYCcCWBjAFgZQBc0wBDAWwDoMUMCMSAbDAL2qgGIBtABgF1FQABwD2sWhmEoBIAB6IAtAA4ATAE4K3AGzdlizQBYAjPoCsi7voA0IAJ4KAzIoomT3cwHYdixfferFAL4B1qiYuITE5FQ0dIwsbFyG-EggImJ0ktJyCErGzpqGqoaa7u7KmkWG1nYIyiYUytwmFX7mjZqK+kEh6Nj4RKSUkRA27LADUcRYYBgAbpA8yUKi4pkp2fJ13BSObib6Ksr6+o3u1QrHFO6Kqi6q+h0PBcrd4L3hE0OkIxTjkZQYCAMMBjT4UYiwMAERbSNKrKTrRDKPL6biqVTKTE6AyKExnWyIQz2PImZH7Y76Rz2VSvUJ9CKDcHfGy-MGA4Gg-4UACugggJAIYBhKThGQRoA23mUV3cRJMGMaWiK5wQxl0FBOJnsakMWmJD1p736XOGLL+jN5-LoKA4EEkYGis2EAGsHbzIWgCAzyABBLAEYRoYXLdIScWyBQuexXezcNyqdz6VSaeymFVE1TR-RlZSy3GGYp4w1hY2M02srmWgUJMBoNCBiiCBgCgBmgco7trXs+foDQb4sJWYqyDgLFEzufs2k0zWuePTqM044O2nMqh01y6wTeJe9XxIPzpuDIqAIFAgGFgTZINjYAAV69NYLBA7B2IJH3AX2hYBQDxBg1SIcwxHHJOm2bhSn2TxjmUex7BMFUp3cDVzCnApiR8SCTGLekwXLI8cBPFAzwvK9m1vG0H2EJ9vzfD8aK-V8mUDCBa0A0UQMRHJjiXLUmmpA5KQqQx8RqMwnG4QozDjbUzGKXCPhNZkKEI4jSMva9KKgajaNfd9P2fBs2OBQUOOAtYJQcfYdmg1wkzjbhqSQmzZOudxmiaAxDBw7dCL3JkDxZNTT3PTSKPvQy6IMxijLQVkSBgczQ0siMcmubZ-F8DzikaSlFBVeVo2gjpCk8dzAj8o0AoIo11LC8ibzYPAMDIblmzFGK9J-CgACpkvhUDUSueUigqfY0SkzQVXRPjvFlPF-BMYwtx6Xd8JUkKSIarTmta9rq0kLqmPi-qBxFCzw2yYaPPRYpblRdciRVTpoxW1c-EMNQ0UU0tJk2urQrI3abRatqOrDChZgwMAAHcABEBRIB8wGhuGxn2iHJChmGEahMB-UgAAxGGGAgWABuHbj5BcaVdSwtFTHKE4ZtuVDdQQ5oVFXX6aoBkt6uBiLQcxw6UBxuHEYIZHiDR2GMfBsWKFwEgbTAFG5fhmjYBJhhBX7JYgJSq6FB8fQKG8WNCng65iXsF6imcXMikcUlmkKXmNqC1TAe2oWmpFxWxQlvHBUJiASbAMm3zEIPIc1pGNdxymuKsnJ9mlMabj8eD7hudMtEMHZ3G1ODY2ze5fO3FBhDY+AUn8z5B2N0D5F1NQrgqOCvrlRRjBVNvtQaG5mljdcEPsT2uWocQ4lYG1m8G6n-CL9wu+JZEEL7+2CR47ZzDcRmNzqeDfLWvDlKCxeqbT+QEycS2pMzXwfGe3fdSXNyXALOpF0nqr1qXx+Oacg19U5pTvrGGUcoFRxhEumOCKEEylGxK0MwU8ywqRAQCIEYAwGpQ2JoOC44iGzTRCoL6Vh35yQ1KUdyGdFDuAwf9b22CeR8mrAvC6LdqYGHULKDyWoDCOFzAVd+Lh6hJlRFJbMahkyrR3BfTB3stoEHwSbHImZ95YWti-O2Kpyh8ScmvNe64mb-3PkpZRh5fYaUatpXSJ164hiXrfAokibglGTGibUjDCo6AtlzLQTkfK3DPooqxLCbECyBuFAOUAwYHRvkbVxECpQWx0c-W2b8ageRQhuPwiYiidANAApRUTgq2J2sLBJotg4J2lknOG6jW6lCXPBFa2ZZIVGmrvdEHiOYuA6OUNwzD9zRL6ILOJ2lElY3FprfG4dI7RxadTTE5tn5FUMHoTEahWYDKckM7mWgxlMhfAweYEAABKwhhDdn+KstxRxaGlFEo8PQLNd7yBOBoVwfh3ZTgsH4IIQQgA */ + id: 'enrichStream', + context: ({ input }) => ({ + definition: input.definition, + initialProcessorsRefs: [], + processorsRefs: [], + }), + initial: 'initializing', + states: { + initializing: { + always: [ + { + target: 'resolvedRootStream', + guard: 'isRootStream', + }, + { target: 'ready' }, + ], + }, + ready: { + id: 'ready', + type: 'parallel', + entry: [ + { type: 'stopProcessors' }, + { + type: 'setupProcessors', + params: ({ context }) => ({ definition: context.definition }), + }, + ], + on: { + 'stream.received': { + target: '#ready', + actions: [{ type: 'storeDefinition', params: ({ event }) => event }], + reenter: true, + }, + }, + states: { + stream: { + initial: 'idle', + states: { + idle: { + on: { + 'stream.reset': { + guard: 'hasStagedChanges', + target: '#ready', + actions: [{ type: 'sendResetEventToSimulator' }], + reenter: true, + }, + 'stream.update': { + guard: 'canUpdateStream', + actions: [{ type: 'sendResetEventToSimulator' }], + target: 'updating', + }, + }, + }, + updating: { + invoke: { + id: 'upsertStreamActor', + src: 'upsertStream', + input: ({ context }) => ({ + definition: context.definition, + processors: context.processorsRefs + .map((proc) => proc.getSnapshot()) + .filter((proc) => proc.matches('configured')) + .map((proc) => proc.context.processor), + fields: undefined, // TODO: implementing in follow-up PR + }), + onDone: { + target: 'idle', + actions: [{ type: 'notifyUpsertStreamSuccess' }, { type: 'refreshDefinition' }], + }, + onError: { + target: 'idle', + actions: [{ type: 'notifyUpsertStreamFailure' }], + }, + }, + }, + }, + }, + enrichment: { + type: 'parallel', + states: { + displayingProcessors: { + on: { + 'processors.add': { + guard: '!hasPendingDraft', + actions: [{ type: 'addProcessor', params: ({ event }) => event }], + }, + 'processors.reorder': { + guard: 'hasMultipleProcessors', + actions: [{ type: 'reorderProcessors', params: ({ event }) => event }], + }, + 'processor.delete': { + actions: [ + { type: 'stopProcessor', params: ({ event }) => event }, + { type: 'deleteProcessor', params: ({ event }) => event }, + ], + }, + 'processor.stage': { + actions: [{ type: 'reassignProcessors' }], + }, + 'processor.update': { + actions: [{ type: 'reassignProcessors' }], + }, + }, + }, + displayingSimulation: { + entry: [{ type: 'spawnSimulationMachine' }], + initial: 'viewDataPreview', + on: { + 'processor.change': { + guard: { type: 'isStagedProcessor', params: ({ event }) => event }, + actions: [ + { type: 'forwardProcessorsEventToSimulator', params: ({ event }) => event }, + ], + }, + 'processor.*': { + actions: [ + { type: 'forwardProcessorsEventToSimulator', params: ({ event }) => event }, + ], + }, + 'processors.*': { + actions: [ + { type: 'forwardProcessorsEventToSimulator', params: ({ event }) => event }, + ], + }, + }, + states: { + viewDataPreview: { + on: { + 'simulation.viewDetectedFields': 'viewDetectedFields', + 'simulation.changePreviewDocsFilter': { + actions: [forwardTo('simulator')], + }, + }, + }, + viewDetectedFields: { + on: { + 'simulation.viewDataPreview': 'viewDataPreview', + }, + }, + }, + }, + }, + }, + }, + }, + resolvedRootStream: { + type: 'final', + }, + }, +}); + +export const createStreamEnrichmentMachineImplementations = ({ + refreshDefinition, + streamsRepositoryClient, + core, + data, +}: StreamEnrichmentServiceDependencies): MachineImplementationsFrom< + typeof streamEnrichmentMachine +> => ({ + actors: { + upsertStream: createUpsertStreamActor({ streamsRepositoryClient }), + processorMachine, + simulationMachine: simulationMachine.provide( + createSimulationMachineImplementations({ + data, + streamsRepositoryClient, + toasts: core.notifications.toasts, + }) + ), + }, + actions: { + refreshDefinition, + notifyUpsertStreamSuccess: createUpsertStreamSuccessNofitier({ + toasts: core.notifications.toasts, + }), + notifyUpsertStreamFailure: createUpsertStreamFailureNofitier({ + toasts: core.notifications.toasts, + }), + }, +}); + +function getStagedProcessors(context: StreamEnrichmentContext) { + return context.processorsRefs + .map((proc) => proc.getSnapshot()) + .filter((proc) => proc.context.isNew) + .map((proc) => proc.context.processor); +} diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/types.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/types.ts new file mode 100644 index 000000000000..c335d4e63820 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/types.ts @@ -0,0 +1,43 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { CoreStart } from '@kbn/core/public'; +import { StreamsRepositoryClient } from '@kbn/streams-plugin/public/api'; +import { IngestStreamGetResponse } from '@kbn/streams-schema'; +import { DataPublicPluginStart } from '@kbn/data-plugin/public'; +import { ProcessorDefinitionWithUIAttributes } from '../../types'; +import { ProcessorActorRef, ProcessorToParentEvent } from '../processor_state_machine'; +import { PreviewDocsFilterOption, SimulationActorRef } from '../simulation_state_machine'; + +export interface StreamEnrichmentServiceDependencies { + refreshDefinition: () => void; + streamsRepositoryClient: StreamsRepositoryClient; + core: CoreStart; + data: DataPublicPluginStart; +} + +export interface StreamEnrichmentInput { + definition: IngestStreamGetResponse; +} + +export interface StreamEnrichmentContext { + definition: IngestStreamGetResponse; + initialProcessorsRefs: ProcessorActorRef[]; + processorsRefs: ProcessorActorRef[]; + simulatorRef?: SimulationActorRef; +} + +export type StreamEnrichmentEvent = + | ProcessorToParentEvent + | { type: 'stream.received'; definition: IngestStreamGetResponse } + | { type: 'stream.reset' } + | { type: 'stream.update' } + | { type: 'simulation.viewDataPreview' } + | { type: 'simulation.viewDetectedFields' } + | { type: 'simulation.changePreviewDocsFilter'; filter: PreviewDocsFilterOption } + | { type: 'processors.add'; processor: ProcessorDefinitionWithUIAttributes } + | { type: 'processors.reorder'; processorsRefs: ProcessorActorRef[] }; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/upsert_stream_actor.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/upsert_stream_actor.ts new file mode 100644 index 000000000000..057ff7a596d2 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/upsert_stream_actor.ts @@ -0,0 +1,82 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { + FieldDefinition, + IngestStreamGetResponse, + isWiredStreamGetResponse, +} from '@kbn/streams-schema'; +import { ErrorActorEvent, fromPromise } from 'xstate5'; +import { errors as esErrors } from '@elastic/elasticsearch'; +import { APIReturnType } from '@kbn/streams-plugin/public/api'; +import { IToasts } from '@kbn/core/public'; +import { i18n } from '@kbn/i18n'; +import { StreamEnrichmentServiceDependencies } from './types'; +import { processorConverter } from '../../utils'; +import { ProcessorDefinitionWithUIAttributes } from '../../types'; + +export type UpsertStreamResponse = APIReturnType<'PUT /api/streams/{name}/_ingest'>; + +export interface UpsertStreamInput { + definition: IngestStreamGetResponse; + processors: ProcessorDefinitionWithUIAttributes[]; + fields?: FieldDefinition; +} + +export function createUpsertStreamActor({ + streamsRepositoryClient, +}: Pick) { + return fromPromise(({ input, signal }) => { + return streamsRepositoryClient.fetch(`PUT /api/streams/{name}/_ingest`, { + signal, + params: { + path: { + name: input.definition.stream.name, + }, + body: isWiredStreamGetResponse(input.definition) + ? { + ingest: { + ...input.definition.stream.ingest, + processing: input.processors.map(processorConverter.toAPIDefinition), + ...(input.fields && { + wired: { ...input.definition.stream.ingest.wired, fields: input.fields }, + }), + }, + } + : { + ingest: { + ...input.definition.stream.ingest, + processing: input.processors.map(processorConverter.toAPIDefinition), + }, + }, + }, + }); + }); +} + +export const createUpsertStreamSuccessNofitier = + ({ toasts }: { toasts: IToasts }) => + () => { + toasts.addSuccess( + i18n.translate('xpack.streams.streamDetailView.managementTab.enrichment.saveChangesSuccess', { + defaultMessage: "Stream's processors updated", + }) + ); + }; + +export const createUpsertStreamFailureNofitier = + ({ toasts }: { toasts: IToasts }) => + (params: { event: unknown }) => { + const event = params.event as ErrorActorEvent; + toasts.addError(new Error(event.error.body.message), { + title: i18n.translate( + 'xpack.streams.streamDetailView.managementTab.enrichment.saveChangesError', + { defaultMessage: "An issue occurred saving processors' changes." } + ), + toastMessage: event.error.body.message, + }); + }; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/use_stream_enrichment.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/use_stream_enrichment.tsx new file mode 100644 index 000000000000..45452d251a5b --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/use_stream_enrichment.tsx @@ -0,0 +1,105 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import React, { useEffect, useMemo } from 'react'; +import { createActorContext, useSelector } from '@xstate5/react'; +import { createConsoleInspector } from '@kbn/xstate-utils'; +import { + streamEnrichmentMachine, + createStreamEnrichmentMachineImplementations, +} from './stream_enrichment_state_machine'; +import { StreamEnrichmentInput, StreamEnrichmentServiceDependencies } from './types'; +import { ProcessorDefinitionWithUIAttributes } from '../../types'; +import { ProcessorActorRef } from '../processor_state_machine'; +import { PreviewDocsFilterOption, SimulationActorSnapshot } from '../simulation_state_machine'; + +const consoleInspector = createConsoleInspector(); + +const StreamEnrichmentContext = createActorContext(streamEnrichmentMachine); + +export const useStreamsEnrichmentSelector = StreamEnrichmentContext.useSelector; + +export type StreamEnrichmentEvents = ReturnType; + +export const useStreamEnrichmentEvents = () => { + const service = StreamEnrichmentContext.useActorRef(); + + return useMemo( + () => ({ + addProcessor: (processor: ProcessorDefinitionWithUIAttributes) => { + service.send({ type: 'processors.add', processor }); + }, + reorderProcessors: (processorsRefs: ProcessorActorRef[]) => { + service.send({ type: 'processors.reorder', processorsRefs }); + }, + resetChanges: () => { + service.send({ type: 'stream.reset' }); + }, + saveChanges: () => { + service.send({ type: 'stream.update' }); + }, + viewSimulationPreviewData: () => { + service.send({ type: 'simulation.viewDataPreview' }); + }, + viewSimulationDetectedFields: () => { + service.send({ type: 'simulation.viewDetectedFields' }); + }, + changePreviewDocsFilter: (filter: PreviewDocsFilterOption) => { + service.send({ type: 'simulation.changePreviewDocsFilter', filter }); + }, + }), + [service] + ); +}; + +export const StreamEnrichmentContextProvider = ({ + children, + definition, + ...deps +}: React.PropsWithChildren) => { + return ( + + {children} + + ); +}; + +const ListenForDefinitionChanges = ({ + children, + definition, +}: React.PropsWithChildren) => { + const service = StreamEnrichmentContext.useActorRef(); + + useEffect(() => { + service.send({ type: 'stream.received', definition }); + }, [definition, service]); + + return children; +}; + +export const useSimulatorRef = () => { + return useStreamsEnrichmentSelector((state) => state.context.simulatorRef); +}; + +export const useSimulatorSelector = (selector: (snapshot: SimulationActorSnapshot) => T): T => { + const simulationRef = useSimulatorRef(); + + if (!simulationRef) { + throw new Error('useSimulatorSelector must be used within a StreamEnrichmentContextProvider'); + } + + return useSelector(simulationRef, selector); +}; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/types.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/types.ts index 471dba5495d3..deedaa77e0e4 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/types.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/types.ts @@ -16,7 +16,6 @@ import { export type WithUIAttributes = T & { id: string; type: ProcessorTypeOf; - status: 'draft' | 'saved' | 'updated'; }; export type ProcessorDefinitionWithUIAttributes = WithUIAttributes; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/utils.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/utils.ts index e4817fb9d9f6..6ce2ce2c0a0f 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/utils.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/utils.ts @@ -48,11 +48,13 @@ const defaultProcessorFormStateByType: Record grok: defaultGrokProcessorFormState, }; -export const getDefaultFormState = ( - type: ProcessorType, +export const getDefaultFormStateByType = (type: ProcessorType) => + defaultProcessorFormStateByType[type]; + +export const getFormStateFrom = ( processor?: ProcessorDefinitionWithUIAttributes ): ProcessorFormState => { - if (!processor) return defaultProcessorFormStateByType[type]; + if (!processor) return defaultGrokProcessorFormState; if (isGrokProcessor(processor)) { const { grok } = processor; @@ -73,7 +75,7 @@ export const getDefaultFormState = ( }); } - throw new Error(`Default state not found for unsupported processor type: ${type}`); + throw new Error(`Form state for processor type "${processor.type}" is not implemented.`); }; export const convertFormStateToProcessor = (formState: ProcessorFormState): ProcessorDefinition => { @@ -124,25 +126,22 @@ export const isDissectProcessor = createProcessorGuardByType('dissect'); const createId = htmlIdGenerator(); const toUIDefinition = ( - processor: TProcessorDefinition, - uiAttributes: Partial, 'status'>> = {} + processor: TProcessorDefinition ): ProcessorDefinitionWithUIAttributes => ({ id: createId(), - status: 'saved', type: getProcessorType(processor), - ...uiAttributes, ...processor, }); const toAPIDefinition = (processor: ProcessorDefinitionWithUIAttributes): ProcessorDefinition => { - const { id, status, type, ...processorConfig } = processor; + const { id, type, ...processorConfig } = processor; return processorConfig; }; const toSimulateDefinition = ( processor: ProcessorDefinitionWithUIAttributes ): ProcessorDefinitionWithId => { - const { status, type, ...processorConfig } = processor; + const { type, ...processorConfig } = processor; return processorConfig; }; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_management/index.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_management/index.tsx index 48926e726060..42336d0955cb 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_management/index.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_management/index.tsx @@ -12,11 +12,9 @@ import { ClassicStreamDetailManagement } from './classic'; export function StreamDetailManagement({ definition, refreshDefinition, - isLoadingDefinition, }: { definition?: IngestStreamGetResponse; refreshDefinition: () => void; - isLoadingDefinition: boolean; }) { if (!definition) { return null; @@ -24,11 +22,7 @@ export function StreamDetailManagement({ if (isWiredStreamGetResponse(definition)) { return ( - + ); } diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_management/wired.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_management/wired.tsx index 5613a6e697d4..8e9046efa4e5 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_management/wired.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_management/wired.tsx @@ -24,11 +24,9 @@ function isValidManagementSubTab(value: string): value is ManagementSubTabs { export function WiredStreamDetailManagement({ definition, refreshDefinition, - isLoadingDefinition, }: { definition?: WiredStreamGetResponse; refreshDefinition: () => void; - isLoadingDefinition: boolean; }) { const { path: { key, subtab }, @@ -53,11 +51,7 @@ export function WiredStreamDetailManagement({ }, schemaEditor: { content: ( - + ), label: i18n.translate('xpack.streams.streamDetailView.schemaEditorTab', { defaultMessage: 'Schema editor', diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_schema_editor/index.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_schema_editor/index.tsx index 50c88f4ae018..8a13a0fde8a1 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_schema_editor/index.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_schema_editor/index.tsx @@ -6,13 +6,13 @@ */ import React from 'react'; import { WiredStreamGetResponse, isRootStreamDefinition } from '@kbn/streams-schema'; +import { useStreamDetail } from '../../../hooks/use_stream_detail'; import { SchemaEditor } from '../schema_editor'; import { useSchemaFields } from '../schema_editor/hooks/use_schema_fields'; interface SchemaEditorProps { definition?: WiredStreamGetResponse; refreshDefinition: () => void; - isLoadingDefinition: boolean; } export function StreamDetailSchemaEditor(props: SchemaEditorProps) { @@ -20,11 +20,9 @@ export function StreamDetailSchemaEditor(props: SchemaEditorProps) { return ; } -const Content = ({ - definition, - refreshDefinition, - isLoadingDefinition, -}: Required) => { +const Content = ({ definition, refreshDefinition }: Required) => { + const { loading } = useStreamDetail(); + const { fields, isLoadingUnmappedFields, refreshFields, unmapField, updateField } = useSchemaFields({ definition, @@ -34,7 +32,7 @@ const Content = ({ return ( { - return streamsRepositoryClient - .fetch('GET /api/streams/{name}', { - signal, - params: { - path: { - name: key, - }, - }, - }) - .then((response) => { - if (isWiredStreamGetResponse(response)) { - return { - dashboards: response.dashboards, - inherited_fields: response.inherited_fields, - elasticsearch_assets: [], - effective_lifecycle: response.effective_lifecycle, - name: key, - stream: { - ...response.stream, - }, - }; - } - - if (isUnwiredStreamGetResponse(response)) { - return { - dashboards: response.dashboards, - elasticsearch_assets: response.elasticsearch_assets, - inherited_fields: {}, - effective_lifecycle: response.effective_lifecycle, - name: key, - data_stream_exists: response.data_stream_exists, - stream: { - ...response.stream, - }, - }; - } - throw new Error('Stream detail only supports IngestStreams.'); - }); - }, - [streamsRepositoryClient, key] + return ( + + + ); +} + +export function StreamDetailViewContent({ name, tab }: { name: string; tab: string }) { + const { definition, refresh } = useStreamDetail(); const entity = { - id: key, - displayName: key, + id: name, + displayName: name, }; const tabs: EntityViewTab[] = [ { name: 'overview', - content: , + content: , label: i18n.translate('xpack.streams.streamDetailView.overviewTab', { defaultMessage: 'Overview', }), }, { name: 'dashboards', - content: , + content: , label: i18n.translate('xpack.streams.streamDetailView.dashboardsTab', { defaultMessage: 'Dashboards', }), }, { name: 'management', - content: ( - - ), + content: , label: i18n.translate('xpack.streams.streamDetailView.managementTab', { defaultMessage: 'Management', }), @@ -118,7 +66,7 @@ export function StreamDetailView() { ); diff --git a/x-pack/platform/plugins/shared/streams_app/public/hooks/use_discard_confirm.ts b/x-pack/platform/plugins/shared/streams_app/public/hooks/use_discard_confirm.ts index 21ddf1e5ee3a..d33148fbf6c5 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/hooks/use_discard_confirm.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/hooks/use_discard_confirm.ts @@ -9,6 +9,10 @@ import { i18n } from '@kbn/i18n'; import { OverlayModalConfirmOptions } from '@kbn/core/public'; import { useKibana } from './use_kibana'; +export interface DiscardPromptOptions extends OverlayModalConfirmOptions { + message: string; +} + const defaultMessage = i18n.translate('xpack.streams.cancelModal.message', { defaultMessage: 'Are you sure you want to discard your changes?', }); diff --git a/x-pack/platform/plugins/shared/streams_app/public/hooks/use_stream_detail.tsx b/x-pack/platform/plugins/shared/streams_app/public/hooks/use_stream_detail.tsx new file mode 100644 index 000000000000..ba251cacfbae --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/hooks/use_stream_detail.tsx @@ -0,0 +1,75 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import React from 'react'; +import { StreamsRepositoryClient } from '@kbn/streams-plugin/public/api'; +import { + IngestStreamGetResponse, + isWiredStreamGetResponse, + isUnwiredStreamGetResponse, +} from '@kbn/streams-schema'; +import { useStreamsAppFetch } from './use_streams_app_fetch'; + +export interface StreamDetailContextProviderProps { + name: string; + streamsRepositoryClient: StreamsRepositoryClient; +} + +export interface StreamDetailContextValue { + definition?: IngestStreamGetResponse; + loading: boolean; + refresh: () => void; +} + +const StreamDetailContext = React.createContext(undefined); + +export function StreamDetailContextProvider({ + name, + streamsRepositoryClient, + children, +}: React.PropsWithChildren) { + const { + value: definition, + loading, + refresh, + } = useStreamsAppFetch( + async ({ signal }) => { + return streamsRepositoryClient + .fetch('GET /api/streams/{name}', { + signal, + params: { + path: { + name, + }, + }, + }) + .then((response) => { + if (isWiredStreamGetResponse(response) || isUnwiredStreamGetResponse(response)) { + return response; + } + + throw new Error('Stream detail only supports IngestStreams.'); + }); + }, + [streamsRepositoryClient, name] + ); + + const context = React.useMemo( + () => ({ definition, loading, refresh }), + [definition, loading, refresh] + ); + + return {children}; +} + +export function useStreamDetail() { + const ctx = React.useContext(StreamDetailContext); + if (!ctx) { + throw new Error('useStreamDetail must be used within a StreamDetailContextProvider'); + } + return ctx; +} diff --git a/x-pack/platform/plugins/shared/streams_app/public/state_management/date_range_state_machine/date_range_state_machine.ts b/x-pack/platform/plugins/shared/streams_app/public/state_management/date_range_state_machine/date_range_state_machine.ts new file mode 100644 index 000000000000..51bb6b5d4f5e --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/state_management/date_range_state_machine/date_range_state_machine.ts @@ -0,0 +1,106 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { + MachineImplementationsFrom, + assertEvent, + fromObservable, + enqueueActions, + setup, + assign, + ActorRefFrom, +} from 'xstate5'; +import type { DataPublicPluginStart, TimefilterContract } from '@kbn/data-plugin/public'; +import { getPlaceholderFor } from '@kbn/xstate-utils'; +import { DateRangeContext, DateRangeEvent, DateRangeInput } from './types'; + +export type DateRangeActorRef = ActorRefFrom; + +export const dateRangeMachine = setup({ + types: { + context: {} as DateRangeContext, + events: {} as DateRangeEvent, + input: {} as DateRangeInput, + }, + actors: { + subscribeTimeUpdates: getPlaceholderFor(createTimeUpdatesActor), + }, + actions: { + setTimeUpdates: () => { + throw new Error('Not implemented'); + }, + storeTimeUpdates: () => { + throw new Error('Not implemented'); + }, + notifyDateRangeUpdate: enqueueActions(({ enqueue, context }) => { + if (context.parentRef) { + enqueue.sendTo(context.parentRef, { type: 'dateRange.update' }); + } + }), + }, +}).createMachine({ + /** @xstate-layout N4IgpgJg5mDOIC5QQIYBcwCUUDsYGJUNs8wA6AVwAciwBtABgF1FQqB7WASzS-Z1YgAHogBMANgCsZACwBmAOySANCACeiGQEYGZBQE5R+8QA4lAX3OraJAjdwwyAJzAAzF7AAWjFkhAduXn5BEQQAWjkZMn1jBn1FFXVEOX0FMjktcRkGcVFJSysQHHYIOEF7UkEAnj4BP1CtBVUNBFE5cTITSX15cQY5SS1JXPyCoA */ + id: 'dateRange', + context: ({ input }) => ({ + parentRef: input.parentRef, + timeRange: { + from: '', + to: '', + }, + absoluteTimeRange: { + start: 0, + end: 0, + }, + }), + entry: 'storeTimeUpdates', + invoke: { + id: 'dateRangeSubscriptionActor', + src: 'subscribeTimeUpdates', + onSnapshot: { + actions: [{ type: 'storeTimeUpdates' }, { type: 'notifyDateRangeUpdate' }], + }, + }, + on: { + 'dateRange.update': { + actions: [{ type: 'setTimeUpdates' }], + }, + 'dateRange.refresh': { + actions: [{ type: 'storeTimeUpdates' }, { type: 'notifyDateRangeUpdate' }], + }, + }, +}); + +export const createDateRangeMachineImplementations = ({ + data, +}: { + data: DataPublicPluginStart; +}): MachineImplementationsFrom => ({ + actors: { + subscribeTimeUpdates: createTimeUpdatesActor({ data }), + }, + actions: { + setTimeUpdates: ({ event }: { event: DateRangeEvent }) => { + assertEvent(event, 'dateRange.update'); + data.query.timefilter.timefilter.setTime(event.range); + }, + storeTimeUpdates: assign(() => getTimeContextFromService(data.query.timefilter.timefilter)), + }, +}); + +function createTimeUpdatesActor({ data }: { data: DataPublicPluginStart }) { + return fromObservable(() => data.query.timefilter.timefilter.getTimeUpdate$()); +} + +function getTimeContextFromService(timefilter: TimefilterContract) { + return { + timeRange: timefilter.getTime(), + absoluteTimeRange: { + start: new Date(timefilter.getAbsoluteTime().from).getTime(), + end: new Date(timefilter.getAbsoluteTime().to).getTime(), + }, + }; +} diff --git a/x-pack/platform/plugins/shared/streams_app/public/state_management/date_range_state_machine/index.ts b/x-pack/platform/plugins/shared/streams_app/public/state_management/date_range_state_machine/index.ts new file mode 100644 index 000000000000..b4fd3c09b9c3 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/state_management/date_range_state_machine/index.ts @@ -0,0 +1,9 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export * from './date_range_state_machine'; +export * from './types'; diff --git a/x-pack/platform/plugins/shared/streams_app/public/state_management/date_range_state_machine/types.ts b/x-pack/platform/plugins/shared/streams_app/public/state_management/date_range_state_machine/types.ts new file mode 100644 index 000000000000..3e7d2e07ea94 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/state_management/date_range_state_machine/types.ts @@ -0,0 +1,32 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { TimeRange } from '@kbn/es-query'; +import { ActorRef, Snapshot } from 'xstate5'; + +export interface DateRangeToParentEvent { + type: 'dateRange.update'; +} + +export type DateRangeParentActor = ActorRef, DateRangeToParentEvent>; + +export interface DateRangeContext { + parentRef?: DateRangeParentActor; + timeRange: TimeRange; + absoluteTimeRange: { + start?: number; + end?: number; + }; +} + +export interface DateRangeInput { + parentRef?: DateRangeParentActor; +} + +export type DateRangeEvent = + | { type: 'dateRange.refresh' } + | { type: 'dateRange.update'; range: TimeRange }; diff --git a/x-pack/platform/plugins/shared/streams_app/tsconfig.json b/x-pack/platform/plugins/shared/streams_app/tsconfig.json index f0c287d627d5..26f45235bf1b 100644 --- a/x-pack/platform/plugins/shared/streams_app/tsconfig.json +++ b/x-pack/platform/plugins/shared/streams_app/tsconfig.json @@ -58,5 +58,6 @@ "@kbn/traced-es-client", "@kbn/licensing-plugin", "@kbn/datemath", + "@kbn/xstate-utils", ] }