[Streams 🌊] Enrichment state management improvements (#211686)

## 📓 Summary

Closes https://github.com/elastic/streams-program/issues/102
Closes https://github.com/elastic/streams-program/issues/159

This re-work of the enrichment state management introduces XState as
state library to prepare scaling the enrichment part for more processors
and improve performance reducing unnecessary side effects.

## 🤓 Reviewers note

**There is a lot to digest on this PR, I'm open to any suggestion and I
left some notes around to guide the review.
This is also far from perfect as there is margin for other minor DX
improvements for consuming the state machines, but it will all come in
follow-up work after we resolve prioritized work such as integrating the
Schema Editor.**

Most of the changes on this PR are about the state management for the
stream enrichment, but it touches also some other areas to integrate the
event-based flow.

### Stream enrichment machine

This machine handles the complexity around updating/promoting/deleting
processors, and the available simulation states.
It's a root level machine that spawns and manages its children machine,
one for the **simulation** behaviour and one for each **processor**
instantiated.

<img width="950" alt="Screenshot 2025-02-27 at 17 10 03"
src="https://github.com/user-attachments/assets/756a6668-600d-4863-965e-4fc8ccd3a69f"
/>

### Simulation machine

This machine handle the flow around sampling -> simulating, handling
debouncing and determining once a simulation can run or should refresh.
It also spawn a child date range machine to react to the observable time
changes and reloads.
It also derives all the required table configurations (columns, filters,
documents) centralizing the parsing and reducing the cases for
re-computing, since we don't rely anymore on the previous live
processors copy.

<img width="1652" alt="Screenshot 2025-02-27 at 17 33 40"
src="https://github.com/user-attachments/assets/fc1fa089-acb2-4ec5-84bc-f27f81cc6abe"
/>

### Processor machine

A processor can be in different states depending on the changes, not
this tracks each of them independently and send events to the parent
machine to react accordingly. It provide a boost in performance compared
to the previous approach, as we don't have to rerender the whole page
tree since the changes are encapsulated in the machine state.

<img width="1204" alt="Screenshot 2025-03-04 at 11 34 01"
src="https://github.com/user-attachments/assets/0e6b8854-b7c9-4ee8-a721-f4222354d382"
/>

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Marco Antonio Ghiani 2025-03-07 12:34:30 +01:00 committed by GitHub
parent cfa2fb4aa8
commit d0c62a20e9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
44 changed files with 2046 additions and 1244 deletions

View file

@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
export const getPlaceholderFor = <ImplementationFactory extends (...factoryArgs: any[]) => any>(
_implementationFactory: ImplementationFactory
): ReturnType<ImplementationFactory> =>
(() => {
throw new Error('Not implemented');
}) as ReturnType<ImplementationFactory>;

View file

@ -9,6 +9,7 @@
export * from './actions';
export * from './dev_tools';
export * from './get_placeholder_for';
export * from './console_inspector';
export * from './notification_channel';
export * from './types';

View file

@ -6,8 +6,8 @@
*/
import { MachineImplementationsFrom, assign, setup } from 'xstate5';
import { getPlaceholderFor } from '@kbn/xstate-utils';
import { LogCategory } from '../../types';
import { getPlaceholderFor } from '../../utils/xstate5_utils';
import { categorizeDocuments } from './categorize_documents';
import { countDocuments } from './count_documents';
import { CategorizeLogsServiceDependencies, LogCategorizationParams } from './types';

View file

@ -1,13 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export const getPlaceholderFor = <ImplementationFactory extends (...factoryArgs: any[]) => any>(
implementationFactory: ImplementationFactory
): ReturnType<ImplementationFactory> =>
(() => {
throw new Error('Not implemented');
}) as ReturnType<ImplementationFactory>;

View file

@ -38,9 +38,18 @@ export function AssetImage({ type = 'welcome', ...props }: AssetImageProps) {
const [imageSrc, setImageSrc] = useState<string>();
useEffect(() => {
let isMounted = true;
const dynamicImageImport = colorMode === 'LIGHT' ? light() : dark();
dynamicImageImport.then((module) => setImageSrc(module.default));
dynamicImageImport.then((module) => {
if (isMounted) {
setImageSrc(module.default);
}
});
return () => {
isMounted = false;
};
}, [colorMode, dark, light]);
return imageSrc ? <EuiImage size="l" {...props} alt={alt} src={imageSrc} /> : null;

View file

@ -6,13 +6,12 @@
*/
import React from 'react';
import { EuiButton, EuiButtonEmpty, EuiFlexGroup, EuiToolTip, EuiToolTipProps } from '@elastic/eui';
import { EuiButton, EuiButtonEmpty, EuiFlexGroup } from '@elastic/eui';
import { i18n } from '@kbn/i18n';
import { useDiscardConfirm } from '../../../hooks/use_discard_confirm';
interface ManagementBottomBarProps {
confirmButtonText?: string;
confirmTooltip?: Partial<EuiToolTipProps>;
disabled?: boolean;
isLoading?: boolean;
onCancel: () => void;
@ -21,7 +20,6 @@ interface ManagementBottomBarProps {
export function ManagementBottomBar({
confirmButtonText = defaultConfirmButtonText,
confirmTooltip,
disabled = false,
isLoading = false,
onCancel,
@ -34,31 +32,11 @@ export function ManagementBottomBar({
cancelButtonText: keepEditingLabel,
});
const confirmButtonContent = (
<EuiButton
data-test-subj="streamsAppManagementBottomBarButton"
disabled={disabled}
color="primary"
fill
size="s"
iconType="check"
onClick={onConfirm}
isLoading={isLoading}
>
{confirmButtonText}
</EuiButton>
);
const confirmButton = confirmTooltip ? (
<EuiToolTip {...confirmTooltip}>{confirmButtonContent}</EuiToolTip>
) : (
confirmButtonContent
);
return (
<EuiFlexGroup justifyContent="flexEnd" alignItems="center" responsive={false} gutterSize="s">
<EuiButtonEmpty
data-test-subj="streamsAppManagementBottomBarCancelChangesButton"
disabled={disabled}
color="text"
size="s"
iconType="cross"
@ -68,7 +46,18 @@ export function ManagementBottomBar({
defaultMessage: 'Cancel changes',
})}
</EuiButtonEmpty>
{confirmButton}
<EuiButton
data-test-subj="streamsAppManagementBottomBarButton"
disabled={disabled}
color="primary"
fill
size="s"
iconType="check"
onClick={onConfirm}
isLoading={isLoading}
>
{confirmButtonText}
</EuiButton>
</EuiFlexGroup>
);
}

View file

@ -1,221 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { useState, useMemo, useEffect, useRef, useCallback } from 'react';
import { i18n } from '@kbn/i18n';
import { useAbortController, useBoolean } from '@kbn/react-hooks';
import {
IngestStreamGetResponse,
isWiredStreamGetResponse,
FieldDefinition,
WiredStreamGetResponse,
IngestUpsertRequest,
ProcessorDefinition,
getProcessorType,
} from '@kbn/streams-schema';
import { DetectedField, ProcessorDefinitionWithUIAttributes } from '../types';
import { useKibana } from '../../../../hooks/use_kibana';
import { processorConverter } from '../utils';
export interface UseDefinitionReturn {
processors: ProcessorDefinitionWithUIAttributes[];
hasChanges: boolean;
isSavingChanges: boolean;
addProcessor: (newProcessor: ProcessorDefinition, newFields?: DetectedField[]) => void;
updateProcessor: (
id: string,
processor: ProcessorDefinition,
status?: ProcessorDefinitionWithUIAttributes['status']
) => void;
deleteProcessor: (id: string) => void;
reorderProcessors: (processors: ProcessorDefinitionWithUIAttributes[]) => void;
saveChanges: () => Promise<void>;
setProcessors: (processors: ProcessorDefinitionWithUIAttributes[]) => void;
resetChanges: () => void;
}
export const useDefinition = (
definition: IngestStreamGetResponse,
refreshDefinition: () => void
): UseDefinitionReturn => {
const { core, dependencies } = useKibana();
const { toasts } = core.notifications;
const { processing: existingProcessorDefinitions } = definition.stream.ingest;
const { streamsRepositoryClient } = dependencies.start.streams;
const abortController = useAbortController();
const [isSavingChanges, { on: startsSaving, off: endsSaving }] = useBoolean();
const [processors, setProcessors] = useState(() =>
createProcessorsList(existingProcessorDefinitions)
);
const initialProcessors = useRef(processors);
const [fields, setFields] = useState(() =>
isWiredStreamGetResponse(definition) ? definition.stream.ingest.wired.fields : {}
);
const nextProcessorDefinitions = useMemo(
() => processors.map(processorConverter.toAPIDefinition),
[processors]
);
useEffect(() => {
// Reset processors when definition refreshes
const resetProcessors = createProcessorsList(definition.stream.ingest.processing);
setProcessors(resetProcessors);
initialProcessors.current = resetProcessors;
}, [definition]);
const hasChanges = useMemo(
() =>
processors.length !== initialProcessors.current.length || // Processor count changed, a processor might be deleted
processors.some((proc) => proc.status === 'draft' || proc.status === 'updated') || // New or updated processors
hasOrderChanged(processors, initialProcessors.current), // Processor order changed
[processors]
);
const addProcessor = useCallback(
(newProcessor: ProcessorDefinition, newFields?: DetectedField[]) => {
setProcessors((prevProcs) =>
prevProcs.concat(processorConverter.toUIDefinition(newProcessor, { status: 'draft' }))
);
if (isWiredStreamGetResponse(definition) && newFields) {
setFields((currentFields) => mergeFields(definition, currentFields, newFields));
}
},
[definition]
);
const updateProcessor = useCallback(
(
id: string,
processorUpdate: ProcessorDefinition,
status: ProcessorDefinitionWithUIAttributes['status'] = 'updated'
) => {
setProcessors((prevProcs) =>
prevProcs.map((proc) =>
proc.id === id
? {
...processorUpdate,
id,
type: getProcessorType(processorUpdate),
status,
}
: proc
)
);
},
[]
);
const reorderProcessors = setProcessors;
const deleteProcessor = useCallback((id: string) => {
setProcessors((prevProcs) => prevProcs.filter((proc) => proc.id !== id));
}, []);
const resetChanges = () => {
const resetProcessors = createProcessorsList(existingProcessorDefinitions);
setProcessors(resetProcessors);
initialProcessors.current = resetProcessors;
setFields(isWiredStreamGetResponse(definition) ? definition.stream.ingest.wired.fields : {});
};
const saveChanges = async () => {
startsSaving();
try {
await streamsRepositoryClient.fetch(`PUT /api/streams/{name}/_ingest`, {
signal: abortController.signal,
params: {
path: {
name: definition.stream.name,
},
body: {
ingest: {
...definition.stream.ingest,
processing: nextProcessorDefinitions,
...(isWiredStreamGetResponse(definition) && {
wired: { ...definition.stream.ingest.wired, fields },
}),
},
} as IngestUpsertRequest,
},
});
toasts.addSuccess(
i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.saveChangesSuccess',
{ defaultMessage: "Stream's processors updated" }
)
);
refreshDefinition();
} catch (error) {
toasts.addError(new Error(error.body.message), {
title: i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.saveChangesError',
{ defaultMessage: "An issue occurred saving processors' changes." }
),
toastMessage: error.body.message,
});
} finally {
endsSaving();
}
};
return {
// Values
processors,
// Actions
addProcessor,
updateProcessor,
deleteProcessor,
reorderProcessors,
resetChanges,
saveChanges,
setProcessors,
// Flags
hasChanges,
isSavingChanges,
};
};
const createProcessorsList = (processors: ProcessorDefinition[]) => {
return processors.map((processor) => processorConverter.toUIDefinition(processor));
};
const hasOrderChanged = (
processors: ProcessorDefinitionWithUIAttributes[],
initialProcessors: ProcessorDefinitionWithUIAttributes[]
) => {
return processors.some((processor, index) => processor.id !== initialProcessors[index].id);
};
const mergeFields = (
definition: WiredStreamGetResponse,
currentFields: FieldDefinition,
newFields: DetectedField[]
) => {
return {
...definition.stream.ingest.wired.fields,
...newFields.reduce((acc, field) => {
// Add only new fields and ignore unmapped ones
if (
!(field.name in currentFields) &&
!(field.name in definition.inherited_fields) &&
field.type !== undefined
) {
acc[field.name] = { type: field.type };
}
return acc;
}, {} as FieldDefinition),
};
};

View file

@ -1,309 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { useEffect, useMemo, useRef, useState } from 'react';
import { debounce, isEmpty, isEqual, uniq, uniqBy } from 'lodash';
import {
IngestStreamGetResponse,
getProcessorConfig,
UnaryOperator,
Condition,
processorDefinitionSchema,
isSchema,
FlattenRecord,
} from '@kbn/streams-schema';
import { IHttpFetchError, ResponseErrorBody } from '@kbn/core/public';
import { APIReturnType } from '@kbn/streams-plugin/public/api';
import { i18n } from '@kbn/i18n';
import { flattenObjectNestedLast } from '@kbn/object-utils';
import { useStreamsAppFetch } from '../../../../hooks/use_streams_app_fetch';
import { useKibana } from '../../../../hooks/use_kibana';
import { DetectedField, ProcessorDefinitionWithUIAttributes } from '../types';
import { processorConverter } from '../utils';
export type Simulation = APIReturnType<'POST /api/streams/{name}/processing/_simulate'>;
export type ProcessorMetrics =
Simulation['processors_metrics'][keyof Simulation['processors_metrics']];
export interface TableColumn {
name: string;
origin: 'processor' | 'detected';
}
export const docsFilterOptions = {
outcome_filter_all: {
id: 'outcome_filter_all',
label: i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.processor.outcomeControls.all',
{ defaultMessage: 'All samples' }
),
},
outcome_filter_matched: {
id: 'outcome_filter_matched',
label: i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.processor.outcomeControls.matched',
{ defaultMessage: 'Matched' }
),
},
outcome_filter_unmatched: {
id: 'outcome_filter_unmatched',
label: i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.processor.outcomeControls.unmatched',
{ defaultMessage: 'Unmatched' }
),
},
} as const;
export type DocsFilterOption = keyof typeof docsFilterOptions;
export interface UseProcessingSimulatorProps {
definition: IngestStreamGetResponse;
processors: ProcessorDefinitionWithUIAttributes[];
}
export interface UseProcessingSimulatorReturn {
hasLiveChanges: boolean;
error?: IHttpFetchError<ResponseErrorBody>;
isLoading: boolean;
samples: FlattenRecord[];
filteredSamples: FlattenRecord[];
simulation?: Simulation | null;
tableColumns: TableColumn[];
refreshSamples: () => void;
watchProcessor: (
processor: ProcessorDefinitionWithUIAttributes | { id: string; deleteIfExists: true }
) => void;
refreshSimulation: () => void;
selectedDocsFilter: DocsFilterOption;
setSelectedDocsFilter: (filter: DocsFilterOption) => void;
}
export const useProcessingSimulator = ({
definition,
processors,
}: UseProcessingSimulatorProps): UseProcessingSimulatorReturn => {
const { dependencies } = useKibana();
const {
data,
streams: { streamsRepositoryClient },
} = dependencies.start;
const {
absoluteTimeRange: { start, end },
} = data.query.timefilter.timefilter.useTimefilter();
const draftProcessors = useMemo(
() => processors.filter((processor) => processor.status === 'draft'),
[processors]
);
const [liveDraftProcessors, setLiveDraftProcessors] = useState(draftProcessors);
useEffect(() => {
setLiveDraftProcessors((prevLiveProcessors) => {
const inProgressDraft = prevLiveProcessors.find((proc) => proc.id === 'draft');
return inProgressDraft ? [...draftProcessors, inProgressDraft] : draftProcessors;
});
}, [draftProcessors]);
const watchProcessor = useMemo(
() =>
debounce(
(processor: ProcessorDefinitionWithUIAttributes | { id: string; deleteIfExists: true }) => {
if ('deleteIfExists' in processor) {
return setLiveDraftProcessors((prevLiveDraftProcessors) =>
prevLiveDraftProcessors.filter((proc) => proc.id !== processor.id)
);
}
if (processor.status === 'draft') {
setLiveDraftProcessors((prevLiveDraftProcessors) => {
const newLiveDraftProcessors = prevLiveDraftProcessors.slice();
const existingIndex = prevLiveDraftProcessors.findIndex(
(proc) => proc.id === processor.id
);
if (existingIndex !== -1) {
newLiveDraftProcessors[existingIndex] = processor;
} else {
newLiveDraftProcessors.push(processor);
}
return newLiveDraftProcessors;
});
}
},
800
),
[]
);
const memoizedSamplingCondition = useRef<Condition | undefined>();
const samplingCondition = useMemo(() => {
const newSamplingCondition = composeSamplingCondition(liveDraftProcessors);
if (isEqual(newSamplingCondition, memoizedSamplingCondition.current)) {
return memoizedSamplingCondition.current;
}
memoizedSamplingCondition.current = newSamplingCondition;
return newSamplingCondition;
}, [liveDraftProcessors]);
const {
loading: isLoadingSamples,
value: sampleDocs,
refresh: refreshSamples,
} = useStreamsAppFetch(
async ({ signal }) => {
if (!definition) {
return [];
}
const samplesBody = await streamsRepositoryClient.fetch('POST /api/streams/{name}/_sample', {
signal,
params: {
path: { name: definition.stream.name },
body: {
if: samplingCondition,
start: start?.valueOf(),
end: end?.valueOf(),
size: 100,
},
},
});
return samplesBody.documents.map((doc) => flattenObjectNestedLast(doc)) as FlattenRecord[];
},
[definition, streamsRepositoryClient, start, end, samplingCondition],
{ disableToastOnError: true }
);
const {
loading: isLoadingSimulation,
value: simulation,
error: simulationError,
refresh: refreshSimulation,
} = useStreamsAppFetch(
({ signal }): Promise<Simulation> => {
if (!definition || isEmpty<FlattenRecord[]>(sampleDocs) || isEmpty(liveDraftProcessors)) {
// This is a hack to avoid losing the previous value of the simulation once the conditions are not met. The state management refactor will fix this.
return Promise.resolve(simulation!);
}
const processing = liveDraftProcessors.map(processorConverter.toAPIDefinition);
const hasValidProcessors = processing.every((processor) =>
isSchema(processorDefinitionSchema, processor)
);
// Each processor should meet the minimum schema requirements to run the simulation
if (!hasValidProcessors) {
// This is a hack to avoid losing the previous value of the simulation once the conditions are not met. The state management refactor will fix this.
return Promise.resolve(simulation!);
}
return streamsRepositoryClient.fetch('POST /api/streams/{name}/processing/_simulate', {
signal,
params: {
path: { name: definition.stream.name },
body: {
documents: sampleDocs,
processing: liveDraftProcessors.map(processorConverter.toSimulateDefinition),
},
},
});
},
[definition, sampleDocs, liveDraftProcessors, streamsRepositoryClient],
{ disableToastOnError: true }
);
const tableColumns = useMemo(() => {
// If there is an error, we only want the source fields
const detectedFields = simulationError ? [] : simulation?.detected_fields ?? [];
return getTableColumns(liveDraftProcessors, detectedFields);
}, [liveDraftProcessors, simulation, simulationError]);
const hasLiveChanges = !isEmpty(liveDraftProcessors);
const [selectedDocsFilter, setSelectedDocsFilter] =
useState<DocsFilterOption>('outcome_filter_all');
const filteredSamples = useMemo(() => {
if (!simulation?.documents) {
return sampleDocs?.map((doc) => flattenObjectNestedLast(doc)) as FlattenRecord[];
}
const filterDocuments = (filter: DocsFilterOption) => {
switch (filter) {
case 'outcome_filter_matched':
return simulation.documents.filter((doc) => doc.status === 'parsed');
case 'outcome_filter_unmatched':
return simulation.documents.filter((doc) => doc.status !== 'parsed');
case 'outcome_filter_all':
default:
return simulation.documents;
}
};
return filterDocuments(selectedDocsFilter).map((doc) => doc.value);
}, [sampleDocs, simulation?.documents, selectedDocsFilter]);
return {
hasLiveChanges,
isLoading: isLoadingSamples || isLoadingSimulation,
error: simulationError as IHttpFetchError<ResponseErrorBody> | undefined,
refreshSamples,
simulation,
samples: sampleDocs ?? [],
filteredSamples: filteredSamples ?? [],
tableColumns,
watchProcessor,
refreshSimulation,
selectedDocsFilter,
setSelectedDocsFilter,
};
};
const composeSamplingCondition = (
processors: ProcessorDefinitionWithUIAttributes[]
): Condition | undefined => {
if (isEmpty(processors)) {
return undefined;
}
const uniqueFields = uniq(getSourceFields(processors));
const conditions = uniqueFields.map((field) => ({
field,
operator: 'exists' as UnaryOperator,
}));
return { or: conditions };
};
const getSourceFields = (processors: ProcessorDefinitionWithUIAttributes[]): string[] => {
return processors.map((processor) => getProcessorConfig(processor).field);
};
const getTableColumns = (
processors: ProcessorDefinitionWithUIAttributes[],
fields: DetectedField[]
) => {
const uniqueProcessorsFields = getSourceFields(processors).map((name) => ({
name,
origin: 'processor',
}));
const uniqueDetectedFields = fields.map((field) => ({
name: field.name,
origin: 'detected',
}));
return uniqBy([...uniqueProcessorsFields, ...uniqueDetectedFields], 'name') as TableColumn[];
};

