[8.x] [Streams 🌊] Enrichment - Add Schema editor on simulation outcome (#215824) (#216836)

# Backport

This will backport the following commits from `main` to `8.x`:
- [[Streams 🌊] Enrichment - Add Schema editor on simulation outcome
(#215824)](https://github.com/elastic/kibana/pull/215824)

<!--- Backport version: 9.6.6 -->

### Questions ?
Please refer to the [Backport tool
documentation](https://github.com/sorenlouv/backport)

<!--BACKPORT [{"author":{"name":"Marco Antonio
Ghiani","email":"marcoantonio.ghiani01@gmail.com"},"sourceCommit":{"committedDate":"2025-04-02T13:22:18Z","message":"[Streams
🌊] Enrichment - Add Schema editor on simulation outcome (#215824)\n\n##
📓 Summary\n\nCloses
https://github.com/elastic/streams-program/issues/70\n\nThis work embed
the Schema Editor into the enrichment part, such that\ndetected fields
during the simulation can be directly mapped and saved\nwith the newly
created
processors.\n\n\nhttps://github.com/user-attachments/assets/09a3fe48-4bfc-4501-8c2c-133b1290d884\n\n---------\n\nCo-authored-by:
kibanamachine
<42973632+kibanamachine@users.noreply.github.com>","sha":"9268afecf7935c2cdfb97e07f110a51968446557","branchLabelMapping":{"^v9.1.0$":"main","^v8.19.0$":"8.x","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["release_note:skip","Team:obs-ux-logs","backport:version","Feature:Streams","v9.1.0","v8.19.0"],"title":"[Streams
🌊] Enrichment - Add Schema editor on simulation
outcome","number":215824,"url":"https://github.com/elastic/kibana/pull/215824","mergeCommit":{"message":"[Streams
🌊] Enrichment - Add Schema editor on simulation outcome (#215824)\n\n##
📓 Summary\n\nCloses
https://github.com/elastic/streams-program/issues/70\n\nThis work embed
the Schema Editor into the enrichment part, such that\ndetected fields
during the simulation can be directly mapped and saved\nwith the newly
created
processors.\n\n\nhttps://github.com/user-attachments/assets/09a3fe48-4bfc-4501-8c2c-133b1290d884\n\n---------\n\nCo-authored-by:
kibanamachine
<42973632+kibanamachine@users.noreply.github.com>","sha":"9268afecf7935c2cdfb97e07f110a51968446557"}},"sourceBranch":"main","suggestedTargetBranches":["8.x"],"targetPullRequestStates":[{"branch":"main","label":"v9.1.0","branchLabelMappingKey":"^v9.1.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/215824","number":215824,"mergeCommit":{"message":"[Streams
🌊] Enrichment - Add Schema editor on simulation outcome (#215824)\n\n##
📓 Summary\n\nCloses
https://github.com/elastic/streams-program/issues/70\n\nThis work embed
the Schema Editor into the enrichment part, such that\ndetected fields
during the simulation can be directly mapped and saved\nwith the newly
created
processors.\n\n\nhttps://github.com/user-attachments/assets/09a3fe48-4bfc-4501-8c2c-133b1290d884\n\n---------\n\nCo-authored-by:
kibanamachine
<42973632+kibanamachine@users.noreply.github.com>","sha":"9268afecf7935c2cdfb97e07f110a51968446557"}},{"branch":"8.x","label":"v8.19.0","branchLabelMappingKey":"^v8.19.0$","isSourceBranch":false,"state":"NOT_CREATED"}]}]
BACKPORT-->

Co-authored-by: Marco Antonio Ghiani <marcoantonio.ghiani01@gmail.com>
This commit is contained in:
Kibana Machine 2025-04-02 17:35:37 +02:00 committed by GitHub
parent 926499f0ab
commit 0dcd49bed0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
19 changed files with 368 additions and 62 deletions

View file

@ -673,11 +673,11 @@ const computeSkippedRate = (docs: SimulationDocReport[]) => {
const computeMappingProperties = (detectedFields: NamedFieldDefinitionConfig[]) => {
return Object.fromEntries(
detectedFields.flatMap(({ name, type }) => {
if (type === 'system') {
detectedFields.flatMap(({ name, ...config }) => {
if (config.type === 'system') {
return [];
}
return [[name, { type }]];
return [[name, config]];
})
);
};

View file

@ -103,4 +103,8 @@ export const TABLE_COLUMNS = {
defaultMessage: 'Status',
}),
},
};
} as const;
export type TableColumnName = keyof typeof TABLE_COLUMNS;
export const SUPPORTED_TABLE_COLUMN_NAMES = Object.keys(TABLE_COLUMNS) as TableColumnName[];

View file

@ -16,7 +16,7 @@ import {
} from '@elastic/eui';
import React from 'react';
import { i18n } from '@kbn/i18n';
import { WiredStreamDefinition } from '@kbn/streams-schema';
import { IngestStreamDefinition, isWiredStreamDefinition } from '@kbn/streams-schema';
import { useStreamsAppRouter } from '../../../../hooks/use_streams_app_router';
import { FieldParent } from '../field_parent';
import { FieldStatusBadge } from '../field_status';
@ -57,7 +57,7 @@ interface FieldSummaryProps {
field: SchemaField;
isEditing: boolean;
toggleEditMode: () => void;
stream: WiredStreamDefinition;
stream: IngestStreamDefinition;
onChange: (field: Partial<SchemaField>) => void;
}
@ -203,7 +203,7 @@ export const FieldSummary = (props: FieldSummaryProps) => {
<EuiHorizontalRule margin="xs" />
</EuiFlexGroup>
{isEditing && stream.ingest.wired.routing.length > 0 ? (
{isEditing && isWiredStreamDefinition(stream) && stream.ingest.wired.routing.length > 0 ? (
<EuiFlexItem grow={false}>
<ChildrenAffectedCallout childStreams={stream.ingest.wired.routing} />
</EuiFlexItem>

View file

@ -17,7 +17,7 @@ import {
} from '@elastic/eui';
import React, { useReducer } from 'react';
import { i18n } from '@kbn/i18n';
import { WiredStreamDefinition } from '@kbn/streams-schema';
import { IngestStreamDefinition } from '@kbn/streams-schema';
import useAsyncFn from 'react-use/lib/useAsyncFn';
import useToggle from 'react-use/lib/useToggle';
import { SamplePreviewTable } from './sample_preview_table';
@ -30,7 +30,7 @@ export interface SchemaEditorFlyoutProps {
isEditingByDefault?: boolean;
onClose?: () => void;
onSave: (field: SchemaField) => void;
stream: WiredStreamDefinition;
stream: IngestStreamDefinition;
withFieldSimulation?: boolean;
}

View file

@ -9,7 +9,7 @@ import React, { useMemo } from 'react';
import { css } from '@emotion/react';
import { i18n } from '@kbn/i18n';
import { EuiCallOut } from '@elastic/eui';
import { WiredStreamDefinition } from '@kbn/streams-schema';
import { IngestStreamDefinition } from '@kbn/streams-schema';
import { useKibana } from '../../../../hooks/use_kibana';
import { getFormattedError } from '../../../../util/errors';
import { useStreamsAppFetch } from '../../../../hooks/use_streams_app_fetch';
@ -19,7 +19,7 @@ import { MappedSchemaField, SchemaField, isSchemaFieldTyped } from '../types';
import { convertToFieldDefinitionConfig } from '../utils';
interface SamplePreviewTableProps {
stream: WiredStreamDefinition;
stream: IngestStreamDefinition;
nextField: SchemaField;
}

View file

@ -12,8 +12,10 @@ import { SchemaEditorProps } from './types';
import { SchemaEditorContextProvider } from './schema_editor_context';
import { Controls } from './schema_editor_controls';
import { FieldsTable } from './schema_editor_table';
import { SUPPORTED_TABLE_COLUMN_NAMES } from './constants';
export function SchemaEditor({
defaultColumns = SUPPORTED_TABLE_COLUMN_NAMES,
fields,
isLoading,
onFieldUnmap,
@ -47,8 +49,9 @@ export function SchemaEditor({
<Controls controls={controls} onChange={updateControls} onRefreshData={onRefreshData} />
)}
<FieldsTable
fields={fields}
controls={controls}
defaultColumns={defaultColumns}
fields={fields}
stream={stream}
withTableActions={withTableActions}
/>

View file

@ -15,9 +15,9 @@ import {
EuiDataGridControlColumn,
} from '@elastic/eui';
import { i18n } from '@kbn/i18n';
import { WiredStreamDefinition } from '@kbn/streams-schema';
import { IngestStreamDefinition } from '@kbn/streams-schema';
import { isEmpty } from 'lodash';
import { TABLE_COLUMNS, EMPTY_CONTENT } from './constants';
import { TABLE_COLUMNS, EMPTY_CONTENT, TableColumnName } from './constants';
import { FieldActionsCell } from './field_actions';
import { FieldParent } from './field_parent';
import { FieldStatusBadge } from './field_status';
@ -26,18 +26,20 @@ import { SchemaField } from './types';
import { FieldType } from './field_type';
export function FieldsTable({
fields,
controls,
defaultColumns,
fields,
stream,
withTableActions,
}: {
fields: SchemaField[];
controls: TControls;
stream: WiredStreamDefinition;
defaultColumns: TableColumnName[];
fields: SchemaField[];
stream: IngestStreamDefinition;
withTableActions: boolean;
}) {
// Column visibility
const [visibleColumns, setVisibleColumns] = useState(Object.keys(TABLE_COLUMNS));
const [visibleColumns, setVisibleColumns] = useState<string[]>(defaultColumns);
// Column sorting
const [sortingColumns, setSortingColumns] = useState<EuiDataGridColumnSortingConfig[]>([]);
@ -88,7 +90,10 @@ export function FieldsTable({
}
const createCellRenderer =
(fields: SchemaField[], stream: WiredStreamDefinition): EuiDataGridCellProps['renderCellValue'] =>
(
fields: SchemaField[],
stream: IngestStreamDefinition
): EuiDataGridCellProps['renderCellValue'] =>
({ rowIndex, columnId }) => {
const field = fields[rowIndex];
if (!field) return null;

View file

@ -8,8 +8,9 @@
import {
FieldDefinitionConfig,
FieldDefinitionConfigAdvancedParameters,
WiredStreamDefinition,
IngestStreamDefinition,
} from '@kbn/streams-schema';
import { TableColumnName } from './constants';
export type SchemaFieldStatus = 'inherited' | 'mapped' | 'unmapped';
export type SchemaFieldType = FieldDefinitionConfig['type'];
@ -35,12 +36,13 @@ export interface UnmappedSchemaField extends BaseSchemaField {
export type SchemaField = MappedSchemaField | UnmappedSchemaField;
export interface SchemaEditorProps {
defaultColumns?: TableColumnName[];
fields: SchemaField[];
isLoading?: boolean;
onFieldUnmap: (fieldName: SchemaField['name']) => void;
onFieldUpdate: (field: SchemaField) => void;
onRefreshData?: () => void;
stream: WiredStreamDefinition;
stream: IngestStreamDefinition;
withControls?: boolean;
withFieldSimulation?: boolean;
withTableActions?: boolean;

View file

@ -0,0 +1,75 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import React from 'react';
import { useEuiTheme, EuiEmptyPrompt, EuiText } from '@elastic/eui';
import { i18n } from '@kbn/i18n';
import { css } from '@emotion/react';
import { WiredStreamGetResponse } from '@kbn/streams-schema';
import { AssetImage } from '../../asset_image';
import { SchemaEditor } from '../schema_editor';
import { SchemaField } from '../schema_editor/types';
import { useStreamEnrichmentEvents } from './state_management/stream_enrichment_state_machine';
interface DetectedFieldsEditorProps {
definition: WiredStreamGetResponse;
detectedFields: SchemaField[];
}
export const DetectedFieldsEditor = ({ definition, detectedFields }: DetectedFieldsEditorProps) => {
const { euiTheme } = useEuiTheme();
const { mapField, unmapField } = useStreamEnrichmentEvents();
const hasFields = detectedFields.length > 0;
if (!hasFields) {
return (
<EuiEmptyPrompt
titleSize="xs"
icon={<AssetImage type="noResults" />}
body={
<p>
{i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.simulationPlayground.detectedFields.noResults.content',
{
defaultMessage:
'No fields were detected during the simulation. You can add fields manually in the Schema Editor.',
}
)}
</p>
}
/>
);
}
return (
<>
<EuiText
component="p"
color="subdued"
size="xs"
css={css`
margin-bottom: ${euiTheme.size.base};
`}
>
{i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.simulationPlayground.detectedFieldsHeadline',
{ defaultMessage: 'You can review and adjust saved fields further in the Schema Editor.' }
)}
</EuiText>
<SchemaEditor
defaultColumns={['name', 'type', 'format', 'status']}
fields={detectedFields}
stream={definition.stream}
onFieldUnmap={unmapField}
onFieldUpdate={mapField}
withTableActions
/>
</>
);
};

View file

@ -7,15 +7,26 @@
import React from 'react';
import { i18n } from '@kbn/i18n';
import { EuiFlexItem, EuiSpacer, EuiTab, EuiTabs } from '@elastic/eui';
import {
EuiFlexItem,
EuiNotificationBadge,
EuiProgress,
EuiSpacer,
EuiTab,
EuiTabs,
} from '@elastic/eui';
import { isWiredStreamGetResponse } from '@kbn/streams-schema';
import { ProcessorOutcomePreview } from './processor_outcome_preview';
import {
useSimulatorSelector,
useStreamEnrichmentEvents,
useStreamsEnrichmentSelector,
} from './state_management/stream_enrichment_state_machine';
import { DetectedFieldsEditor } from './detected_fields_editor';
export const SimulationPlayground = () => {
const { viewSimulationPreviewData, viewSimulationDetectedFields } = useStreamEnrichmentEvents();
const isViewingDataPreview = useStreamsEnrichmentSelector((state) =>
state.matches({
ready: { enrichment: { displayingSimulation: 'viewDataPreview' } },
@ -26,11 +37,17 @@ export const SimulationPlayground = () => {
ready: { enrichment: { displayingSimulation: 'viewDetectedFields' } },
})
);
const canViewDetectedFields = useStreamsEnrichmentSelector((state) =>
isWiredStreamGetResponse(state.context.definition)
const detectedFields = useSimulatorSelector((state) => state.context.detectedSchemaFields);
const isLoading = useSimulatorSelector(
(state) =>
state.matches('debouncingChanges') ||
state.matches('loadingSamples') ||
state.matches('runningSimulation')
);
const { viewSimulationPreviewData, viewSimulationDetectedFields } = useStreamEnrichmentEvents();
const definition = useStreamsEnrichmentSelector((state) => state.context.definition);
const canViewDetectedFields = isWiredStreamGetResponse(definition);
return (
<>
@ -43,7 +60,15 @@ export const SimulationPlayground = () => {
)}
</EuiTab>
{canViewDetectedFields && (
<EuiTab isSelected={isViewingDetectedFields} onClick={viewSimulationDetectedFields}>
<EuiTab
isSelected={isViewingDetectedFields}
onClick={viewSimulationDetectedFields}
append={
detectedFields.length > 0 ? (
<EuiNotificationBadge size="m">{detectedFields.length}</EuiNotificationBadge>
) : undefined
}
>
{i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.simulationPlayground.detectedFields',
{ defaultMessage: 'Detected fields' }
@ -51,13 +76,13 @@ export const SimulationPlayground = () => {
</EuiTab>
)}
</EuiTabs>
{isLoading && <EuiProgress size="xs" color="accent" position="absolute" />}
</EuiFlexItem>
<EuiSpacer size="m" />
{isViewingDataPreview && <ProcessorOutcomePreview />}
{isViewingDetectedFields &&
i18n.translate('xpack.streams.simulationPlayground.div.detectedFieldsLabel', {
defaultMessage: 'WIP',
})}
{isViewingDetectedFields && canViewDetectedFields && (
<DetectedFieldsEditor definition={definition} detectedFields={detectedFields} />
)}
</>
);
};

View file

@ -9,12 +9,17 @@ import { i18n } from '@kbn/i18n';
import { FlattenRecord } from '@kbn/streams-schema';
import { fromPromise, ErrorActorEvent } from 'xstate5';
import { errors as esErrors } from '@elastic/elasticsearch';
import { isEmpty } from 'lodash';
import { ProcessorDefinitionWithUIAttributes } from '../../types';
import { processorConverter } from '../../utils';
import { Simulation, SimulationMachineDeps } from './types';
import { SchemaField } from '../../../schema_editor/types';
import { getMappedSchemaFields } from './utils';
import { convertToFieldDefinitionConfig } from '../../../schema_editor/utils';
export interface SimulationRunnerInput {
streamName: string;
detectedFields?: SchemaField[];
documents: FlattenRecord[];
processors: ProcessorDefinitionWithUIAttributes[];
}
@ -30,6 +35,13 @@ export function createSimulationRunnerActor({
body: {
documents: input.documents,
processing: input.processors.map(processorConverter.toSimulateDefinition),
detected_fields:
input.detectedFields && !isEmpty(input.detectedFields)
? getMappedSchemaFields(input.detectedFields).map((field) => ({
name: field.name,
...convertToFieldDefinitionConfig(field),
}))
: undefined,
},
},
})

View file

@ -36,7 +36,13 @@ import {
createSimulationRunnerActor,
createSimulationRunFailureNofitier,
} from './simulation_runner_actor';
import { composeSamplingCondition } from './utils';
import {
composeSamplingCondition,
getSchemaFieldsFromSimulation,
mapField,
unmapField,
} from './utils';
import { MappedSchemaField } from '../../../schema_editor/types';
export type SimulationActorRef = ActorRefFrom<typeof simulationMachine>;
export type SimulationActorSnapshot = SnapshotFrom<typeof simulationMachine>;
@ -81,8 +87,24 @@ export const simulationMachine = setup({
deriveSamplingCondition: assign(({ context }) => ({
samplingCondition: composeSamplingCondition(context.processors),
})),
deriveDetectedSchemaFields: assign(({ context }) => ({
detectedSchemaFields: context.simulation
? getSchemaFieldsFromSimulation(
context.simulation.detected_fields,
context.detectedSchemaFields,
context.streamName
)
: context.detectedSchemaFields,
})),
mapField: assign(({ context }, params: { field: MappedSchemaField }) => ({
detectedSchemaFields: mapField(context.detectedSchemaFields, params.field),
})),
unmapField: assign(({ context }, params: { fieldName: string }) => ({
detectedSchemaFields: unmapField(context.detectedSchemaFields, params.fieldName),
})),
resetSimulation: assign({
processors: [],
detectedSchemaFields: [],
simulation: undefined,
samplingCondition: composeSamplingCondition([]),
previewDocsFilter: 'outcome_filter_all',
@ -113,6 +135,7 @@ export const simulationMachine = setup({
parentRef: self,
},
}),
detectedSchemaFields: [],
previewDocsFilter: 'outcome_filter_all',
previewDocuments: [],
processors: input.processors,
@ -172,7 +195,17 @@ export const simulationMachine = setup({
],
},
idle: {},
idle: {
on: {
'simulation.fields.map': {
target: 'assertingSimulationRequirements',
actions: [{ type: 'mapField', params: ({ event }) => event }],
},
'simulation.fields.unmap': {
actions: [{ type: 'unmapField', params: ({ event }) => event }],
},
},
},
debouncingChanges: {
on: {
@ -239,11 +272,13 @@ export const simulationMachine = setup({
streamName: context.streamName,
documents: context.samples.map(flattenObjectNestedLast) as FlattenRecord[],
processors: context.processors,
detectedFields: context.detectedSchemaFields,
}),
onDone: {
target: 'idle',
actions: [
{ type: 'storeSimulation', params: ({ event }) => ({ simulation: event.output }) },
{ type: 'deriveDetectedSchemaFields' },
],
},
onError: {

View file

@ -15,8 +15,10 @@ import {
} from '../../../../../state_management/date_range_state_machine';
import { ProcessorDefinitionWithUIAttributes } from '../../types';
import { PreviewDocsFilterOption } from './preview_docs_filter';
import { MappedSchemaField, SchemaField } from '../../../schema_editor/types';
export type Simulation = APIReturnType<'POST /internal/streams/{name}/processing/_simulate'>;
export type DetectedField = Simulation['detected_fields'][number];
export interface SimulationMachineDeps {
data: DataPublicPluginStart;
@ -34,15 +36,18 @@ export interface SimulationInput {
export type SimulationEvent =
| DateRangeToParentEvent
| { type: 'simulation.changePreviewDocsFilter'; filter: PreviewDocsFilterOption }
| { type: 'simulation.reset' }
| { type: 'processors.add'; processors: ProcessorDefinitionWithUIAttributes[] }
| { type: 'processor.cancel'; processors: ProcessorDefinitionWithUIAttributes[] }
| { type: 'processor.change'; processors: ProcessorDefinitionWithUIAttributes[] }
| { type: 'processor.delete'; processors: ProcessorDefinitionWithUIAttributes[] };
| { type: 'processor.delete'; processors: ProcessorDefinitionWithUIAttributes[] }
| { type: 'simulation.changePreviewDocsFilter'; filter: PreviewDocsFilterOption }
| { type: 'simulation.fields.map'; field: MappedSchemaField }
| { type: 'simulation.fields.unmap'; fieldName: string }
| { type: 'simulation.reset' };
export interface SimulationContext {
dateRangeRef: DateRangeActorRef;
detectedSchemaFields: SchemaField[];
previewDocsFilter: PreviewDocsFilterOption;
previewDocuments: FlattenRecord[];
processors: ProcessorDefinitionWithUIAttributes[];

View file

@ -5,12 +5,14 @@
* 2.0.
*/
import { Condition, UnaryOperator, getProcessorConfig } from '@kbn/streams-schema';
import { Condition, FieldDefinition, UnaryOperator, getProcessorConfig } from '@kbn/streams-schema';
import { isEmpty, uniq } from 'lodash';
import { ALWAYS_CONDITION } from '../../../../../util/condition';
import { ProcessorDefinitionWithUIAttributes, DetectedField } from '../../types';
import { ProcessorDefinitionWithUIAttributes } from '../../types';
import { PreviewDocsFilterOption } from './preview_docs_filter';
import { Simulation } from './types';
import { DetectedField, Simulation } from './types';
import { MappedSchemaField, SchemaField, isSchemaFieldTyped } from '../../../schema_editor/types';
import { convertToFieldDefinitionConfig } from '../../../schema_editor/utils';
export function composeSamplingCondition(
processors: ProcessorDefinitionWithUIAttributes[]
@ -67,3 +69,88 @@ export function filterSimulationDocuments(
return documents.map((doc) => doc.value);
}
}
export function getSchemaFieldsFromSimulation(
detectedFields: DetectedField[],
previousDetectedFields: SchemaField[],
streamName: string
) {
const previousDetectedFieldsMap = previousDetectedFields.reduce<Record<string, SchemaField>>(
(acc, field) => {
acc[field.name] = field;
return acc;
},
{}
);
const schemaFields: SchemaField[] = detectedFields.map((field) => {
// Detected field already mapped by the user on previous simulation
if (previousDetectedFieldsMap[field.name]) {
return previousDetectedFieldsMap[field.name];
}
// Detected field already inherited
if ('from' in field) {
return {
...field,
status: 'inherited',
parent: field.from,
};
}
// Detected field already mapped
if ('type' in field) {
return {
...field,
status: 'mapped',
parent: streamName,
};
}
// Detected field still unmapped
return {
status: 'unmapped',
name: field.name,
parent: streamName,
};
});
return schemaFields.sort(compareFieldsByStatus);
}
const statusOrder = { inherited: 0, mapped: 1, unmapped: 2 };
const compareFieldsByStatus = (curr: SchemaField, next: SchemaField) => {
return statusOrder[curr.status] - statusOrder[next.status];
};
export function mapField(
schemaFields: SchemaField[],
updatedField: MappedSchemaField
): SchemaField[] {
return schemaFields.map((field) => {
if (field.name !== updatedField.name) return field;
return { ...updatedField, status: 'mapped' };
});
}
export function unmapField(schemaFields: SchemaField[], fieldName: string): SchemaField[] {
return schemaFields.map((field) => {
if (field.name !== fieldName) return field;
return { ...field, status: 'unmapped' };
});
}
export function getMappedSchemaFields(fields: SchemaField[]) {
return fields.filter(isSchemaFieldTyped).filter((field) => field.status === 'mapped');
}
export function getUnmappedSchemaFields(fields: SchemaField[]) {
return fields.filter((field) => field.status === 'unmapped');
}
export function convertToFieldDefinition(fields: MappedSchemaField[]): FieldDefinition {
return fields.reduce(
(mappedFields, field) =>
Object.assign(mappedFields, { [field.name]: convertToFieldDefinitionConfig(field) }),
{}
);
}

View file

@ -14,6 +14,7 @@ import {
stopChild,
and,
ActorRefFrom,
raise,
} from 'xstate5';
import { getPlaceholderFor } from '@kbn/xstate-utils';
import {
@ -41,6 +42,7 @@ import {
createSimulationMachineImplementations,
} from '../simulation_state_machine';
import { processorMachine, ProcessorActorRef } from '../processor_state_machine';
import { getConfiguredProcessors, getStagedProcessors, getUpsertWiredFields } from './utils';
const createId = htmlIdGenerator();
@ -187,10 +189,7 @@ export const streamEnrichmentMachine = setup({
type: 'parallel',
entry: [
{ type: 'stopProcessors' },
{
type: 'setupProcessors',
params: ({ context }) => ({ definition: context.definition }),
},
{ type: 'setupProcessors', params: ({ context }) => ({ definition: context.definition }) },
],
on: {
'stream.received': {
@ -213,7 +212,10 @@ export const streamEnrichmentMachine = setup({
},
'stream.update': {
guard: 'canUpdateStream',
actions: [{ type: 'sendResetEventToSimulator' }],
actions: [
{ type: 'sendResetEventToSimulator' },
raise({ type: 'simulation.viewDataPreview' }),
],
target: 'updating',
},
},
@ -224,11 +226,8 @@ export const streamEnrichmentMachine = setup({
src: 'upsertStream',
input: ({ context }) => ({
definition: context.definition,
processors: context.processorsRefs
.map((proc) => proc.getSnapshot())
.filter((proc) => proc.matches('configured'))
.map((proc) => proc.context.processor),
fields: undefined, // TODO: implementing in follow-up PR
processors: getConfiguredProcessors(context),
fields: getUpsertWiredFields(context),
}),
onDone: {
target: 'idle',
@ -295,13 +294,16 @@ export const streamEnrichmentMachine = setup({
on: {
'simulation.viewDetectedFields': 'viewDetectedFields',
'simulation.changePreviewDocsFilter': {
actions: [forwardTo('simulator')],
actions: forwardTo('simulator'),
},
},
},
viewDetectedFields: {
on: {
'simulation.viewDataPreview': 'viewDataPreview',
'simulation.fields.*': {
actions: forwardTo('simulator'),
},
},
},
},
@ -345,10 +347,3 @@ export const createStreamEnrichmentMachineImplementations = ({
}),
},
});
function getStagedProcessors(context: StreamEnrichmentContextType) {
return context.processorsRefs
.map((proc) => proc.getSnapshot())
.filter((proc) => proc.context.isNew)
.map((proc) => proc.context.processor);
}

View file

@ -12,6 +12,7 @@ import { DataPublicPluginStart } from '@kbn/data-plugin/public';
import { ProcessorDefinitionWithUIAttributes } from '../../types';
import { ProcessorActorRef, ProcessorToParentEvent } from '../processor_state_machine';
import { PreviewDocsFilterOption, SimulationActorRef } from '../simulation_state_machine';
import { MappedSchemaField } from '../../../schema_editor/types';
export interface StreamEnrichmentServiceDependencies {
refreshDefinition: () => void;
@ -39,5 +40,7 @@ export type StreamEnrichmentEvent =
| { type: 'simulation.viewDataPreview' }
| { type: 'simulation.viewDetectedFields' }
| { type: 'simulation.changePreviewDocsFilter'; filter: PreviewDocsFilterOption }
| { type: 'simulation.fields.map'; field: MappedSchemaField }
| { type: 'simulation.fields.unmap'; fieldName: string }
| { type: 'processors.add'; processor: ProcessorDefinitionWithUIAttributes }
| { type: 'processors.reorder'; processorsRefs: ProcessorActorRef[] };

View file

@ -16,6 +16,7 @@ import { StreamEnrichmentInput, StreamEnrichmentServiceDependencies } from './ty
import { ProcessorDefinitionWithUIAttributes } from '../../types';
import { ProcessorActorRef } from '../processor_state_machine';
import { PreviewDocsFilterOption, SimulationActorSnapshot } from '../simulation_state_machine';
import { MappedSchemaField, SchemaField } from '../../../schema_editor/types';
const consoleInspector = createConsoleInspector();
@ -51,6 +52,12 @@ export const useStreamEnrichmentEvents = () => {
changePreviewDocsFilter: (filter: PreviewDocsFilterOption) => {
service.send({ type: 'simulation.changePreviewDocsFilter', filter });
},
mapField: (field: SchemaField) => {
service.send({ type: 'simulation.fields.map', field: field as MappedSchemaField });
},
unmapField: (fieldName: string) => {
service.send({ type: 'simulation.fields.unmap', fieldName });
},
}),
[service]
);

View file

@ -0,0 +1,54 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { FieldDefinition, isWiredStreamGetResponse } from '@kbn/streams-schema';
import { StreamEnrichmentContextType } from './types';
import {
convertToFieldDefinition,
getMappedSchemaFields,
getUnmappedSchemaFields,
} from '../simulation_state_machine';
export function getStagedProcessors(context: StreamEnrichmentContextType) {
return context.processorsRefs
.map((proc) => proc.getSnapshot())
.filter((proc) => proc.context.isNew)
.map((proc) => proc.context.processor);
}
export function getConfiguredProcessors(context: StreamEnrichmentContextType) {
return context.processorsRefs
.map((proc) => proc.getSnapshot())
.filter((proc) => proc.matches('configured'))
.map((proc) => proc.context.processor);
}
export function getUpsertWiredFields(
context: StreamEnrichmentContextType
): FieldDefinition | undefined {
if (!isWiredStreamGetResponse(context.definition) || !context.simulatorRef) {
return undefined;
}
const originalFieldDefinition = { ...context.definition.stream.ingest.wired.fields };
const { detectedSchemaFields } = context.simulatorRef.getSnapshot().context;
// Remove unmapped fields from original definition
const unmappedSchemaFields = getUnmappedSchemaFields(detectedSchemaFields);
unmappedSchemaFields.forEach((field) => {
delete originalFieldDefinition[field.name];
});
const mappedSchemaFields = getMappedSchemaFields(detectedSchemaFields).filter(
(field) => !originalFieldDefinition[field.name]
);
const simulationMappedFieldDefinition = convertToFieldDefinition(mappedSchemaFields);
return { ...originalFieldDefinition, ...simulationMappedFieldDefinition };
}

View file

@ -8,7 +8,6 @@
import {
DateProcessorConfig,
DissectProcessorConfig,
FieldDefinitionType,
GrokProcessorConfig,
ProcessorDefinition,
ProcessorTypeOf,
@ -21,11 +20,6 @@ export type WithUIAttributes<T extends ProcessorDefinition> = T & {
export type ProcessorDefinitionWithUIAttributes = WithUIAttributes<ProcessorDefinition>;
export interface DetectedField {
name: string;
type?: FieldDefinitionType | 'system';
}
export type GrokFormState = Omit<GrokProcessorConfig, 'patterns'> & {
type: 'grok';
patterns: Array<{ value: string }>;