View file

@ -18,23 +18,22 @@ import {
useEuiTheme,
} from '@elastic/eui';
import { i18n } from '@kbn/i18n';
import { IngestStreamGetResponse, isRootStreamDefinition } from '@kbn/streams-schema';
import { IngestStreamGetResponse } from '@kbn/streams-schema';
import { useUnsavedChangesPrompt } from '@kbn/unsaved-changes-prompt';
import { css } from '@emotion/react';
import { isEmpty } from 'lodash';
import { UseDefinitionReturn, useDefinition } from './hooks/use_definition';
import { useKibana } from '../../../hooks/use_kibana';
import { RootStreamEmptyPrompt } from './root_stream_empty_prompt';
import { DraggableProcessorListItem } from './processors_list';
import { SortableList } from './sortable_list';
import { ManagementBottomBar } from '../management_bottom_bar';
import { AddProcessorPanel } from './processors';
import { SimulationPlayground } from './simulation_playground';
import {
UseProcessingSimulatorReturn,
useProcessingSimulator,
} from './hooks/use_processing_simulator';
import { SimulatorContextProvider } from './simulator_context';
StreamEnrichmentContextProvider,
useSimulatorSelector,
useStreamEnrichmentEvents,
useStreamsEnrichmentSelector,
} from './state_management/stream_enrichment_state_machine';
const MemoSimulationPlayground = React.memo(SimulationPlayground);
@ -43,238 +42,171 @@ interface StreamDetailEnrichmentContentProps {
refreshDefinition: () => void;
}
export function StreamDetailEnrichmentContent({
definition,
refreshDefinition,
}: StreamDetailEnrichmentContentProps) {
export function StreamDetailEnrichmentContent(props: StreamDetailEnrichmentContentProps) {
const { core, dependencies } = useKibana();
const {
data,
streams: { streamsRepositoryClient },
} = dependencies.start;
return (
<StreamEnrichmentContextProvider
definition={props.definition}
refreshDefinition={props.refreshDefinition}
core={core}
data={data}
streamsRepositoryClient={streamsRepositoryClient}
>
<StreamDetailEnrichmentContentImpl />
</StreamEnrichmentContextProvider>
);
}
export function StreamDetailEnrichmentContentImpl() {
const { appParams, core } = useKibana();
const {
processors,
addProcessor,
updateProcessor,
deleteProcessor,
resetChanges,
saveChanges,
reorderProcessors,
hasChanges,
isSavingChanges,
} = useDefinition(definition, refreshDefinition);
const { resetChanges, saveChanges } = useStreamEnrichmentEvents();
const processingSimulator = useProcessingSimulator({ definition, processors });
const {
hasLiveChanges,
isLoading,
refreshSamples,
filteredSamples,
simulation,
tableColumns,
watchProcessor,
selectedDocsFilter,
setSelectedDocsFilter,
} = processingSimulator;
const hasChanges = useStreamsEnrichmentSelector((state) => state.can({ type: 'stream.update' }));
const isSavingChanges = useStreamsEnrichmentSelector((state) =>
state.matches({ ready: { stream: 'updating' } })
);
useUnsavedChangesPrompt({
hasUnsavedChanges: hasChanges || hasLiveChanges,
hasUnsavedChanges: hasChanges,
history: appParams.history,
http: core.http,
navigateToUrl: core.application.navigateToUrl,
openConfirm: core.overlays.openConfirm,
});
if (isRootStreamDefinition(definition.stream)) {
return <RootStreamEmptyPrompt />;
}
const isNonAdditiveSimulation = simulation && simulation.is_non_additive_simulation;
const isSubmitDisabled = Boolean(!hasChanges || isNonAdditiveSimulation);
const confirmTooltip = isNonAdditiveSimulation
? {
title: i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.nonAdditiveProcessorsTooltip.title',
{ defaultMessage: 'Non additive simulation detected' }
),
content: i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.nonAdditiveProcessorsTooltip.content',
{
defaultMessage:
'We currently prevent adding processors that change/remove existing data. Please update your processor configurations to continue.',
}
),
}
: undefined;
return (
<SimulatorContextProvider processingSimulator={processingSimulator} definition={definition}>
<EuiSplitPanel.Outer grow hasBorder hasShadow={false}>
<EuiSplitPanel.Inner
paddingSize="none"
css={css`
display: flex;
overflow: hidden auto;
`}
>
<EuiResizableContainer>
{(EuiResizablePanel, EuiResizableButton) => (
<>
<EuiResizablePanel
initialSize={40}
minSize="480px"
tabIndex={0}
paddingSize="none"
css={verticalFlexCss}
>
<ProcessorsEditor
definition={definition}
processors={processors}
onUpdateProcessor={updateProcessor}
onDeleteProcessor={deleteProcessor}
onWatchProcessor={watchProcessor}
onAddProcessor={addProcessor}
onReorderProcessor={reorderProcessors}
simulation={simulation}
/>
</EuiResizablePanel>
<EuiResizableButton indicator="border" accountForScrollbars="both" />
<EuiResizablePanel
initialSize={60}
minSize="300px"
tabIndex={0}
paddingSize="s"
css={verticalFlexCss}
>
<MemoSimulationPlayground
definition={definition}
columns={tableColumns}
simulation={simulation}
filteredSamples={filteredSamples}
onRefreshSamples={refreshSamples}
isLoading={isLoading}
selectedDocsFilter={selectedDocsFilter}
setSelectedDocsFilter={setSelectedDocsFilter}
/>
</EuiResizablePanel>
</>
)}
</EuiResizableContainer>
</EuiSplitPanel.Inner>
<EuiSplitPanel.Inner grow={false} color="subdued">
<ManagementBottomBar
confirmTooltip={confirmTooltip}
onCancel={resetChanges}
onConfirm={saveChanges}
isLoading={isSavingChanges}
disabled={isSubmitDisabled}
/>
</EuiSplitPanel.Inner>
</EuiSplitPanel.Outer>
</SimulatorContextProvider>
<EuiSplitPanel.Outer grow hasBorder hasShadow={false}>
<EuiSplitPanel.Inner
paddingSize="none"
css={css`
display: flex;
overflow: hidden auto;
`}
>
<EuiResizableContainer>
{(EuiResizablePanel, EuiResizableButton) => (
<>
<EuiResizablePanel
initialSize={40}
minSize="480px"
tabIndex={0}
paddingSize="none"
css={verticalFlexCss}
>
<ProcessorsEditor />
</EuiResizablePanel>
<EuiResizableButton indicator="border" accountForScrollbars="both" />
<EuiResizablePanel
initialSize={60}
minSize="300px"
tabIndex={0}
paddingSize="s"
css={verticalFlexCss}
>
<MemoSimulationPlayground />
</EuiResizablePanel>
</>
)}
</EuiResizableContainer>
</EuiSplitPanel.Inner>
<EuiSplitPanel.Inner grow={false} color="subdued">
<ManagementBottomBar
onCancel={resetChanges}
onConfirm={saveChanges}
isLoading={isSavingChanges}
disabled={!hasChanges}
/>
</EuiSplitPanel.Inner>
</EuiSplitPanel.Outer>
);
}
interface ProcessorsEditorProps {
definition: IngestStreamGetResponse;
processors: UseDefinitionReturn['processors'];
onAddProcessor: UseDefinitionReturn['addProcessor'];
onDeleteProcessor: UseDefinitionReturn['deleteProcessor'];
onReorderProcessor: UseDefinitionReturn['reorderProcessors'];
onUpdateProcessor: UseDefinitionReturn['updateProcessor'];
onWatchProcessor: UseProcessingSimulatorReturn['watchProcessor'];
simulation: UseProcessingSimulatorReturn['simulation'];
}
const ProcessorsEditor = React.memo(() => {
const { euiTheme } = useEuiTheme();
const ProcessorsEditor = React.memo(
({
definition,
processors,
onAddProcessor,
onDeleteProcessor,
onReorderProcessor,
onUpdateProcessor,
onWatchProcessor,
simulation,
}: ProcessorsEditorProps) => {
const { euiTheme } = useEuiTheme();
const { reorderProcessors } = useStreamEnrichmentEvents();
const handlerItemDrag: DragDropContextProps['onDragEnd'] = ({ source, destination }) => {
if (source && destination) {
const items = euiDragDropReorder(processors, source.index, destination.index);
onReorderProcessor(items);
}
};
const processorsRefs = useStreamsEnrichmentSelector((state) =>
state.context.processorsRefs.filter((processorRef) =>
processorRef.getSnapshot().matches('configured')
)
);
const hasProcessors = !isEmpty(processors);
const simulationSnapshot = useSimulatorSelector((s) => s);
return (
<>
<EuiPanel
paddingSize="m"
hasShadow={false}
borderRadius="none"
grow={false}
css={css`
z-index: ${euiTheme.levels.maskBelowHeader};
${useEuiShadow('xs')};
`}
>
<EuiTitle size="xxs">
<h2>
{i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.headingTitle',
{
defaultMessage: 'Processors for field extraction',
}
)}
</h2>
</EuiTitle>
<EuiText component="p" size="xs">
const handlerItemDrag: DragDropContextProps['onDragEnd'] = ({ source, destination }) => {
if (source && destination) {
const items = euiDragDropReorder(processorsRefs, source.index, destination.index);
reorderProcessors(items);
}
};
const hasProcessors = !isEmpty(processorsRefs);
return (
<>
<EuiPanel
paddingSize="m"
hasShadow={false}
borderRadius="none"
grow={false}
css={css`
z-index: ${euiTheme.levels.maskBelowHeader};
${useEuiShadow('xs')};
`}
>
<EuiTitle size="xxs">
<h2>
{i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.headingSubtitle',
'xpack.streams.streamDetailView.managementTab.enrichment.headingTitle',
{
defaultMessage:
'Drag and drop existing processors to update their execution order.',
defaultMessage: 'Processors for field extraction',
}
)}
</EuiText>
</EuiPanel>
<EuiPanel
paddingSize="m"
hasShadow={false}
borderRadius="none"
css={css`
overflow: auto;
`}
>
{hasProcessors && (
<SortableList onDragItem={handlerItemDrag}>
{processors.map((processor, idx) => (
<DraggableProcessorListItem
key={processor.id}
idx={idx}
definition={definition}
processor={processor}
onDeleteProcessor={onDeleteProcessor}
onUpdateProcessor={onUpdateProcessor}
onWatchProcessor={onWatchProcessor}
processorMetrics={simulation?.processors_metrics[processor.id]}
/>
))}
</SortableList>
</h2>
</EuiTitle>
<EuiText component="p" size="xs">
{i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.headingSubtitle',
{
defaultMessage: 'Drag and drop existing processors to update their execution order.',
}
)}
<AddProcessorPanel
key={processors.length} // Used to force reset the inner form state once a new processor is added
definition={definition}
onAddProcessor={onAddProcessor}
onWatchProcessor={onWatchProcessor}
processorMetrics={simulation?.processors_metrics.draft}
/>
</EuiPanel>
</>
);
}
);
</EuiText>
</EuiPanel>
<EuiPanel
paddingSize="m"
hasShadow={false}
borderRadius="none"
css={css`
overflow: auto;
`}
>
{hasProcessors && (
<SortableList onDragItem={handlerItemDrag}>
{processorsRefs.map((processorRef, idx) => (
<DraggableProcessorListItem
key={processorRef.id}
idx={idx}
processorRef={processorRef}
processorMetrics={
simulationSnapshot.context.simulation?.processors_metrics[processorRef.id]
}
/>
))}
</SortableList>
)}
<AddProcessorPanel />
</EuiPanel>
</>
);
});
const verticalFlexCss = css`
display: flex;

View file

@ -16,121 +16,74 @@ import {
EuiProgress,
} from '@elastic/eui';
import { i18n } from '@kbn/i18n';
import { TimeRange } from '@kbn/es-query';
import { isEmpty } from 'lodash';
import { SampleDocument } from '@kbn/streams-schema';
import { useKibana } from '../../../hooks/use_kibana';
import { useSelector } from '@xstate5/react';
import { isEmpty, isEqual } from 'lodash';
import { StreamsAppSearchBar, StreamsAppSearchBarProps } from '../../streams_app_search_bar';
import { PreviewTable } from '../preview_table';
import {
DocsFilterOption,
TableColumn,
UseProcessingSimulatorReturn,
docsFilterOptions,
} from './hooks/use_processing_simulator';
import { AssetImage } from '../../asset_image';
import {
useSimulatorSelector,
useStreamEnrichmentEvents,
} from './state_management/stream_enrichment_state_machine';
import {
PreviewDocsFilterOption,
getTableColumns,
previewDocsFilterOptions,
} from './state_management/simulation_state_machine';
interface ProcessorOutcomePreviewProps {
columns: TableColumn[];
isLoading: UseProcessingSimulatorReturn['isLoading'];
simulation: UseProcessingSimulatorReturn['simulation'];
filteredSamples: UseProcessingSimulatorReturn['samples'];
onRefreshSamples: UseProcessingSimulatorReturn['refreshSamples'];
selectedDocsFilter: UseProcessingSimulatorReturn['selectedDocsFilter'];
setSelectedDocsFilter: UseProcessingSimulatorReturn['setSelectedDocsFilter'];
}
export const ProcessorOutcomePreview = ({
columns,
isLoading,
simulation,
filteredSamples,
onRefreshSamples,
selectedDocsFilter,
setSelectedDocsFilter,
}: ProcessorOutcomePreviewProps) => {
const { dependencies } = useKibana();
const { data } = dependencies.start;
const { timeRange, setTimeRange } = data.query.timefilter.timefilter.useTimefilter();
const tableColumns = useMemo(() => {
switch (selectedDocsFilter) {
case 'outcome_filter_unmatched':
return columns
.filter((column) => column.origin === 'processor')
.map((column) => column.name);
case 'outcome_filter_matched':
case 'outcome_filter_all':
default:
return columns.map((column) => column.name);
}
}, [columns, selectedDocsFilter]);
const simulationFailureRate = simulation
? simulation?.failure_rate + simulation?.skipped_rate
: undefined;
const simulationSuccessRate = simulation?.success_rate;
export const ProcessorOutcomePreview = () => {
const isLoading = useSimulatorSelector(
(state) =>
state.matches('debouncingChanges') ||
state.matches('loadingSamples') ||
state.matches('runningSimulation')
);
return (
<>
<EuiFlexItem grow={false}>
<OutcomeControls
docsFilter={selectedDocsFilter}
onDocsFilterChange={setSelectedDocsFilter}
timeRange={timeRange}
onTimeRangeChange={setTimeRange}
onTimeRangeRefresh={onRefreshSamples}
simulationFailureRate={simulationFailureRate}
simulationSuccessRate={simulationSuccessRate}
/>
<OutcomeControls />
</EuiFlexItem>
<EuiSpacer size="m" />
<OutcomePreviewTable documents={filteredSamples} columns={tableColumns} />
<OutcomePreviewTable />
{isLoading && <EuiProgress size="xs" color="accent" position="absolute" />}
</>
);
};
interface OutcomeControlsProps {
docsFilter: DocsFilterOption;
timeRange: TimeRange;
onDocsFilterChange: (filter: DocsFilterOption) => void;
onTimeRangeChange: (timeRange: TimeRange) => void;
onTimeRangeRefresh: () => void;
simulationFailureRate?: number;
simulationSuccessRate?: number;
}
const OutcomeControls = () => {
const { changePreviewDocsFilter } = useStreamEnrichmentEvents();
const previewDocsFilter = useSimulatorSelector((state) => state.context.previewDocsFilter);
const simulationFailureRate = useSimulatorSelector((state) =>
state.context.simulation
? state.context.simulation.failure_rate + state.context.simulation.skipped_rate
: undefined
);
const simulationSuccessRate = useSimulatorSelector(
(state) => state.context.simulation?.success_rate
);
const dateRangeRef = useSimulatorSelector((state) => state.context.dateRangeRef);
const timeRange = useSelector(dateRangeRef, (state) => state.context.timeRange);
const handleRefresh = () => dateRangeRef.send({ type: 'dateRange.refresh' });
const OutcomeControls = ({
docsFilter,
timeRange,
onDocsFilterChange,
onTimeRangeChange,
onTimeRangeRefresh,
simulationFailureRate,
simulationSuccessRate,
}: OutcomeControlsProps) => {
const handleQuerySubmit: StreamsAppSearchBarProps['onQuerySubmit'] = (
{ dateRange },
isUpdate
) => {
if (!isUpdate) {
return onTimeRangeRefresh();
return handleRefresh();
}
if (dateRange) {
onTimeRangeChange({
from: dateRange.from,
to: dateRange?.to,
mode: dateRange.mode,
});
dateRangeRef.send({ type: 'dateRange.update', range: dateRange });
}
};
const getFilterButtonPropsFor = (filterId: DocsFilterOption) => ({
hasActiveFilters: docsFilter === filterId,
onClick: () => onDocsFilterChange(filterId),
const getFilterButtonPropsFor = (filter: PreviewDocsFilterOption) => ({
hasActiveFilters: previewDocsFilter === filter,
onClick: () => changePreviewDocsFilter(filter),
});
return (
@ -141,45 +94,60 @@ const OutcomeControls = ({
{ defaultMessage: 'Filter for all, matching or unmatching previewed documents.' }
)}
>
<EuiFilterButton {...getFilterButtonPropsFor(docsFilterOptions.outcome_filter_all.id)}>
{docsFilterOptions.outcome_filter_all.label}
<EuiFilterButton
{...getFilterButtonPropsFor(previewDocsFilterOptions.outcome_filter_all.id)}
>
{previewDocsFilterOptions.outcome_filter_all.label}
</EuiFilterButton>
<EuiFilterButton
{...getFilterButtonPropsFor(docsFilterOptions.outcome_filter_matched.id)}
{...getFilterButtonPropsFor(previewDocsFilterOptions.outcome_filter_matched.id)}
badgeColor="success"
numActiveFilters={
simulationSuccessRate ? parseFloat((simulationSuccessRate * 100).toFixed(2)) : undefined
}
>
{docsFilterOptions.outcome_filter_matched.label}
{previewDocsFilterOptions.outcome_filter_matched.label}
</EuiFilterButton>
<EuiFilterButton
{...getFilterButtonPropsFor(docsFilterOptions.outcome_filter_unmatched.id)}
{...getFilterButtonPropsFor(previewDocsFilterOptions.outcome_filter_unmatched.id)}
badgeColor="accent"
numActiveFilters={
simulationFailureRate ? parseFloat((simulationFailureRate * 100).toFixed(2)) : undefined
}
>
{docsFilterOptions.outcome_filter_unmatched.label}
{previewDocsFilterOptions.outcome_filter_unmatched.label}
</EuiFilterButton>
</EuiFilterGroup>
<StreamsAppSearchBar
onQuerySubmit={handleQuerySubmit}
onRefresh={onTimeRangeRefresh}
dateRangeFrom={timeRange.from}
dateRangeTo={timeRange.to}
onRefresh={handleRefresh}
dateRangeFrom={timeRange?.from}
dateRangeTo={timeRange?.to}
/>
</EuiFlexGroup>
);
};
interface OutcomePreviewTableProps {
documents: SampleDocument[];
columns: string[];
}
const MemoPreviewTable = React.memo(PreviewTable, (prevProps, nextProps) => {
// Need to specify the props to compare since the columns might be the same even if the useMemo call returns a new array
return (
prevProps.documents === nextProps.documents &&
isEqual(prevProps.displayColumns, nextProps.displayColumns)
);
});
const OutcomePreviewTable = ({ documents, columns }: OutcomePreviewTableProps) => {
if (isEmpty(documents)) {
const OutcomePreviewTable = () => {
const processors = useSimulatorSelector((state) => state.context.processors);
const detectedFields = useSimulatorSelector((state) => state.context.simulation?.detected_fields);
const previewDocsFilter = useSimulatorSelector((state) => state.context.previewDocsFilter);
const previewDocuments = useSimulatorSelector((state) => state.context.previewDocuments);
const previewColumns = useMemo(
() => getTableColumns(processors, detectedFields ?? [], previewDocsFilter),
[detectedFields, previewDocsFilter, processors]
);
if (!previewDocuments || isEmpty(previewDocuments)) {
return (
<EuiEmptyPrompt
titleSize="xs"
@ -207,5 +175,5 @@ const OutcomePreviewTable = ({ documents, columns }: OutcomePreviewTableProps) =
);
}
return <PreviewTable documents={documents} displayColumns={columns} />;
return <MemoPreviewTable documents={previewDocuments} displayColumns={previewColumns} />;
};

View file

@ -29,10 +29,10 @@ import type { FindActionResult } from '@kbn/actions-plugin/server';
import { UseGenAIConnectorsResult } from '@kbn/observability-ai-assistant-plugin/public/hooks/use_genai_connectors';
import { useAbortController, useBoolean } from '@kbn/react-hooks';
import useObservable from 'react-use/lib/useObservable';
import { useStreamDetail } from '../../../../../hooks/use_stream_detail';
import { useKibana } from '../../../../../hooks/use_kibana';
import { GrokFormState, ProcessorFormState } from '../../types';
import { UseProcessingSimulatorReturn } from '../../hooks/use_processing_simulator';
import { useSimulatorContext } from '../../simulator_context';
import { useSimulatorSelector } from '../../state_management/stream_enrichment_state_machine';
const RefreshButton = ({
generatePatterns,
@ -148,12 +148,10 @@ function useAiEnabled() {
}
function InnerGrokAiSuggestions({
refreshSimulation,
filteredSamples,
previewDocuments,
definition,
}: {
refreshSimulation: UseProcessingSimulatorReturn['refreshSimulation'];
filteredSamples: FlattenRecord[];
previewDocuments: FlattenRecord[];
definition: IngestStreamGetResponse;
}) {
const { dependencies } = useKibana();
@ -193,7 +191,7 @@ function InnerGrokAiSuggestions({
body: {
field: fieldValue,
connectorId: currentConnector,
samples: filteredSamples,
samples: previewDocuments,
},
},
})
@ -210,7 +208,7 @@ function InnerGrokAiSuggestions({
currentConnector,
definition.stream.name,
fieldValue,
filteredSamples,
previewDocuments,
streamsRepositoryClient,
]);
@ -225,11 +223,11 @@ function InnerGrokAiSuggestions({
const hasValidField = useMemo(() => {
return Boolean(
currentFieldName &&
filteredSamples.some(
previewDocuments.some(
(sample) => sample[currentFieldName] && typeof sample[currentFieldName] === 'string'
)
);
}, [filteredSamples, currentFieldName]);
}, [previewDocuments, currentFieldName]);
const filteredSuggestions = suggestions?.patterns
.map((pattern, i) => ({
@ -304,7 +302,6 @@ function InnerGrokAiSuggestions({
{ value: suggestion.pattern },
]);
}
refreshSimulation();
}}
data-test-subj="streamsAppGrokAiSuggestionsButton"
iconType="plusInCircle"
@ -360,7 +357,8 @@ export function GrokAiSuggestions() {
core: { http },
} = useKibana();
const { enabled: isAiEnabled, couldBeEnabled } = useAiEnabled();
const props = useSimulatorContext();
const { definition } = useStreamDetail();
const previewDocuments = useSimulatorSelector((state) => state.context.previewDocuments);
if (!isAiEnabled && couldBeEnabled) {
return (
@ -390,8 +388,9 @@ export function GrokAiSuggestions() {
);
}
if (!isAiEnabled) {
if (!isAiEnabled || !definition) {
return null;
}
return <InnerGrokAiSuggestions {...props} />;
return <InnerGrokAiSuggestions definition={definition} previewDocuments={previewDocuments} />;
}

View file

@ -20,94 +20,84 @@ import {
EuiText,
EuiBadge,
} from '@elastic/eui';
import { useSelector } from '@xstate5/react';
import { i18n } from '@kbn/i18n';
import { ProcessorType, IngestStreamGetResponse } from '@kbn/streams-schema';
import { isEmpty, isEqual } from 'lodash';
import React, { useEffect, useMemo, useState } from 'react';
import { isEmpty } from 'lodash';
import React, { useEffect, useMemo } from 'react';
import { useForm, SubmitHandler, FormProvider, useWatch } from 'react-hook-form';
import { css } from '@emotion/react';
import { useBoolean } from '@kbn/react-hooks';
import { DiscardPromptOptions, useDiscardConfirm } from '../../../../hooks/use_discard_confirm';
import { DissectProcessorForm } from './dissect';
import { GrokProcessorForm } from './grok';
import { ProcessorTypeSelector } from './processor_type_selector';
import { ProcessorFormState, ProcessorDefinitionWithUIAttributes } from '../types';
import {
getDefaultFormState,
getFormStateFrom,
convertFormStateToProcessor,
isGrokProcessor,
isDissectProcessor,
getDefaultFormStateByType,
} from '../utils';
import { useDiscardConfirm } from '../../../../hooks/use_discard_confirm';
import { ProcessorMetrics, UseProcessingSimulatorReturn } from '../hooks/use_processing_simulator';
import { ProcessorErrors, ProcessorMetricBadges } from './processor_metrics';
import { UseDefinitionReturn } from '../hooks/use_definition';
import {
useStreamEnrichmentEvents,
useStreamsEnrichmentSelector,
useSimulatorSelector,
StreamEnrichmentContext,
} from '../state_management/stream_enrichment_state_machine';
import { ProcessorMetrics } from '../state_management/simulation_state_machine';
export interface ProcessorPanelProps {
definition: IngestStreamGetResponse;
processorMetrics?: ProcessorMetrics;
onWatchProcessor: UseProcessingSimulatorReturn['watchProcessor'];
}
export interface AddProcessorPanelProps extends ProcessorPanelProps {
isInitiallyOpen?: boolean;
onAddProcessor: UseDefinitionReturn['addProcessor'];
}
export interface EditProcessorPanelProps extends ProcessorPanelProps {
processor: ProcessorDefinitionWithUIAttributes;
onDeleteProcessor: UseDefinitionReturn['deleteProcessor'];
onUpdateProcessor: UseDefinitionReturn['updateProcessor'];
}
export function AddProcessorPanel({
onAddProcessor,
onWatchProcessor,
processorMetrics,
}: AddProcessorPanelProps) {
export function AddProcessorPanel() {
const { euiTheme } = useEuiTheme();
const [hasChanges, setHasChanges] = useState(false);
const [isOpen, { on: openPanel, off: closePanel }] = useBoolean(false);
const { addProcessor } = useStreamEnrichmentEvents();
const defaultValues = useMemo(() => getDefaultFormState('grok'), []);
const processorRef = useStreamsEnrichmentSelector((state) =>
state.context.processorsRefs.find((p) => p.getSnapshot().matches('draft'))
);
const processorMetrics = useSimulatorSelector(
(state) => processorRef && state.context.simulation?.processors_metrics[processorRef.id]
);
const isOpen = Boolean(processorRef);
const defaultValues = useMemo(() => getDefaultFormStateByType('grok'), []);
const methods = useForm<ProcessorFormState>({ defaultValues, mode: 'onChange' });
const type = useWatch({ control: methods.control, name: 'type' });
useEffect(() => {
if (isOpen) {
if (!processorRef) {
methods.reset(defaultValues);
}
}, [defaultValues, methods, processorRef]);
useEffect(() => {
if (processorRef) {
const { unsubscribe } = methods.watch((value) => {
const draftProcessor = createDraftProcessorFromForm(value as ProcessorFormState);
onWatchProcessor(draftProcessor);
setHasChanges(!isEqual(defaultValues, value));
const processor = convertFormStateToProcessor(value as ProcessorFormState);
processorRef.send({ type: 'processor.change', processor });
});
return () => unsubscribe();
}
}, [defaultValues, isOpen, methods, onWatchProcessor]);
}, [methods, processorRef]);
const handleSubmit: SubmitHandler<ProcessorFormState> = async (data) => {
const processingDefinition = convertFormStateToProcessor(data);
const handleCancel = useDiscardConfirm(
() => processorRef?.send({ type: 'processor.cancel' }),
discardChangesPromptOptions
);
onWatchProcessor({ id: 'draft', deleteIfExists: true });
onAddProcessor(processingDefinition, data.detected_fields);
closePanel();
};
const handleCancel = () => {
closePanel();
methods.reset();
onWatchProcessor({ id: 'draft', deleteIfExists: true });
const handleSubmit: SubmitHandler<ProcessorFormState> = async () => {
processorRef?.send({ type: 'processor.stage' });
};
const handleOpen = () => {
const draftProcessor = createDraftProcessorFromForm(defaultValues);
onWatchProcessor(draftProcessor);
openPanel();
addProcessor(draftProcessor);
};
const confirmDiscardAndClose = useDiscardConfirm(handleCancel);
const buttonContent = isOpen ? (
i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.processorPanel.addingProcessor',
@ -146,7 +136,7 @@ export function AddProcessorPanel({
<EuiFlexGroup alignItems="center" gutterSize="s">
<EuiButtonEmpty
data-test-subj="streamsAppAddProcessorPanelCancelButton"
onClick={hasChanges ? confirmDiscardAndClose : handleCancel}
onClick={handleCancel}
size="s"
>
{i18n.translate(
@ -195,74 +185,74 @@ const createDraftProcessorFromForm = (
return {
id: 'draft',
status: 'draft',
type: formState.type,
...processingDefinition,
};
};
export function EditProcessorPanel({
onDeleteProcessor,
onUpdateProcessor,
onWatchProcessor,
processor,
processorMetrics,
}: EditProcessorPanelProps) {
const { euiTheme } = useEuiTheme();
export interface EditProcessorPanelProps {
processorRef: StreamEnrichmentContext['processorsRefs'][number];
processorMetrics?: ProcessorMetrics;
}
const [hasChanges, setHasChanges] = useState(false);
const [isOpen, { on: openPanel, off: closePanel }] = useBoolean();
export function EditProcessorPanel({ processorRef, processorMetrics }: EditProcessorPanelProps) {
const { euiTheme } = useEuiTheme();
const state = useSelector(processorRef, (s) => s);
const previousProcessor = state.context.previousProcessor;
const processor = state.context.processor;
const processorDescription = getProcessorDescription(processor);
const isDraft = processor.status === 'draft';
const isUnsaved = isDraft || processor.status === 'updated';
const isOpen = state.matches({ configured: 'edit' });
const isNew = state.context.isNew;
const isUnsaved = isNew || state.context.isUpdated;
const defaultValues = useMemo(() => getDefaultFormState(processor.type, processor), [processor]);
const defaultValues = useMemo(() => getFormStateFrom(processor), [processor]);
const methods = useForm<ProcessorFormState>({ defaultValues, mode: 'onChange' });
const methods = useForm<ProcessorFormState>({
defaultValues,
mode: 'onChange',
});
const type = useWatch({ control: methods.control, name: 'type' });
useEffect(() => {
const { unsubscribe } = methods.watch((value) => {
const processingDefinition = convertFormStateToProcessor(value as ProcessorFormState);
onWatchProcessor({
id: processor.id,
status: processor.status,
type: value.type as ProcessorType,
...processingDefinition,
processorRef.send({
type: 'processor.change',
processor: processingDefinition,
});
setHasChanges(!isEqual(defaultValues, value));
});
return () => unsubscribe();
}, [defaultValues, methods, onWatchProcessor, processor.id, processor.status]);
}, [methods, processorRef]);
const handleSubmit: SubmitHandler<ProcessorFormState> = (data) => {
const processorDefinition = convertFormStateToProcessor(data);
useEffect(() => {
const subscription = processorRef.on('processor.changesDiscarded', () => {
methods.reset(getFormStateFrom(previousProcessor));
});
onUpdateProcessor(processor.id, processorDefinition, isDraft ? 'draft' : 'updated');
closePanel();
return () => subscription.unsubscribe();
}, [methods, previousProcessor, processorRef]);
const handleCancel = useDiscardConfirm(
() => processorRef?.send({ type: 'processor.cancel' }),
discardChangesPromptOptions
);
const handleProcessorDelete = useDiscardConfirm(
() => processorRef?.send({ type: 'processor.delete' }),
deleteProcessorPromptOptions
);
const handleSubmit: SubmitHandler<ProcessorFormState> = () => {
processorRef.send({ type: 'processor.update' });
};
const handleProcessorDelete = () => {
onDeleteProcessor(processor.id);
closePanel();
const handleOpen = () => {
processorRef.send({ type: 'processor.edit' });
};
const handleCancel = () => {
methods.reset();
closePanel();
};
const confirmDiscardAndClose = useDiscardConfirm(handleCancel);
const confirmDeletionAndClose = useDiscardConfirm(handleProcessorDelete, {
title: deleteProcessorTitle,
message: deleteProcessorMessage,
confirmButtonText: deleteProcessorLabel,
cancelButtonText: deleteProcessorCancelLabel,
});
const buttonContent = isOpen ? (
<strong>{processor.type.toUpperCase()}</strong>
) : (
@ -278,7 +268,7 @@ export function EditProcessorPanel({
return (
<EuiPanel
hasBorder
color={isDraft ? 'subdued' : undefined}
color={isNew ? 'subdued' : undefined}
css={css`
border: ${euiTheme.border.thin};
padding: ${euiTheme.size.m};
@ -308,7 +298,7 @@ export function EditProcessorPanel({
<EuiFlexGroup alignItems="center" gutterSize="s">
<EuiButtonEmpty
data-test-subj="streamsAppEditProcessorPanelCancelButton"
onClick={hasChanges ? confirmDiscardAndClose : handleCancel}
onClick={handleCancel}
size="s"
>
{i18n.translate(
@ -321,7 +311,7 @@ export function EditProcessorPanel({
size="s"
fill
onClick={methods.handleSubmit(handleSubmit)}
disabled={!methods.formState.isValid}
disabled={!methods.formState.isValid || !state.can({ type: 'processor.update' })}
>
{i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.processorPanel.confirmEditProcessor',
@ -342,7 +332,7 @@ export function EditProcessorPanel({
)}
<EuiButtonIcon
data-test-subj="streamsAppEditProcessorPanelButton"
onClick={openPanel}
onClick={handleOpen}
iconType="pencil"
color="text"
size="xs"
@ -363,15 +353,15 @@ export function EditProcessorPanel({
<EuiSpacer size="m" />
{type === 'grok' && <GrokProcessorForm />}
{type === 'dissect' && <DissectProcessorForm />}
<EuiHorizontalRule margin="m" />
<EuiButton
data-test-subj="streamsAppEditProcessorPanelButton"
color="danger"
onClick={confirmDeletionAndClose}
>
{deleteProcessorLabel}
</EuiButton>
</EuiForm>
<EuiHorizontalRule margin="m" />
<EuiButton
data-test-subj="streamsAppEditProcessorPanelButton"
color="danger"
onClick={handleProcessorDelete}
>
{deleteProcessorLabel}
</EuiButton>
{processorMetrics && !isEmpty(processorMetrics.errors) && (
<ProcessorErrors metrics={processorMetrics} />
)}
@ -397,21 +387,6 @@ const deleteProcessorLabel = i18n.translate(
{ defaultMessage: 'Delete processor' }
);
const deleteProcessorCancelLabel = i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.deleteProcessorCancelLabel',
{ defaultMessage: 'Cancel' }
);
const deleteProcessorTitle = i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.deleteProcessorTitle',
{ defaultMessage: 'Are you sure you want to delete this processor?' }
);
const deleteProcessorMessage = i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.deleteProcessorMessage',
{ defaultMessage: 'Deleting this processor will permanently impact the field configuration.' }
);
const getProcessorDescription = (processor: ProcessorDefinitionWithUIAttributes) => {
if (isGrokProcessor(processor)) {
return processor.grok.patterns.join(' • ');
@ -421,3 +396,37 @@ const getProcessorDescription = (processor: ProcessorDefinitionWithUIAttributes)
return '';
};
export const discardChangesPromptOptions: DiscardPromptOptions = {
message: i18n.translate('xpack.streams.enrichment.processor.discardChanges.message', {
defaultMessage: 'Are you sure you want to discard your changes?',
}),
title: i18n.translate('xpack.streams.enrichment.processor.discardChanges.title', {
defaultMessage: 'Discard changes?',
}),
confirmButtonText: i18n.translate(
'xpack.streams.enrichment.processor.discardChanges.confirmButtonText',
{ defaultMessage: 'Discard' }
),
cancelButtonText: i18n.translate(
'xpack.streams.enrichment.processor.discardChanges.cancelButtonText',
{ defaultMessage: 'Keep editing' }
),
};
export const deleteProcessorPromptOptions: DiscardPromptOptions = {
message: i18n.translate('xpack.streams.enrichment.processor.deleteProcessor.message', {
defaultMessage: 'Deleting this processor will permanently impact the field configuration.',
}),
title: i18n.translate('xpack.streams.enrichment.processor.deleteProcessor.title', {
defaultMessage: 'Are you sure you want to delete this processor?',
}),
confirmButtonText: i18n.translate(
'xpack.streams.enrichment.processor.deleteProcessor.confirmButtonText',
{ defaultMessage: 'Delete processor' }
),
cancelButtonText: i18n.translate(
'xpack.streams.enrichment.processor.deleteProcessor.cancelButtonText',
{ defaultMessage: 'Cancel' }
),
};

View file

@ -20,7 +20,7 @@ import React from 'react';
import { i18n } from '@kbn/i18n';
import useToggle from 'react-use/lib/useToggle';
import { css } from '@emotion/react';
import { ProcessorMetrics } from '../hooks/use_processing_simulator';
import { ProcessorMetrics } from '../state_management/simulation_state_machine';
type ProcessorMetricBadgesProps = ProcessorMetrics;
@ -116,7 +116,9 @@ export const ProcessorErrors = ({ metrics }: { metrics: ProcessorMetrics }) => {
const shouldDisplayErrorToggle = remainingCount > 0;
const getCalloutProps = (type: ProcessorMetrics['errors'][number]['type']): EuiCallOutProps => {
const isWarningError = type === 'generic_processor_failure' && success_rate > 0;
const isWarningError =
type === 'non_additive_processor_failure' ||
(type === 'generic_processor_failure' && success_rate > 0);
return {
color: isWarningError ? 'warning' : 'danger',

View file

@ -12,7 +12,7 @@ import { FormattedMessage } from '@kbn/i18n-react';
import { useController, useFormContext, useWatch } from 'react-hook-form';
import { ProcessorType } from '@kbn/streams-schema';
import { useKibana } from '../../../../hooks/use_kibana';
import { getDefaultFormState } from '../utils';
import { getDefaultFormStateByType } from '../utils';
import { ProcessorFormState } from '../types';
interface TAvailableProcessor {
@ -38,7 +38,7 @@ export const ProcessorTypeSelector = ({
const processorType = useWatch<{ type: ProcessorType }>({ name: 'type' });
const handleChange = (type: ProcessorType) => {
const formState = getDefaultFormState(type);
const formState = getDefaultFormStateByType(type);
reset(formState);
};

View file

@ -10,20 +10,19 @@ import { EuiDraggable } from '@elastic/eui';
import { EditProcessorPanel, type EditProcessorPanelProps } from './processors';
export const DraggableProcessorListItem = ({
processor,
idx,
...props
}: EditProcessorPanelProps & { idx: number }) => (
<EuiDraggable
index={idx}
spacing="m"
draggableId={processor.id}
draggableId={props.processorRef.id}
hasInteractiveChildren
style={{
paddingLeft: 0,
paddingRight: 0,
}}
>
{() => <EditProcessorPanel processor={processor} {...props} />}
{() => <EditProcessorPanel {...props} />}
</EuiDraggable>
);

View file

@ -5,83 +5,56 @@
* 2.0.
*/
import React, { useState } from 'react';
import React from 'react';
import { i18n } from '@kbn/i18n';
import { EuiFlexItem, EuiSpacer, EuiTab, EuiTabs } from '@elastic/eui';
import { IngestStreamGetResponse, isWiredStreamGetResponse } from '@kbn/streams-schema';
import { isWiredStreamGetResponse } from '@kbn/streams-schema';
import { ProcessorOutcomePreview } from './processor_outcome_preview';
import { TableColumn, UseProcessingSimulatorReturn } from './hooks/use_processing_simulator';
import {
useStreamEnrichmentEvents,
useStreamsEnrichmentSelector,
} from './state_management/stream_enrichment_state_machine';
interface SimulationPlaygroundProps {
definition: IngestStreamGetResponse;
columns: TableColumn[];
isLoading: UseProcessingSimulatorReturn['isLoading'];
simulation: UseProcessingSimulatorReturn['simulation'];
filteredSamples: UseProcessingSimulatorReturn['filteredSamples'];
selectedDocsFilter: UseProcessingSimulatorReturn['selectedDocsFilter'];
setSelectedDocsFilter: UseProcessingSimulatorReturn['setSelectedDocsFilter'];
onRefreshSamples: UseProcessingSimulatorReturn['refreshSamples'];
}
export const SimulationPlayground = () => {
const isViewingDataPreview = useStreamsEnrichmentSelector((state) =>
state.matches({
ready: { enrichment: { displayingSimulation: 'viewDataPreview' } },
})
);
const isViewingDetectedFields = useStreamsEnrichmentSelector((state) =>
state.matches({
ready: { enrichment: { displayingSimulation: 'viewDetectedFields' } },
})
);
const canViewDetectedFields = useStreamsEnrichmentSelector((state) =>
isWiredStreamGetResponse(state.context.definition)
);
export const SimulationPlayground = (props: SimulationPlaygroundProps) => {
const {
definition,
columns,
isLoading,
simulation,
filteredSamples,
onRefreshSamples,
setSelectedDocsFilter,
selectedDocsFilter,
} = props;
const tabs = {
dataPreview: {
name: i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.simulationPlayground.dataPreview',
{ defaultMessage: 'Data preview' }
),
},
...(isWiredStreamGetResponse(definition) && {
detectedFields: {
name: i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.simulationPlayground.detectedFields',
{ defaultMessage: 'Detected fields' }
),
},
}),
} as const;
const [selectedTabId, setSelectedTabId] = useState<keyof typeof tabs>('dataPreview');
const { viewSimulationPreviewData, viewSimulationDetectedFields } = useStreamEnrichmentEvents();
return (
<>
<EuiFlexItem grow={false}>
<EuiTabs bottomBorder={false}>
{Object.entries(tabs).map(([tabId, tab]) => (
<EuiTab
key={tabId}
isSelected={selectedTabId === tabId}
onClick={() => setSelectedTabId(tabId as keyof typeof tabs)}
>
{tab.name}
<EuiTab isSelected={isViewingDataPreview} onClick={viewSimulationPreviewData}>
{i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.simulationPlayground.dataPreview',
{ defaultMessage: 'Data preview' }
)}
</EuiTab>
{canViewDetectedFields && (
<EuiTab isSelected={isViewingDetectedFields} onClick={viewSimulationDetectedFields}>
{i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.simulationPlayground.detectedFields',
{ defaultMessage: 'Detected fields' }
)}
</EuiTab>
))}
)}
</EuiTabs>
</EuiFlexItem>
<EuiSpacer size="m" />
{selectedTabId === 'dataPreview' && (
<ProcessorOutcomePreview
columns={columns}
isLoading={isLoading}
simulation={simulation}
filteredSamples={filteredSamples}
onRefreshSamples={onRefreshSamples}
selectedDocsFilter={selectedDocsFilter}
setSelectedDocsFilter={setSelectedDocsFilter}
/>
)}
{selectedTabId === 'detectedFields' &&
{isViewingDataPreview && <ProcessorOutcomePreview />}
{isViewingDetectedFields &&
i18n.translate('xpack.streams.simulationPlayground.div.detectedFieldsLabel', {
defaultMessage: 'WIP',
})}

View file

@ -1,46 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import React, { useMemo } from 'react';
import { createContext } from 'react';
import { IngestStreamGetResponse } from '@kbn/streams-schema';
import { UseProcessingSimulatorReturn } from './hooks/use_processing_simulator';
export const context = createContext<SimulatorContextValue | undefined>(undefined);
export interface SimulatorContextValue extends UseProcessingSimulatorReturn {
definition: IngestStreamGetResponse;
}
export function SimulatorContextProvider({
processingSimulator,
definition,
children,
}: {
processingSimulator: UseProcessingSimulatorReturn;
definition: IngestStreamGetResponse;
children: React.ReactNode;
}) {
const contextValue = useMemo(() => {
return {
definition,
...processingSimulator,
};
}, [definition, processingSimulator]);
return <context.Provider value={contextValue}>{children}</context.Provider>;
}
export function useSimulatorContext() {
const ctx = React.useContext(context);
if (!ctx) {
throw new Error(
'useStreamsEnrichmentContext must be used within a StreamsEnrichmentContextProvider'
);
}
return ctx;
}

View file

@ -0,0 +1,9 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export * from './processor_state_machine';
export * from './types';

View file

@ -0,0 +1,138 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { ActorRefFrom, assign, emit, forwardTo, sendTo, setup } from 'xstate5';
import { isEqual } from 'lodash';
import { ProcessorDefinition, getProcessorType } from '@kbn/streams-schema';
import { ProcessorInput, ProcessorContext, ProcessorEvent, ProcessorEmittedEvent } from './types';
export type ProcessorActorRef = ActorRefFrom<typeof processorMachine>;
export const processorMachine = setup({
types: {
input: {} as ProcessorInput,
context: {} as ProcessorContext,
events: {} as ProcessorEvent,
emitted: {} as ProcessorEmittedEvent,
},
actions: {
changeProcessor: assign(({ context }, params: { processor: ProcessorDefinition }) => ({
processor: {
id: context.processor.id,
type: getProcessorType(params.processor),
...params.processor,
},
})),
resetToPrevious: assign(({ context }) => ({
processor: context.previousProcessor,
})),
markAsUpdated: assign(({ context }) => ({
previousProcessor: context.processor,
isUpdated: true,
})),
forwardEventToParent: forwardTo(({ context }) => context.parentRef),
forwardChangeEventToParent: sendTo(
({ context }) => context.parentRef,
({ context }) => ({
type: 'processor.change',
id: context.processor.id,
})
),
notifyProcessorDelete: sendTo(
({ context }) => context.parentRef,
({ context }) => ({
type: 'processor.delete',
id: context.processor.id,
})
),
emitChangesDiscarded: emit({ type: 'processor.changesDiscarded' }),
},
guards: {
isDraft: ({ context }) => context.isNew,
hasEditingChanges: ({ context }) => !isEqual(context.previousProcessor, context.processor),
},
}).createMachine({
/** @xstate-layout N4IgpgJg5mDOIC5QAcBOB7AxnW7UDoBXAO1TnQBsA3SAYgG0AGAXURXVgEsAXT9YtiAAeiAMz4AbACYJAFgDsATkWN5jCfNETRUgDQgAnogCM8gBz4ArMbOjjsxossalAX1f60WHHiKly1HT0xqxIIMgcPHwCYSIIUrKW+IxSlimpEtrOorL6RgimFta2jLLKMlJSivLunhjYsLgEEKgAhgBm3PiQUcRQtF4NTfiw3K0wTKHsXLz8gnGVsviilopmxpWMxnYSeYiJ4hLV8kpSovLGpme14fU+zW2d3RC9-YP3+JitxNgUk4IRGbReaIRT4RSVUTrMpmI7yByWPYICSlfC2RSiHSMSzyKo1Dy3byNXwtDpdHq8PoDO7EgiYAAW3wmLABkVmMVAcQAtNtGOC7NYqpYpIxHCckVolkdZMZnBoUvJLGZLDd3rTPvx2pwoIQyBB8JwIBQwNSicMKf8woConNYohpBJwZopOszGZZEdFMYJal8IqUplTMUpMZVTThphNdrdZBnjw45S3uHfIRkBBWtwwJbpjaOcJ7VDwcYvfI5KkVhIzEiDpJjqdzpdcWGzb5I8QtTq9QmE5wqWqI99ftnwmzgXaEG7HbI7FD7NKId7DIgqnydJZrKVKhJ7GZm0NW1HO7GKT2+8nmmBjZnh9b2SCEJjjH715iZVpMW6kfC+TJZZdjJWZSKHuHxth2Mb6ieFK9kmLZ0oyfRZiyVqjranKIGYLr4KkLpmBiphQkcSJmFsfpKE4UjmCKGLuASxDoBAcAAuerJAmh+YIFylQWIksJQgotjGIwUJfksEJbDkKzFriOQgeqJBkLggQQKxub3jyJH4Lx2juuYdjCVWS4FBcyQqCGIYnMoSoqgS-Yko83CqXe448uY-J-kKIpivISKUfI+DFjKsjTmUlarHJwykk80F9E5Y7ocifKyBUiruvpsi2NWKgBWkbprKIGLSFIEUHu20Z6nF7FxDiFjVDorrup6i75ABUhaSKc4aBklQ2XUcEamVR76oaxqVXmcQnEkthKMKUINuKRkJGC4n2KoZhqMlmIlXSh4QQmY33pkYJaC6Xp4phOhIhIwp+tulgrDIWyndtA3gV2UEvImB3jhc-knXhGzVBdehGd+2F3f+gHKC9jFXpA30JVyFi2GkKzVPd8JQiD+S4kkXp4dkQmlIktGuEAA */
id: 'processor',
context: ({ input }) => ({
parentRef: input.parentRef,
previousProcessor: input.processor,
processor: input.processor,
isNew: input.isNew ?? false,
}),
initial: 'unresolved',
states: {
unresolved: {
always: [{ target: 'draft', guard: 'isDraft' }, { target: 'configured' }],
},
draft: {
initial: 'editing',
states: {
editing: {
on: {
'processor.stage': {
target: '#configured',
actions: [{ type: 'markAsUpdated' }, { type: 'forwardEventToParent' }],
},
'processor.cancel': {
target: '#deleted',
actions: [{ type: 'resetToPrevious' }],
},
'processor.change': {
actions: [
{ type: 'changeProcessor', params: ({ event }) => event },
{ type: 'forwardChangeEventToParent' },
],
},
},
},
},
},
configured: {
id: 'configured',
initial: 'idle',
states: {
idle: {
on: { 'processor.edit': 'edit' },
},
edit: {
initial: 'editing',
states: {
editing: {
on: {
'processor.update': {
guard: 'hasEditingChanges',
target: '#configured.idle',
actions: [{ type: 'markAsUpdated' }, { type: 'forwardEventToParent' }],
},
'processor.cancel': {
target: '#configured.idle',
actions: [
{ type: 'emitChangesDiscarded' },
{ type: 'resetToPrevious' },
{ type: 'forwardEventToParent' },
],
},
'processor.delete': '#deleted',
'processor.change': {
actions: [
{ type: 'changeProcessor', params: ({ event }) => event },
{ type: 'forwardChangeEventToParent' },
],
},
},
},
},
},
},
},
deleted: {
id: 'deleted',
type: 'final',
entry: [{ type: 'notifyProcessorDelete' }],
},
},
});

View file

@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { ProcessorDefinition } from '@kbn/streams-schema';
import { ActorRef, Snapshot } from 'xstate5';
import { ProcessorDefinitionWithUIAttributes } from '../../types';
export type ProcessorToParentEvent =
| { type: 'processor.change'; id: string }
| { type: 'processor.delete'; id: string }
| { type: 'processor.stage' };
export interface ProcessorInput {
parentRef: ProcessorParentActor;
processor: ProcessorDefinitionWithUIAttributes;
isNew?: boolean;
}
export type ProcessorParentActor = ActorRef<Snapshot<unknown>, ProcessorToParentEvent>;
export interface ProcessorContext {
parentRef: ProcessorParentActor;
previousProcessor: ProcessorDefinitionWithUIAttributes;
processor: ProcessorDefinitionWithUIAttributes;
isNew: boolean;
isUpdated?: boolean;
}
export type ProcessorEvent =
| { type: 'processor.cancel' }
| { type: 'processor.change'; processor: ProcessorDefinition }
| { type: 'processor.delete' }
| { type: 'processor.edit' }
| { type: 'processor.stage' }
| { type: 'processor.update' };
export interface ProcessorEmittedEvent {
type: 'processor.changesDiscarded';
}

View file

@ -0,0 +1,11 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export * from './preview_docs_filter';
export * from './simulation_state_machine';
export * from './types';
export * from './utils';

View file

@ -0,0 +1,34 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { i18n } from '@kbn/i18n';
export const previewDocsFilterOptions = {
outcome_filter_all: {
id: 'outcome_filter_all',
label: i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.processor.outcomeControls.all',
{ defaultMessage: 'All samples' }
),
},
outcome_filter_matched: {
id: 'outcome_filter_matched',
label: i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.processor.outcomeControls.matched',
{ defaultMessage: 'Matched' }
),
},
outcome_filter_unmatched: {
id: 'outcome_filter_unmatched',
label: i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.processor.outcomeControls.unmatched',
{ defaultMessage: 'Unmatched' }
),
},
} as const;
export type PreviewDocsFilterOption = keyof typeof previewDocsFilterOptions;

View file

@ -0,0 +1,55 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { i18n } from '@kbn/i18n';
import { flattenObjectNestedLast } from '@kbn/object-utils';
import { Condition, FlattenRecord } from '@kbn/streams-schema';
import { fromPromise, ErrorActorEvent } from 'xstate5';
import { errors as esErrors } from '@elastic/elasticsearch';
import { DateRangeContext } from '../../../../../state_management/date_range_state_machine';
import { SimulationMachineDeps } from './types';
export interface SamplesFetchInput {
condition?: Condition;
streamName: string;
absoluteTimeRange: DateRangeContext['absoluteTimeRange'];
}
export function createSamplesFetchActor({
streamsRepositoryClient,
}: Pick<SimulationMachineDeps, 'streamsRepositoryClient'>) {
return fromPromise<FlattenRecord[], SamplesFetchInput>(async ({ input, signal }) => {
const samplesBody = await streamsRepositoryClient.fetch('POST /api/streams/{name}/_sample', {
signal,
params: {
path: { name: input.streamName },
body: {
if: input.condition,
start: input.absoluteTimeRange.start,
end: input.absoluteTimeRange.end,
size: 100,
},
},
});
return samplesBody.documents.map(flattenObjectNestedLast) as FlattenRecord[];
});
}
export function createSamplesFetchFailureNofitier({
toasts,
}: Pick<SimulationMachineDeps, 'toasts'>) {
return (params: { event: unknown }) => {
const event = params.event as ErrorActorEvent<esErrors.ResponseError, string>;
toasts.addError(new Error(event.error.body.message), {
title: i18n.translate('xpack.streams.enrichment.simulation.samplesFetchError', {
defaultMessage: 'An issue occurred retrieving samples.',
}),
toastMessage: event.error.body.message,
});
};
}

View file

@ -0,0 +1,51 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { i18n } from '@kbn/i18n';
import { FlattenRecord } from '@kbn/streams-schema';
import { fromPromise, ErrorActorEvent } from 'xstate5';
import { errors as esErrors } from '@elastic/elasticsearch';
import { ProcessorDefinitionWithUIAttributes } from '../../types';
import { processorConverter } from '../../utils';
import { Simulation, SimulationMachineDeps } from './types';
export interface SimulationRunnerInput {
streamName: string;
documents: FlattenRecord[];
processors: ProcessorDefinitionWithUIAttributes[];
}
export function createSimulationRunnerActor({
streamsRepositoryClient,
}: Pick<SimulationMachineDeps, 'streamsRepositoryClient'>) {
return fromPromise<Simulation, SimulationRunnerInput>(({ input, signal }) =>
streamsRepositoryClient.fetch('POST /api/streams/{name}/processing/_simulate', {
signal,
params: {
path: { name: input.streamName },
body: {
documents: input.documents,
processing: input.processors.map(processorConverter.toSimulateDefinition),
},
},
})
);
}
export function createSimulationRunFailureNofitier({
toasts,
}: Pick<SimulationMachineDeps, 'toasts'>) {
return (params: { event: unknown }) => {
const event = params.event as ErrorActorEvent<esErrors.ResponseError, string>;
toasts.addError(new Error(event.error.body.message), {
title: i18n.translate('xpack.streams.enrichment.simulation.simulationRunError', {
defaultMessage: 'An issue occurred running the simulation.',
}),
toastMessage: event.error.body.message,
});
};
}

View file

@ -0,0 +1,280 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { ActorRefFrom, MachineImplementationsFrom, SnapshotFrom, assign, setup } from 'xstate5';
import { getPlaceholderFor } from '@kbn/xstate-utils';
import { FlattenRecord, isSchema, processorDefinitionSchema } from '@kbn/streams-schema';
import { isEmpty, isEqual } from 'lodash';
import {
dateRangeMachine,
createDateRangeMachineImplementations,
} from '../../../../../state_management/date_range_state_machine';
import { ProcessorDefinitionWithUIAttributes } from '../../types';
import { processorConverter } from '../../utils';
import {
SimulationInput,
SimulationContext,
SimulationEvent,
Simulation,
SimulationMachineDeps,
} from './types';
import { PreviewDocsFilterOption } from './preview_docs_filter';
import {
createSamplesFetchActor,
createSamplesFetchFailureNofitier,
} from './samples_fetcher_actor';
import {
createSimulationRunnerActor,
createSimulationRunFailureNofitier,
} from './simulation_runner_actor';
import { filterSimulationDocuments, composeSamplingCondition } from './utils';
export type SimulationActorRef = ActorRefFrom<typeof simulationMachine>;
export type SimulationActorSnapshot = SnapshotFrom<typeof simulationMachine>;
export interface ProcessorEventParams {
processors: ProcessorDefinitionWithUIAttributes[];
}
const hasSamples = (samples: FlattenRecord[]) => !isEmpty(samples);
const isValidProcessor = (processor: ProcessorDefinitionWithUIAttributes) =>
isSchema(processorDefinitionSchema, processorConverter.toAPIDefinition(processor));
const hasValidProcessors = (processors: ProcessorDefinitionWithUIAttributes[]) =>
processors.every(isValidProcessor);
export const simulationMachine = setup({
types: {
input: {} as SimulationInput,
context: {} as SimulationContext,
events: {} as SimulationEvent,
},
actors: {
fetchSamples: getPlaceholderFor(createSamplesFetchActor),
runSimulation: getPlaceholderFor(createSimulationRunnerActor),
dateRangeMachine: getPlaceholderFor(() => dateRangeMachine),
},
actions: {
notifySamplesFetchFailure: getPlaceholderFor(createSamplesFetchFailureNofitier),
notifySimulationRunFailure: getPlaceholderFor(createSimulationRunFailureNofitier),
storeTimeUpdated: getPlaceholderFor(createSimulationRunFailureNofitier),
storePreviewDocsFilter: assign((_, params: { filter: PreviewDocsFilterOption }) => ({
previewDocsFilter: params.filter,
})),
storeProcessors: assign((_, params: ProcessorEventParams) => ({
processors: params.processors,
})),
storeSamples: assign((_, params: { samples: FlattenRecord[] }) => ({
samples: params.samples,
})),
storeSimulation: assign((_, params: { simulation: Simulation | undefined }) => ({
simulation: params.simulation,
})),
derivePreviewDocuments: assign(({ context }) => {
return {
previewDocuments: context.simulation
? filterSimulationDocuments(context.simulation.documents, context.previewDocsFilter)
: context.samples,
};
}),
deriveSamplingCondition: assign(({ context }) => ({
samplingCondition: composeSamplingCondition(context.processors),
})),
resetSimulation: assign({
processors: [],
simulation: undefined,
samplingCondition: composeSamplingCondition([]),
previewDocsFilter: 'outcome_filter_all',
}),
},
delays: {
debounceTime: 800,
},
guards: {
canSimulate: ({ context }, params: ProcessorEventParams) =>
hasSamples(context.samples) && hasValidProcessors(params.processors),
hasProcessors: (_, params: ProcessorEventParams) => !isEmpty(params.processors),
hasSamples: ({ context }) => hasSamples(context.samples),
hasValidProcessors: (_, params: ProcessorEventParams) => hasValidProcessors(params.processors),
shouldRefetchSamples: ({ context }) =>
Boolean(
context.samplingCondition &&
!isEqual(context.samplingCondition, composeSamplingCondition(context.processors))
),
},
}).createMachine({
/** @xstate-layout N4IgpgJg5mDOIC5SwJYFsCuAbAhgFxQHsA7AYgnzACUdiYA6DABwrzAG0AGAXUVCcKoCJPiAAeiAIwB2AGwAWetICsnTgA5Js2cuUBOZQCYANCACeiALSTOh+rfUBmWdL3TpzzkYC+306kxcYTIA7HwiYnoAYwALWhgABQAnMAA3FDAAdwARQijYADEULDYkrl4kEAEhCNEJBFlHSXplFXVDPRl1ZVlNUwsES2VJR3oRw0dDeXUXR04ZX390MODSUKCI+hTYMDxy0WqUYLrEKdcx9T15ecNJPVlO6X6rQ3V6SddDdzd5K-llRyLEDrcIkUhMJJ5OCwQhJWD0ABU+0qh2OlXqkhmikc8kkhmU6hm+n08meg3xzU4-3aUy0kmmyiBINWEKhsBhSWitCiYCwyP4giOtXRpxGynoBhmhkMsipLlJ5isb2U13u8mk6qmRl0TOWGzBrJ57Nh0TidA4PAOgrRoHqhL07w6elpBL0GjJlkcDoBvWkcr0k0ahl1gVBZEN0JNEF5uwtFQFNREIoQ0oU9GmOlusr9KrJNkULlkmLdk2l6g0IZWEXBkKNHPo0awsfYknjVWtwttpzmdnkuNubk6KscZL7dkL8g+-3V0hllf1kRQxCFOCwKAAXkuoKR+e3E8QTggRo43r9NGOs6m87J6L0epxnZcFPIOvOw-Qlyu15u6DvW1b90PMVmnUdwvC8GxnSuMkXCUSCvE6B4mimN9ggbMAACNCAwYgoi3ABhM0YFgGs2XrWJ4jjAChSTLsEF+aQJXuTgNT0J9Og9HF7FVV4Az9RwCVkVDNmjLCcLwuhCMokixFgPBKHoHAADNSgAClE7DcLAAAVdAwAASjWPV3w08SCKIuBd1RTtxCkWwQJfRxpFAjwNUmPNOBvGQix0dUNHuV5hJIdCxNw8zpNIWT5LYRSVLAJJ1MwzSeV0tADKM0M0NMsLJIs2AWzbazaNso8WJvbR-leTRhlkExFRTXp7G0bQOmkcZXCEvxgWMtCsEIHAIC3ABlHA0CYJsSIgEgwA-YhUkIABrGbYFG8a4AKXZYnigBBKI8FhKyO2KjEVE4JrGnmRwvSuiYPLsVRJ1eB9XAEq4gsiPqBuG1aJtIeLIU5cb8CU2E0HoFaxomja8C2pJdv2spLRRI6D2TQx5jeSQeh6X4XxkBUBhse6qUmctB1e+R3sU9l4oIOghp6iIqDAABHDAUBSNLiDwEjDsA5N6SaehbhYr08SlOqBiLM7CU8kYri+Nr1CpnAaaSOmoAZzKmdZ9nObAbnef-ZH+bowWzvslVZVuFVMRgnRhZmOR3AZS6qaSHDl3pxmwSm4gZqXealvBn3iCoT2dr2g6kYTGjUbouQb1UVQ5g8IwVFkUdJiUUDdCcAE3Wmd3PeG0O-qSAH6CBvAQaSMHmSZiO4ajxHCpRw9nbGDx5kHbQvT0MkCUYrHrnkHR3C0QEgWIQho3gSoG+Kor45KvsHT0ZjWPYyROJPFpqSxhRug1Tgp6WbXgs-AhVw3LdqJtEqxRvOZBJUMeRnUAmlRvNr+-+XRMTlmkFTFAEAmz3xsvUQs9BP7qjxP8DeFIyQakUAhF2fxdAbyptlCSUApLmnnrHB+doRhKF6P8ekWhuh23qpILG6YaqT0QqfZ0VNPqDXpj9OAEDjpSF6M0CYk41StC9EYD01wmq2FlAGB4rw+yUy6ovSIqsdjq1LhfMOusOZgC5jzHhK8MQPAdNoDebhpTox6E8eqXxRi3BGM5U+E8R7F2IF7TWod9FAVsGdAk-9nSunlIPboYw5jZjmE5ekytfDeCAA */
id: 'simulation',
context: ({ input, self, spawn }) => ({
dateRangeRef: spawn('dateRangeMachine', {
id: 'dateRange',
input: {
parentRef: self,
},
}),
previewDocsFilter: 'outcome_filter_all',
previewDocuments: [],
processors: input.processors,
samples: [],
samplingCondition: composeSamplingCondition(input.processors),
streamName: input.streamName,
}),
initial: 'initializing',
on: {
'dateRange.update': '.loadingSamples',
'simulation.changePreviewDocsFilter': {
actions: [
{ type: 'storePreviewDocsFilter', params: ({ event }) => event },
{ type: 'derivePreviewDocuments' },
],
},
'simulation.reset': {
target: '.idle',
actions: [{ type: 'resetSimulation' }, { type: 'derivePreviewDocuments' }],
},
// Handle adding/reordering processors
'processors.*': {
target: '.assertingSimulationRequirements',
actions: [{ type: 'storeProcessors', params: ({ event }) => event }],
},
'processor.cancel': {
target: '.assertingSimulationRequirements',
actions: [{ type: 'storeProcessors', params: ({ event }) => event }],
},
'processor.change': {
target: '.debouncingChanges',
actions: [{ type: 'storeProcessors', params: ({ event }) => event }],
},
'processor.delete': [
{
guard: {
type: 'hasProcessors',
params: ({ event }) => ({ processors: event.processors }),
},
target: '.assertingSimulationRequirements',
actions: [{ type: 'storeProcessors', params: ({ event }) => event }],
},
{
target: '.idle',
actions: [{ type: 'resetSimulation' }, { type: 'derivePreviewDocuments' }],
},
],
},
states: {
initializing: {
always: [
{
guard: {
type: 'hasProcessors',
params: ({ context }) => ({ processors: context.processors }),
},
target: 'loadingSamples',
},
{ target: 'idle' },
],
},
idle: {},
debouncingChanges: {
on: {
'processor.change': {
target: 'debouncingChanges',
actions: [{ type: 'storeProcessors', params: ({ event }) => event }],
description: 'Re-enter debouncing state and reinitialize the delayed processing.',
reenter: true,
},
},
after: {
debounceTime: [
{
guard: 'shouldRefetchSamples',
target: 'loadingSamples',
actions: [{ type: 'deriveSamplingCondition' }],
},
{ target: 'assertingSimulationRequirements' },
],
},
},
loadingSamples: {
invoke: {
id: 'samplesFetcherActor',
src: 'fetchSamples',
input: ({ context }) => ({
condition: context.samplingCondition,
streamName: context.streamName,
absoluteTimeRange: context.dateRangeRef.getSnapshot().context.absoluteTimeRange,
}),
onDone: {
target: 'assertingSimulationRequirements',
actions: [
{ type: 'storeSamples', params: ({ event }) => ({ samples: event.output }) },
{ type: 'derivePreviewDocuments' },
],
},
onError: {
target: 'idle',
actions: [
{ type: 'storeSamples', params: () => ({ samples: [] }) },
{ type: 'notifySamplesFetchFailure' },
],
},
},
},
assertingSimulationRequirements: {
always: [
{
guard: {
type: 'canSimulate',
params: ({ context }) => ({ processors: context.processors }),
},
target: 'runningSimulation',
},
{ target: 'idle' },
],
},
runningSimulation: {
invoke: {
id: 'simulationRunnerActor',
src: 'runSimulation',
input: ({ context }) => ({
streamName: context.streamName,
documents: context.samples,
processors: context.processors,
}),
onDone: {
target: 'idle',
actions: [
{ type: 'storeSimulation', params: ({ event }) => ({ simulation: event.output }) },
{ type: 'derivePreviewDocuments' },
],
},
onError: {
target: 'idle',
actions: [{ type: 'notifySimulationRunFailure' }],
},
},
},
},
});
export const createSimulationMachineImplementations = ({
data,
streamsRepositoryClient,
toasts,
}: SimulationMachineDeps): MachineImplementationsFrom<typeof simulationMachine> => ({
actors: {
fetchSamples: createSamplesFetchActor({ streamsRepositoryClient }),
runSimulation: createSimulationRunnerActor({ streamsRepositoryClient }),
dateRangeMachine: dateRangeMachine.provide(createDateRangeMachineImplementations({ data })),
},
actions: {
notifySamplesFetchFailure: createSamplesFetchFailureNofitier({ toasts }),
notifySimulationRunFailure: createSimulationRunFailureNofitier({ toasts }),
},
});

View file

@ -0,0 +1,53 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Condition, FlattenRecord } from '@kbn/streams-schema';
import { APIReturnType, StreamsRepositoryClient } from '@kbn/streams-plugin/public/api';
import { IToasts } from '@kbn/core/public';
import { DataPublicPluginStart } from '@kbn/data-plugin/public';
import {
DateRangeToParentEvent,
DateRangeActorRef,
} from '../../../../../state_management/date_range_state_machine';
import { ProcessorDefinitionWithUIAttributes } from '../../types';
import { PreviewDocsFilterOption } from './preview_docs_filter';
export type Simulation = APIReturnType<'POST /api/streams/{name}/processing/_simulate'>;
export interface SimulationMachineDeps {
data: DataPublicPluginStart;
streamsRepositoryClient: StreamsRepositoryClient;
toasts: IToasts;
}
export type ProcessorMetrics =
Simulation['processors_metrics'][keyof Simulation['processors_metrics']];
export interface SimulationInput {
processors: ProcessorDefinitionWithUIAttributes[];
streamName: string;
}
export type SimulationEvent =
| DateRangeToParentEvent
| { type: 'simulation.changePreviewDocsFilter'; filter: PreviewDocsFilterOption }
| { type: 'simulation.reset' }
| { type: 'processors.add'; processors: ProcessorDefinitionWithUIAttributes[] }
| { type: 'processor.cancel'; processors: ProcessorDefinitionWithUIAttributes[] }
| { type: 'processor.change'; processors: ProcessorDefinitionWithUIAttributes[] }
| { type: 'processor.delete'; processors: ProcessorDefinitionWithUIAttributes[] };
export interface SimulationContext {
dateRangeRef: DateRangeActorRef;
previewDocsFilter: PreviewDocsFilterOption;
previewDocuments: FlattenRecord[];
processors: ProcessorDefinitionWithUIAttributes[];
samples: FlattenRecord[];
samplingCondition?: Condition;
simulation?: Simulation;
streamName: string;
}

View file

@ -0,0 +1,69 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Condition, UnaryOperator, getProcessorConfig } from '@kbn/streams-schema';
import { isEmpty, uniq } from 'lodash';
import { ALWAYS_CONDITION } from '../../../../../util/condition';
import { ProcessorDefinitionWithUIAttributes, DetectedField } from '../../types';
import { PreviewDocsFilterOption } from './preview_docs_filter';
import { Simulation } from './types';
export function composeSamplingCondition(
processors: ProcessorDefinitionWithUIAttributes[]
): Condition | undefined {
if (isEmpty(processors)) {
return undefined;
}
const uniqueFields = uniq(getSourceFields(processors));
if (isEmpty(uniqueFields)) {
return ALWAYS_CONDITION;
}
const conditions = uniqueFields.map((field) => ({
field,
operator: 'exists' as UnaryOperator,
}));
return { or: conditions };
}
export function getSourceFields(processors: ProcessorDefinitionWithUIAttributes[]): string[] {
return processors.map((processor) => getProcessorConfig(processor).field.trim()).filter(Boolean);
}
export function getTableColumns(
processors: ProcessorDefinitionWithUIAttributes[],
fields: DetectedField[],
filter: PreviewDocsFilterOption
) {
const uniqueProcessorsFields = uniq(getSourceFields(processors));
if (filter === 'outcome_filter_unmatched') {
return uniqueProcessorsFields;
}
const uniqueDetectedFields = uniq(fields.map((field) => field.name));
return uniq([...uniqueProcessorsFields, ...uniqueDetectedFields]);
}
export function filterSimulationDocuments(
documents: Simulation['documents'],
filter: PreviewDocsFilterOption
) {
switch (filter) {
case 'outcome_filter_matched':
return documents.filter((doc) => doc.status === 'parsed').map((doc) => doc.value);
case 'outcome_filter_unmatched':
return documents.filter((doc) => doc.status !== 'parsed').map((doc) => doc.value);
case 'outcome_filter_all':
default:
return documents.map((doc) => doc.value);
}
}

View file

@ -0,0 +1,9 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export * from './use_stream_enrichment';
export * from './types';

View file

@ -0,0 +1,354 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import {
MachineImplementationsFrom,
assign,
forwardTo,
not,
setup,
sendTo,
stopChild,
and,
ActorRefFrom,
} from 'xstate5';
import { getPlaceholderFor } from '@kbn/xstate-utils';
import {
IngestStreamGetResponse,
isRootStreamDefinition,
isWiredStreamGetResponse,
} from '@kbn/streams-schema';
import { htmlIdGenerator } from '@elastic/eui';
import {
StreamEnrichmentContext,
StreamEnrichmentEvent,
StreamEnrichmentInput,
StreamEnrichmentServiceDependencies,
} from './types';
import { processorConverter } from '../../utils';
import {
createUpsertStreamActor,
createUpsertStreamFailureNofitier,
createUpsertStreamSuccessNofitier,
} from './upsert_stream_actor';
import { ProcessorDefinitionWithUIAttributes } from '../../types';
import {
simulationMachine,
createSimulationMachineImplementations,
} from '../simulation_state_machine';
import { processorMachine, ProcessorActorRef } from '../processor_state_machine';
const createId = htmlIdGenerator();
export type StreamEnrichmentActorRef = ActorRefFrom<typeof streamEnrichmentMachine>;
export const streamEnrichmentMachine = setup({
types: {
input: {} as StreamEnrichmentInput,
context: {} as StreamEnrichmentContext,
events: {} as StreamEnrichmentEvent,
},
actors: {
upsertStream: getPlaceholderFor(createUpsertStreamActor),
processorMachine: getPlaceholderFor(() => processorMachine),
simulationMachine: getPlaceholderFor(() => simulationMachine),
},
actions: {
spawnSimulationMachine: assign(({ context, spawn }) => ({
simulatorRef:
context.simulatorRef ||
spawn('simulationMachine', {
id: 'simulator',
input: {
processors: getStagedProcessors(context),
streamName: context.definition.stream.name,
},
}),
})),
notifyUpsertStreamSuccess: getPlaceholderFor(createUpsertStreamSuccessNofitier),
notifyUpsertStreamFailure: getPlaceholderFor(createUpsertStreamFailureNofitier),
refreshDefinition: () => {},
storeDefinition: assign((_, params: { definition: IngestStreamGetResponse }) => ({
definition: params.definition,
})),
stopProcessors: ({ context }) => context.processorsRefs.forEach(stopChild),
setupProcessors: assign(({ self, spawn }, params: { definition: IngestStreamGetResponse }) => {
const processorsRefs = params.definition.stream.ingest.processing.map((proc) => {
const processor = processorConverter.toUIDefinition(proc);
return spawn('processorMachine', {
id: processor.id,
input: {
parentRef: self,
processor,
},
});
});
return {
initialProcessorsRefs: processorsRefs,
processorsRefs,
};
}),
addProcessor: assign(
(
{ context, spawn, self },
{ processor }: { processor: ProcessorDefinitionWithUIAttributes }
) => {
const id = createId();
return {
processorsRefs: context.processorsRefs.concat(
spawn('processorMachine', {
id,
input: {
parentRef: self,
processor: { ...processor, id },
isNew: true,
},
})
),
};
}
),
stopProcessor: stopChild((_, params: { id: string }) => params.id),
deleteProcessor: assign(({ context }, params: { id: string }) => ({
processorsRefs: context.processorsRefs.filter((proc) => proc.id !== params.id),
})),
reorderProcessors: assign((_, params: { processorsRefs: ProcessorActorRef[] }) => ({
processorsRefs: params.processorsRefs,
})),
reassignProcessors: assign(({ context }) => ({
processorsRefs: [...context.processorsRefs],
})),
forwardProcessorsEventToSimulator: sendTo(
'simulator',
({ context }, params: { type: StreamEnrichmentEvent['type'] }) => ({
type: params.type,
processors: getStagedProcessors(context),
})
),
sendResetEventToSimulator: sendTo('simulator', { type: 'simulation.reset' }),
},
guards: {
hasMultipleProcessors: ({ context }) => context.processorsRefs.length > 1,
hasStagedChanges: ({ context }) => {
const { initialProcessorsRefs, processorsRefs } = context;
return (
// Deleted processors
initialProcessorsRefs.length !== processorsRefs.length ||
// New/updated processors
processorsRefs.some((processorRef) => {
const state = processorRef.getSnapshot();
return state.matches('configured') && state.context.isUpdated;
}) ||
// Processor order changed
processorsRefs.some(
(processorRef, pos) => initialProcessorsRefs[pos]?.id !== processorRef.id
)
);
},
hasPendingDraft: ({ context }) =>
Boolean(context.processorsRefs.find((p) => p.getSnapshot().matches('draft'))),
'!hasPendingDraft': not('hasPendingDraft'),
canUpdateStream: and(['hasStagedChanges', '!hasPendingDraft']),
isStagedProcessor: ({ context }, params: { id: string }) => {
const processorRef = context.processorsRefs.find((p) => p.id === params.id);
if (!processorRef) return false;
return processorRef.getSnapshot().context.isNew;
},
isRootStream: ({ context }) => isRootStreamDefinition(context.definition.stream),
isWiredStream: ({ context }) => isWiredStreamGetResponse(context.definition),
},
}).createMachine({
/** @xstate-layout N4IgpgJg5mDOIC5RgHYCcCWBjAFgZQBc0wBDAWwDoMUMCMSAbDAL2qgGIBtABgF1FQABwD2sWhmEoBIAB6IAtAA4ATAE4K3AGzdlizQBYAjPoCsi7voA0IAJ4KAzIoomT3cwHYdixfferFAL4B1qiYuITE5FQ0dIwsbFyG-EggImJ0ktJyCErGzpqGqoaa7u7KmkWG1nYIyiYUytwmFX7mjZqK+kEh6Nj4RKSUkRA27LADUcRYYBgAbpA8yUKi4pkp2fJ13BSObib6Ksr6+o3u1QrHFO6Kqi6q+h0PBcrd4L3hE0OkIxTjkZQYCAMMBjT4UYiwMAERbSNKrKTrRDKPL6biqVTKTE6AyKExnWyIQz2PImZH7Y76Rz2VSvUJ9CKDcHfGy-MGA4Gg-4UACugggJAIYBhKThGQRoA23mUV3cRJMGMaWiK5wQxl0FBOJnsakMWmJD1p736XOGLL+jN5-LoKA4EEkYGis2EAGsHbzIWgCAzyABBLAEYRoYXLdIScWyBQuexXezcNyqdz6VSaeymFVE1TR-RlZSy3GGYp4w1hY2M02srmWgUJMBoNCBiiCBgCgBmgco7trXs+foDQb4sJWYqyDgLFEzufs2k0zWuePTqM044O2nMqh01y6wTeJe9XxIPzpuDIqAIFAgGFgTZINjYAAV69NYLBA7B2IJH3AX2hYBQDxBg1SIcwxHHJOm2bhSn2TxjmUex7BMFUp3cDVzCnApiR8SCTGLekwXLI8cBPFAzwvK9m1vG0H2EJ9vzfD8aK-V8mUDCBa0A0UQMRHJjiXLUmmpA5KQqQx8RqMwnG4QozDjbUzGKXCPhNZkKEI4jSMva9KKgajaNfd9P2fBs2OBQUOOAtYJQcfYdmg1wkzjbhqSQmzZOudxmiaAxDBw7dCL3JkDxZNTT3PTSKPvQy6IMxijLQVkSBgczQ0siMcmubZ-F8DzikaSlFBVeVo2gjpCk8dzAj8o0AoIo11LC8ibzYPAMDIblmzFGK9J-CgACpkvhUDUSueUigqfY0SkzQVXRPjvFlPF-BMYwtx6Xd8JUkKSIarTmta9rq0kLqmPi-qBxFCzw2yYaPPRYpblRdciRVTpoxW1c-EMNQ0UU0tJk2urQrI3abRatqOrDChZgwMAAHcABEBRIB8wGhuGxn2iHJChmGEahMB-UgAAxGGGAgWABuHbj5BcaVdSwtFTHKE4ZtuVDdQQ5oVFXX6aoBkt6uBiLQcxw6UBxuHEYIZHiDR2GMfBsWKFwEgbTAFG5fhmjYBJhhBX7JYgJSq6FB8fQKG8WNCng65iXsF6imcXMikcUlmkKXmNqC1TAe2oWmpFxWxQlvHBUJiASbAMm3zEIPIc1pGNdxymuKsnJ9mlMabj8eD7hudMtEMHZ3G1ODY2ze5fO3FBhDY+AUn8z5B2N0D5F1NQrgqOCvrlRRjBVNvtQaG5mljdcEPsT2uWocQ4lYG1m8G6n-CL9wu+JZEEL7+2CR47ZzDcRmNzqeDfLWvDlKCxeqbT+QEycS2pMzXwfGe3fdSXNyXALOpF0nqr1qXx+Oacg19U5pTvrGGUcoFRxhEumOCKEEylGxK0MwU8ywqRAQCIEYAwGpQ2JoOC44iGzTRCoL6Vh35yQ1KUdyGdFDuAwf9b22CeR8mrAvC6LdqYGHULKDyWoDCOFzAVd+Lh6hJlRFJbMahkyrR3BfTB3stoEHwSbHImZ95YWti-O2Kpyh8ScmvNe64mb-3PkpZRh5fYaUatpXSJ164hiXrfAokibglGTGibUjDCo6AtlzLQTkfK3DPooqxLCbECyBuFAOUAwYHRvkbVxECpQWx0c-W2b8ageRQhuPwiYiidANAApRUTgq2J2sLBJotg4J2lknOG6jW6lCXPBFa2ZZIVGmrvdEHiOYuA6OUNwzD9zRL6ILOJ2lElY3FprfG4dI7RxadTTE5tn5FUMHoTEahWYDKckM7mWgxlMhfAweYEAABKwhhDdn+KstxRxaGlFEo8PQLNd7yBOBoVwfh3ZTgsH4IIQQgA */
id: 'enrichStream',
context: ({ input }) => ({
definition: input.definition,
initialProcessorsRefs: [],
processorsRefs: [],
}),
initial: 'initializing',
states: {
initializing: {
always: [
{
target: 'resolvedRootStream',
guard: 'isRootStream',
},
{ target: 'ready' },
],
},
ready: {
id: 'ready',
type: 'parallel',
entry: [
{ type: 'stopProcessors' },
{
type: 'setupProcessors',
params: ({ context }) => ({ definition: context.definition }),
},
],
on: {
'stream.received': {
target: '#ready',
actions: [{ type: 'storeDefinition', params: ({ event }) => event }],
reenter: true,
},
},
states: {
stream: {
initial: 'idle',
states: {
idle: {
on: {
'stream.reset': {
guard: 'hasStagedChanges',
target: '#ready',
actions: [{ type: 'sendResetEventToSimulator' }],
reenter: true,
},
'stream.update': {
guard: 'canUpdateStream',
actions: [{ type: 'sendResetEventToSimulator' }],
target: 'updating',
},
},
},
updating: {
invoke: {
id: 'upsertStreamActor',
src: 'upsertStream',
input: ({ context }) => ({
definition: context.definition,
processors: context.processorsRefs
.map((proc) => proc.getSnapshot())
.filter((proc) => proc.matches('configured'))
.map((proc) => proc.context.processor),
fields: undefined, // TODO: implementing in follow-up PR
}),
onDone: {
target: 'idle',
actions: [{ type: 'notifyUpsertStreamSuccess' }, { type: 'refreshDefinition' }],
},
onError: {
target: 'idle',
actions: [{ type: 'notifyUpsertStreamFailure' }],
},
},
},
},
},
enrichment: {
type: 'parallel',
states: {
displayingProcessors: {
on: {
'processors.add': {
guard: '!hasPendingDraft',
actions: [{ type: 'addProcessor', params: ({ event }) => event }],
},
'processors.reorder': {
guard: 'hasMultipleProcessors',
actions: [{ type: 'reorderProcessors', params: ({ event }) => event }],
},
'processor.delete': {
actions: [
{ type: 'stopProcessor', params: ({ event }) => event },
{ type: 'deleteProcessor', params: ({ event }) => event },
],
},
'processor.stage': {
actions: [{ type: 'reassignProcessors' }],
},
'processor.update': {
actions: [{ type: 'reassignProcessors' }],
},
},
},
displayingSimulation: {
entry: [{ type: 'spawnSimulationMachine' }],
initial: 'viewDataPreview',
on: {
'processor.change': {
guard: { type: 'isStagedProcessor', params: ({ event }) => event },
actions: [
{ type: 'forwardProcessorsEventToSimulator', params: ({ event }) => event },
],
},
'processor.*': {
actions: [
{ type: 'forwardProcessorsEventToSimulator', params: ({ event }) => event },
],
},
'processors.*': {
actions: [
{ type: 'forwardProcessorsEventToSimulator', params: ({ event }) => event },
],
},
},
states: {
viewDataPreview: {
on: {
'simulation.viewDetectedFields': 'viewDetectedFields',
'simulation.changePreviewDocsFilter': {
actions: [forwardTo('simulator')],
},
},
},
viewDetectedFields: {
on: {
'simulation.viewDataPreview': 'viewDataPreview',
},
},
},
},
},
},
},
},
resolvedRootStream: {
type: 'final',
},
},
});
export const createStreamEnrichmentMachineImplementations = ({
refreshDefinition,
streamsRepositoryClient,
core,
data,
}: StreamEnrichmentServiceDependencies): MachineImplementationsFrom<
typeof streamEnrichmentMachine
> => ({
actors: {
upsertStream: createUpsertStreamActor({ streamsRepositoryClient }),
processorMachine,
simulationMachine: simulationMachine.provide(
createSimulationMachineImplementations({
data,
streamsRepositoryClient,
toasts: core.notifications.toasts,
})
),
},
actions: {
refreshDefinition,
notifyUpsertStreamSuccess: createUpsertStreamSuccessNofitier({
toasts: core.notifications.toasts,
}),
notifyUpsertStreamFailure: createUpsertStreamFailureNofitier({
toasts: core.notifications.toasts,
}),
},
});
function getStagedProcessors(context: StreamEnrichmentContext) {
return context.processorsRefs
.map((proc) => proc.getSnapshot())
.filter((proc) => proc.context.isNew)
.map((proc) => proc.context.processor);
}

View file

@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { CoreStart } from '@kbn/core/public';
import { StreamsRepositoryClient } from '@kbn/streams-plugin/public/api';
import { IngestStreamGetResponse } from '@kbn/streams-schema';
import { DataPublicPluginStart } from '@kbn/data-plugin/public';
import { ProcessorDefinitionWithUIAttributes } from '../../types';
import { ProcessorActorRef, ProcessorToParentEvent } from '../processor_state_machine';
import { PreviewDocsFilterOption, SimulationActorRef } from '../simulation_state_machine';
export interface StreamEnrichmentServiceDependencies {
refreshDefinition: () => void;
streamsRepositoryClient: StreamsRepositoryClient;
core: CoreStart;
data: DataPublicPluginStart;
}
export interface StreamEnrichmentInput {
definition: IngestStreamGetResponse;
}
export interface StreamEnrichmentContext {
definition: IngestStreamGetResponse;
initialProcessorsRefs: ProcessorActorRef[];
processorsRefs: ProcessorActorRef[];
simulatorRef?: SimulationActorRef;
}
export type StreamEnrichmentEvent =
| ProcessorToParentEvent
| { type: 'stream.received'; definition: IngestStreamGetResponse }
| { type: 'stream.reset' }
| { type: 'stream.update' }
| { type: 'simulation.viewDataPreview' }
| { type: 'simulation.viewDetectedFields' }
| { type: 'simulation.changePreviewDocsFilter'; filter: PreviewDocsFilterOption }
| { type: 'processors.add'; processor: ProcessorDefinitionWithUIAttributes }
| { type: 'processors.reorder'; processorsRefs: ProcessorActorRef[] };

View file

@ -0,0 +1,82 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import {
FieldDefinition,
IngestStreamGetResponse,
isWiredStreamGetResponse,
} from '@kbn/streams-schema';
import { ErrorActorEvent, fromPromise } from 'xstate5';
import { errors as esErrors } from '@elastic/elasticsearch';
import { APIReturnType } from '@kbn/streams-plugin/public/api';
import { IToasts } from '@kbn/core/public';
import { i18n } from '@kbn/i18n';
import { StreamEnrichmentServiceDependencies } from './types';
import { processorConverter } from '../../utils';
import { ProcessorDefinitionWithUIAttributes } from '../../types';
export type UpsertStreamResponse = APIReturnType<'PUT /api/streams/{name}/_ingest'>;
export interface UpsertStreamInput {
definition: IngestStreamGetResponse;
processors: ProcessorDefinitionWithUIAttributes[];
fields?: FieldDefinition;
}
export function createUpsertStreamActor({
streamsRepositoryClient,
}: Pick<StreamEnrichmentServiceDependencies, 'streamsRepositoryClient'>) {
return fromPromise<UpsertStreamResponse, UpsertStreamInput>(({ input, signal }) => {
return streamsRepositoryClient.fetch(`PUT /api/streams/{name}/_ingest`, {
signal,
params: {
path: {
name: input.definition.stream.name,
},
body: isWiredStreamGetResponse(input.definition)
? {
ingest: {
...input.definition.stream.ingest,
processing: input.processors.map(processorConverter.toAPIDefinition),
...(input.fields && {
wired: { ...input.definition.stream.ingest.wired, fields: input.fields },
}),
},
}
: {
ingest: {
...input.definition.stream.ingest,
processing: input.processors.map(processorConverter.toAPIDefinition),
},
},
},
});
});
}
export const createUpsertStreamSuccessNofitier =
({ toasts }: { toasts: IToasts }) =>
() => {
toasts.addSuccess(
i18n.translate('xpack.streams.streamDetailView.managementTab.enrichment.saveChangesSuccess', {
defaultMessage: "Stream's processors updated",
})
);
};
export const createUpsertStreamFailureNofitier =
({ toasts }: { toasts: IToasts }) =>
(params: { event: unknown }) => {
const event = params.event as ErrorActorEvent<esErrors.ResponseError, string>;
toasts.addError(new Error(event.error.body.message), {
title: i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.saveChangesError',
{ defaultMessage: "An issue occurred saving processors' changes." }
),
toastMessage: event.error.body.message,
});
};

View file

@ -0,0 +1,105 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import React, { useEffect, useMemo } from 'react';
import { createActorContext, useSelector } from '@xstate5/react';
import { createConsoleInspector } from '@kbn/xstate-utils';
import {
streamEnrichmentMachine,
createStreamEnrichmentMachineImplementations,
} from './stream_enrichment_state_machine';
import { StreamEnrichmentInput, StreamEnrichmentServiceDependencies } from './types';
import { ProcessorDefinitionWithUIAttributes } from '../../types';
import { ProcessorActorRef } from '../processor_state_machine';
import { PreviewDocsFilterOption, SimulationActorSnapshot } from '../simulation_state_machine';
const consoleInspector = createConsoleInspector();
const StreamEnrichmentContext = createActorContext(streamEnrichmentMachine);
export const useStreamsEnrichmentSelector = StreamEnrichmentContext.useSelector;
export type StreamEnrichmentEvents = ReturnType<typeof useStreamEnrichmentEvents>;
export const useStreamEnrichmentEvents = () => {
const service = StreamEnrichmentContext.useActorRef();
return useMemo(
() => ({
addProcessor: (processor: ProcessorDefinitionWithUIAttributes) => {
service.send({ type: 'processors.add', processor });
},
reorderProcessors: (processorsRefs: ProcessorActorRef[]) => {
service.send({ type: 'processors.reorder', processorsRefs });
},
resetChanges: () => {
service.send({ type: 'stream.reset' });
},
saveChanges: () => {
service.send({ type: 'stream.update' });
},
viewSimulationPreviewData: () => {
service.send({ type: 'simulation.viewDataPreview' });
},
viewSimulationDetectedFields: () => {
service.send({ type: 'simulation.viewDetectedFields' });
},
changePreviewDocsFilter: (filter: PreviewDocsFilterOption) => {
service.send({ type: 'simulation.changePreviewDocsFilter', filter });
},
}),
[service]
);
};
export const StreamEnrichmentContextProvider = ({
children,
definition,
...deps
}: React.PropsWithChildren<StreamEnrichmentServiceDependencies & StreamEnrichmentInput>) => {
return (
<StreamEnrichmentContext.Provider
logic={streamEnrichmentMachine.provide(createStreamEnrichmentMachineImplementations(deps))}
options={{
id: 'streamEnrichment',
inspect: consoleInspector,
input: {
definition,
},
}}
>
<ListenForDefinitionChanges definition={definition}>{children}</ListenForDefinitionChanges>
</StreamEnrichmentContext.Provider>
);
};
const ListenForDefinitionChanges = ({
children,
definition,
}: React.PropsWithChildren<StreamEnrichmentInput>) => {
const service = StreamEnrichmentContext.useActorRef();
useEffect(() => {
service.send({ type: 'stream.received', definition });
}, [definition, service]);
return children;
};
export const useSimulatorRef = () => {
return useStreamsEnrichmentSelector((state) => state.context.simulatorRef);
};
export const useSimulatorSelector = <T,>(selector: (snapshot: SimulationActorSnapshot) => T): T => {
const simulationRef = useSimulatorRef();
if (!simulationRef) {
throw new Error('useSimulatorSelector must be used within a StreamEnrichmentContextProvider');
}
return useSelector(simulationRef, selector);
};

View file

@ -16,7 +16,6 @@ import {
export type WithUIAttributes<T extends ProcessorDefinition> = T & {
id: string;
type: ProcessorTypeOf<T>;
status: 'draft' | 'saved' | 'updated';
};
export type ProcessorDefinitionWithUIAttributes = WithUIAttributes<ProcessorDefinition>;

View file

@ -48,11 +48,13 @@ const defaultProcessorFormStateByType: Record<ProcessorType, ProcessorFormState>
grok: defaultGrokProcessorFormState,
};
export const getDefaultFormState = (
type: ProcessorType,
export const getDefaultFormStateByType = (type: ProcessorType) =>
defaultProcessorFormStateByType[type];
export const getFormStateFrom = (
processor?: ProcessorDefinitionWithUIAttributes
): ProcessorFormState => {
if (!processor) return defaultProcessorFormStateByType[type];
if (!processor) return defaultGrokProcessorFormState;
if (isGrokProcessor(processor)) {
const { grok } = processor;
@ -73,7 +75,7 @@ export const getDefaultFormState = (
});
}
throw new Error(`Default state not found for unsupported processor type: ${type}`);
throw new Error(`Form state for processor type "${processor.type}" is not implemented.`);
};
export const convertFormStateToProcessor = (formState: ProcessorFormState): ProcessorDefinition => {
@ -124,25 +126,22 @@ export const isDissectProcessor = createProcessorGuardByType('dissect');
const createId = htmlIdGenerator();
const toUIDefinition = <TProcessorDefinition extends ProcessorDefinition>(
processor: TProcessorDefinition,
uiAttributes: Partial<Pick<WithUIAttributes<TProcessorDefinition>, 'status'>> = {}
processor: TProcessorDefinition
): ProcessorDefinitionWithUIAttributes => ({
id: createId(),
status: 'saved',
type: getProcessorType(processor),
...uiAttributes,
...processor,
});
const toAPIDefinition = (processor: ProcessorDefinitionWithUIAttributes): ProcessorDefinition => {
const { id, status, type, ...processorConfig } = processor;
const { id, type, ...processorConfig } = processor;
return processorConfig;
};
const toSimulateDefinition = (
processor: ProcessorDefinitionWithUIAttributes
): ProcessorDefinitionWithId => {
const { status, type, ...processorConfig } = processor;
const { type, ...processorConfig } = processor;
return processorConfig;
};

View file

@ -12,11 +12,9 @@ import { ClassicStreamDetailManagement } from './classic';
export function StreamDetailManagement({
definition,
refreshDefinition,
isLoadingDefinition,
}: {
definition?: IngestStreamGetResponse;
refreshDefinition: () => void;
isLoadingDefinition: boolean;
}) {
if (!definition) {
return null;
@ -24,11 +22,7 @@ export function StreamDetailManagement({
if (isWiredStreamGetResponse(definition)) {
return (
<WiredStreamDetailManagement
definition={definition}
refreshDefinition={refreshDefinition}
isLoadingDefinition={isLoadingDefinition}
/>
<WiredStreamDetailManagement definition={definition} refreshDefinition={refreshDefinition} />
);
}

View file

@ -24,11 +24,9 @@ function isValidManagementSubTab(value: string): value is ManagementSubTabs {
export function WiredStreamDetailManagement({
definition,
refreshDefinition,
isLoadingDefinition,
}: {
definition?: WiredStreamGetResponse;
refreshDefinition: () => void;
isLoadingDefinition: boolean;
}) {
const {
path: { key, subtab },
@ -53,11 +51,7 @@ export function WiredStreamDetailManagement({
},
schemaEditor: {
content: (
<StreamDetailSchemaEditor
definition={definition}
refreshDefinition={refreshDefinition}
isLoadingDefinition={isLoadingDefinition}
/>
<StreamDetailSchemaEditor definition={definition} refreshDefinition={refreshDefinition} />
),
label: i18n.translate('xpack.streams.streamDetailView.schemaEditorTab', {
defaultMessage: 'Schema editor',

View file

@ -6,13 +6,13 @@
*/
import React from 'react';
import { WiredStreamGetResponse, isRootStreamDefinition } from '@kbn/streams-schema';
import { useStreamDetail } from '../../../hooks/use_stream_detail';
import { SchemaEditor } from '../schema_editor';
import { useSchemaFields } from '../schema_editor/hooks/use_schema_fields';
interface SchemaEditorProps {
definition?: WiredStreamGetResponse;
refreshDefinition: () => void;
isLoadingDefinition: boolean;
}
export function StreamDetailSchemaEditor(props: SchemaEditorProps) {
@ -20,11 +20,9 @@ export function StreamDetailSchemaEditor(props: SchemaEditorProps) {
return <Content definition={props.definition} {...props} />;
}
const Content = ({
definition,
refreshDefinition,
isLoadingDefinition,
}: Required<SchemaEditorProps>) => {
const Content = ({ definition, refreshDefinition }: Required<SchemaEditorProps>) => {
const { loading } = useStreamDetail();
const { fields, isLoadingUnmappedFields, refreshFields, unmapField, updateField } =
useSchemaFields({
definition,
@ -34,7 +32,7 @@ const Content = ({
return (
<SchemaEditor
fields={fields}
isLoading={isLoadingDefinition || isLoadingUnmappedFields}
isLoading={loading || isLoadingUnmappedFields}
stream={definition.stream}
onFieldUnmap={unmapField}
onFieldUpdate={updateField}

View file

@ -5,109 +5,57 @@
* 2.0.
*/
import { i18n } from '@kbn/i18n';
import { isUnwiredStreamGetResponse, isWiredStreamGetResponse } from '@kbn/streams-schema';
import React from 'react';
import { useKibana } from '../../hooks/use_kibana';
import { useStreamsAppFetch } from '../../hooks/use_streams_app_fetch';
import { useStreamsAppParams } from '../../hooks/use_streams_app_params';
import { EntityDetailViewWithoutParams, EntityViewTab } from '../entity_detail_view';
import { StreamDetailDashboardsView } from '../stream_detail_dashboards_view';
import { StreamDetailManagement } from '../data_management/stream_detail_management';
import { StreamDetailOverview } from '../stream_detail_overview';
import { StreamDetailContextProvider, useStreamDetail } from '../../hooks/use_stream_detail';
export function StreamDetailView() {
const params1 = useStreamsAppParams('/{key}/{tab}', true);
const { streamsRepositoryClient } = useKibana().dependencies.start.streams;
const params1 = useStreamsAppParams('/{key}/{tab}', true);
const params2 = useStreamsAppParams('/{key}/management/{subtab}', true);
const key = params1?.path?.key || params2.path.key;
const name = params1?.path?.key || params2.path.key;
const tab = params1?.path?.tab || 'management';
const {
dependencies: {
start: {
streams: { streamsRepositoryClient },
},
},
} = useKibana();
const {
value: streamEntity,
refresh,
loading,
} = useStreamsAppFetch(
async ({ signal }) => {
return streamsRepositoryClient
.fetch('GET /api/streams/{name}', {
signal,
params: {
path: {
name: key,
},
},
})
.then((response) => {
if (isWiredStreamGetResponse(response)) {
return {
dashboards: response.dashboards,
inherited_fields: response.inherited_fields,
elasticsearch_assets: [],
effective_lifecycle: response.effective_lifecycle,
name: key,
stream: {
...response.stream,
},
};
}
if (isUnwiredStreamGetResponse(response)) {
return {
dashboards: response.dashboards,
elasticsearch_assets: response.elasticsearch_assets,
inherited_fields: {},
effective_lifecycle: response.effective_lifecycle,
name: key,
data_stream_exists: response.data_stream_exists,
stream: {
...response.stream,
},
};
}
throw new Error('Stream detail only supports IngestStreams.');
});
},
[streamsRepositoryClient, key]
return (
<StreamDetailContextProvider name={name} streamsRepositoryClient={streamsRepositoryClient}>
<StreamDetailViewContent name={name} tab={tab} />
</StreamDetailContextProvider>
);
}
export function StreamDetailViewContent({ name, tab }: { name: string; tab: string }) {
const { definition, refresh } = useStreamDetail();
const entity = {
id: key,
displayName: key,
id: name,
displayName: name,
};
const tabs: EntityViewTab[] = [
{
name: 'overview',
content: <StreamDetailOverview definition={streamEntity} />,
content: <StreamDetailOverview definition={definition} />,
label: i18n.translate('xpack.streams.streamDetailView.overviewTab', {
defaultMessage: 'Overview',
}),
},
{
name: 'dashboards',
content: <StreamDetailDashboardsView definition={streamEntity} />,
content: <StreamDetailDashboardsView definition={definition} />,
label: i18n.translate('xpack.streams.streamDetailView.dashboardsTab', {
defaultMessage: 'Dashboards',
}),
},
{
name: 'management',
content: (
<StreamDetailManagement
definition={streamEntity}
refreshDefinition={refresh}
isLoadingDefinition={loading}
/>
),
content: <StreamDetailManagement definition={definition} refreshDefinition={refresh} />,
label: i18n.translate('xpack.streams.streamDetailView.managementTab', {
defaultMessage: 'Management',
}),
@ -118,7 +66,7 @@ export function StreamDetailView() {
<EntityDetailViewWithoutParams
tabs={tabs}
entity={entity}
definition={streamEntity}
definition={definition}
selectedTab={tab}
/>
);

View file

@ -9,6 +9,10 @@ import { i18n } from '@kbn/i18n';
import { OverlayModalConfirmOptions } from '@kbn/core/public';
import { useKibana } from './use_kibana';
export interface DiscardPromptOptions extends OverlayModalConfirmOptions {
message: string;
}
const defaultMessage = i18n.translate('xpack.streams.cancelModal.message', {
defaultMessage: 'Are you sure you want to discard your changes?',
});

View file

@ -0,0 +1,75 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import React from 'react';
import { StreamsRepositoryClient } from '@kbn/streams-plugin/public/api';
import {
IngestStreamGetResponse,
isWiredStreamGetResponse,
isUnwiredStreamGetResponse,
} from '@kbn/streams-schema';
import { useStreamsAppFetch } from './use_streams_app_fetch';
export interface StreamDetailContextProviderProps {
name: string;
streamsRepositoryClient: StreamsRepositoryClient;
}
export interface StreamDetailContextValue {
definition?: IngestStreamGetResponse;
loading: boolean;
refresh: () => void;
}
const StreamDetailContext = React.createContext<StreamDetailContextValue | undefined>(undefined);
export function StreamDetailContextProvider({
name,
streamsRepositoryClient,
children,
}: React.PropsWithChildren<StreamDetailContextProviderProps>) {
const {
value: definition,
loading,
refresh,
} = useStreamsAppFetch(
async ({ signal }) => {
return streamsRepositoryClient
.fetch('GET /api/streams/{name}', {
signal,
params: {
path: {
name,
},
},
})
.then((response) => {
if (isWiredStreamGetResponse(response) || isUnwiredStreamGetResponse(response)) {
return response;
}
throw new Error('Stream detail only supports IngestStreams.');
});
},
[streamsRepositoryClient, name]
);
const context = React.useMemo(
() => ({ definition, loading, refresh }),
[definition, loading, refresh]
);
return <StreamDetailContext.Provider value={context}>{children}</StreamDetailContext.Provider>;
}
export function useStreamDetail() {
const ctx = React.useContext(StreamDetailContext);
if (!ctx) {
throw new Error('useStreamDetail must be used within a StreamDetailContextProvider');
}
return ctx;
}

View file

@ -0,0 +1,106 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import {
MachineImplementationsFrom,
assertEvent,
fromObservable,
enqueueActions,
setup,
assign,
ActorRefFrom,
} from 'xstate5';
import type { DataPublicPluginStart, TimefilterContract } from '@kbn/data-plugin/public';
import { getPlaceholderFor } from '@kbn/xstate-utils';
import { DateRangeContext, DateRangeEvent, DateRangeInput } from './types';
export type DateRangeActorRef = ActorRefFrom<typeof dateRangeMachine>;
export const dateRangeMachine = setup({
types: {
context: {} as DateRangeContext,
events: {} as DateRangeEvent,
input: {} as DateRangeInput,
},
actors: {
subscribeTimeUpdates: getPlaceholderFor(createTimeUpdatesActor),
},
actions: {
setTimeUpdates: () => {
throw new Error('Not implemented');
},
storeTimeUpdates: () => {
throw new Error('Not implemented');
},
notifyDateRangeUpdate: enqueueActions(({ enqueue, context }) => {
if (context.parentRef) {
enqueue.sendTo(context.parentRef, { type: 'dateRange.update' });
}
}),
},
}).createMachine({
/** @xstate-layout N4IgpgJg5mDOIC5QQIYBcwCUUDsYGJUNs8wA6AVwAciwBtABgF1FQqB7WASzS-Z1YgAHogBMANgCsZACwBmAOySANCACeiGQEYGZBQE5R+8QA4lAX3OraJAjdwwyAJzAAzF7AAWjFkhAduXn5BEQQAWjkZMn1jBn1FFXVEOX0FMjktcRkGcVFJSysQHHYIOEF7UkEAnj4BP1CtBVUNBFE5cTITSX15cQY5SS1JXPyCoA */
id: 'dateRange',
context: ({ input }) => ({
parentRef: input.parentRef,
timeRange: {
from: '',
to: '',
},
absoluteTimeRange: {
start: 0,
end: 0,
},
}),
entry: 'storeTimeUpdates',
invoke: {
id: 'dateRangeSubscriptionActor',
src: 'subscribeTimeUpdates',
onSnapshot: {
actions: [{ type: 'storeTimeUpdates' }, { type: 'notifyDateRangeUpdate' }],
},
},
on: {
'dateRange.update': {
actions: [{ type: 'setTimeUpdates' }],
},
'dateRange.refresh': {
actions: [{ type: 'storeTimeUpdates' }, { type: 'notifyDateRangeUpdate' }],
},
},
});
export const createDateRangeMachineImplementations = ({
data,
}: {
data: DataPublicPluginStart;
}): MachineImplementationsFrom<typeof dateRangeMachine> => ({
actors: {
subscribeTimeUpdates: createTimeUpdatesActor({ data }),
},
actions: {
setTimeUpdates: ({ event }: { event: DateRangeEvent }) => {
assertEvent(event, 'dateRange.update');
data.query.timefilter.timefilter.setTime(event.range);
},
storeTimeUpdates: assign(() => getTimeContextFromService(data.query.timefilter.timefilter)),
},
});
function createTimeUpdatesActor({ data }: { data: DataPublicPluginStart }) {
return fromObservable(() => data.query.timefilter.timefilter.getTimeUpdate$());
}
function getTimeContextFromService(timefilter: TimefilterContract) {
return {
timeRange: timefilter.getTime(),
absoluteTimeRange: {
start: new Date(timefilter.getAbsoluteTime().from).getTime(),
end: new Date(timefilter.getAbsoluteTime().to).getTime(),
},
};
}

View file

@ -0,0 +1,9 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export * from './date_range_state_machine';
export * from './types';

View file

@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { TimeRange } from '@kbn/es-query';
import { ActorRef, Snapshot } from 'xstate5';
export interface DateRangeToParentEvent {
type: 'dateRange.update';
}
export type DateRangeParentActor = ActorRef<Snapshot<unknown>, DateRangeToParentEvent>;
export interface DateRangeContext {
parentRef?: DateRangeParentActor;
timeRange: TimeRange;
absoluteTimeRange: {
start?: number;
end?: number;
};
}
export interface DateRangeInput {
parentRef?: DateRangeParentActor;
}
export type DateRangeEvent =
| { type: 'dateRange.refresh' }
| { type: 'dateRange.update'; range: TimeRange };

View file

@ -58,5 +58,6 @@
"@kbn/traced-es-client",
"@kbn/licensing-plugin",
"@kbn/datemath",
"@kbn/xstate-utils",
]
}