🌊 Show detected field types for classic streams enrichment (#222579)

Adds a "detected fields" tab to the classic streams enrichment editor.

<img width="1005" alt="Screenshot 2025-06-04 at 17 58 57"
src="https://github.com/user-attachments/assets/3f3bc959-a27d-4e53-af96-153f0cd0fb54"
/>

This PR adds the "detected fields" tab for classic streams by fetching
the actual Elasticsearch field type from field caps and showing it
alongside the detected fields. This currently doesn't work for fields
that are not mapped yet but would be added as part of the simulation
(see the Elasticsearch feature request:
https://github.com/elastic/elasticsearch/issues/128760).

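The Elasticsearch field type is surfaced in the "type" column of the
schema editor table, together with an explanatory tooltip, for fields
that are unmapped in Streams but defined in Elasticsearch. For wired
streams this is not relevant at all, but it can be helpful for classic
streams to highlight the non-managed parts.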

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Joe Reuter 2025-06-05 13:45:32 +02:00 committed by GitHub
parent 8c7d1751c5
commit 36699ad4ae
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 117 additions and 55 deletions

View file

@ -18,6 +18,7 @@ import {
IndicesIndexState, IndicesIndexState,
SimulateIngestResponse, SimulateIngestResponse,
SimulateIngestSimulateIngestDocumentResult, SimulateIngestSimulateIngestDocumentResult,
FieldCapsResponse,
} from '@elastic/elasticsearch/lib/api/types'; } from '@elastic/elasticsearch/lib/api/types';
import { IScopedClusterClient } from '@kbn/core/server'; import { IScopedClusterClient } from '@kbn/core/server';
import { flattenObjectNestedLast, calculateObjectDiff } from '@kbn/object-utils'; import { flattenObjectNestedLast, calculateObjectDiff } from '@kbn/object-utils';
@ -138,10 +139,10 @@ export type IngestSimulationResult =
}; };
export type DetectedField = export type DetectedField =
| WithName | WithNameAndEsType
| WithName<FieldDefinitionConfig | InheritedFieldDefinitionConfig>; | WithNameAndEsType<FieldDefinitionConfig | InheritedFieldDefinitionConfig>;
export type WithName<TObj = {}> = TObj & { name: string }; export type WithNameAndEsType<TObj = {}> = TObj & { name: string; esType?: string };
export type WithRequired<TObj, TKey extends keyof TObj> = TObj & { [TProp in TKey]-?: TObj[TProp] }; export type WithRequired<TObj, TKey extends keyof TObj> = TObj & { [TProp in TKey]-?: TObj[TProp] };
export const simulateProcessing = async ({ export const simulateProcessing = async ({
@ -150,15 +151,20 @@ export const simulateProcessing = async ({
streamsClient, streamsClient,
}: SimulateProcessingDeps) => { }: SimulateProcessingDeps) => {
/* 0. Retrieve required data to prepare the simulation */ /* 0. Retrieve required data to prepare the simulation */
const [stream, streamIndex] = await Promise.all([ const [stream, { indexState: streamIndexState, fieldCaps: streamIndexFieldCaps }] =
streamsClient.getStream(params.path.name), await Promise.all([
getStreamIndex(scopedClusterClient, streamsClient, params.path.name), streamsClient.getStream(params.path.name),
]); getStreamIndex(scopedClusterClient, streamsClient, params.path.name),
]);
/* 1. Prepare data for either simulation types (ingest, pipeline), prepare simulation body for the mandatory pipeline simulation */ /* 1. Prepare data for either simulation types (ingest, pipeline), prepare simulation body for the mandatory pipeline simulation */
const simulationData = prepareSimulationData(params); const simulationData = prepareSimulationData(params);
const pipelineSimulationBody = preparePipelineSimulationBody(simulationData); const pipelineSimulationBody = preparePipelineSimulationBody(simulationData);
const ingestSimulationBody = prepareIngestSimulationBody(simulationData, streamIndex, params); const ingestSimulationBody = prepareIngestSimulationBody(
simulationData,
streamIndexState,
params
);
/** /**
* 2. Run both pipeline and ingest simulations in parallel. * 2. Run both pipeline and ingest simulations in parallel.
* - The pipeline simulation is used to extract the documents reports and the processor metrics. This always runs. * - The pipeline simulation is used to extract the documents reports and the processor metrics. This always runs.
@ -189,7 +195,12 @@ export const simulateProcessing = async ({
); );
/* 5. Extract valid detected fields asserting existing mapped fields from stream and ancestors */ /* 5. Extract valid detected fields asserting existing mapped fields from stream and ancestors */
const detectedFields = await computeDetectedFields(processorsMetrics, params, streamFields); const detectedFields = await computeDetectedFields(
processorsMetrics,
params,
streamFields,
streamIndexFieldCaps
);
/* 6. Derive general insights and process final response body */ /* 6. Derive general insights and process final response body */
return prepareSimulationResponse(docReports, processorsMetrics, detectedFields); return prepareSimulationResponse(docReports, processorsMetrics, detectedFields);
@ -744,18 +755,30 @@ const getStreamIndex = async (
scopedClusterClient: IScopedClusterClient, scopedClusterClient: IScopedClusterClient,
streamsClient: StreamsClient, streamsClient: StreamsClient,
streamName: string streamName: string
): Promise<IndicesIndexState> => { ): Promise<{
indexState: IndicesIndexState;
fieldCaps: FieldCapsResponse['fields'];
}> => {
const dataStream = await streamsClient.getDataStream(streamName); const dataStream = await streamsClient.getDataStream(streamName);
const lastIndex = dataStream.indices.at(-1); const lastIndexRef = dataStream.indices.at(-1);
if (!lastIndex) { if (!lastIndexRef) {
throw new Error(`No writing index found for stream ${streamName}`); throw new Error(`No writing index found for stream ${streamName}`);
} }
const lastIndexMapping = await scopedClusterClient.asCurrentUser.indices.get({ const [lastIndex, lastIndexFieldCaps] = await Promise.all([
index: lastIndex.index_name, scopedClusterClient.asCurrentUser.indices.get({
}); index: lastIndexRef.index_name,
}),
scopedClusterClient.asCurrentUser.fieldCaps({
index: lastIndexRef.index_name,
fields: '*',
}),
]);
return lastIndexMapping[lastIndex.index_name]; return {
indexState: lastIndex[lastIndexRef.index_name],
fieldCaps: lastIndexFieldCaps.fields,
};
}; };
const getStreamFields = async ( const getStreamFields = async (
@ -780,7 +803,8 @@ const getStreamFields = async (
const computeDetectedFields = async ( const computeDetectedFields = async (
processorsMetrics: Record<string, ProcessorMetrics>, processorsMetrics: Record<string, ProcessorMetrics>,
params: ProcessingSimulationParams, params: ProcessingSimulationParams,
streamFields: FieldDefinition streamFields: FieldDefinition,
streamFieldCaps: FieldCapsResponse['fields']
): Promise<DetectedField[]> => { ): Promise<DetectedField[]> => {
const fields = Object.values(processorsMetrics).flatMap((metrics) => metrics.detected_fields); const fields = Object.values(processorsMetrics).flatMap((metrics) => metrics.detected_fields);
@ -799,7 +823,11 @@ const computeDetectedFields = async (
return { name, ...existingField }; return { name, ...existingField };
} }
return { name, type: confirmedValidDetectedFields[name]?.type }; const existingFieldCaps = Object.keys(streamFieldCaps[name] || {});
const esType = existingFieldCaps.length > 0 ? existingFieldCaps[0] : undefined;
return { name, type: confirmedValidDetectedFields[name]?.type, esType };
}); });
}; };

View file

@ -25,6 +25,7 @@ export function SchemaEditor({
withControls = false, withControls = false,
withFieldSimulation = false, withFieldSimulation = false,
withTableActions = false, withTableActions = false,
withToolbar = true,
}: SchemaEditorProps) { }: SchemaEditorProps) {
const [controls, updateControls] = useControls(); const [controls, updateControls] = useControls();
@ -50,6 +51,7 @@ export function SchemaEditor({
)} )}
<FieldsTable <FieldsTable
controls={controls} controls={controls}
withToolbar={withToolbar}
defaultColumns={defaultColumns} defaultColumns={defaultColumns}
fields={fields} fields={fields}
stream={stream} stream={stream}

View file

@ -13,6 +13,8 @@ import {
EuiDataGrid, EuiDataGrid,
EuiDataGridCellProps, EuiDataGridCellProps,
EuiDataGridControlColumn, EuiDataGridControlColumn,
EuiIconTip,
EuiFlexGroup,
} from '@elastic/eui'; } from '@elastic/eui';
import { i18n } from '@kbn/i18n'; import { i18n } from '@kbn/i18n';
import { Streams } from '@kbn/streams-schema'; import { Streams } from '@kbn/streams-schema';
@ -31,12 +33,14 @@ export function FieldsTable({
fields, fields,
stream, stream,
withTableActions, withTableActions,
withToolbar,
}: { }: {
controls: TControls; controls: TControls;
defaultColumns: TableColumnName[]; defaultColumns: TableColumnName[];
fields: SchemaField[]; fields: SchemaField[];
stream: Streams.ingest.all.Definition; stream: Streams.ingest.all.Definition;
withTableActions: boolean; withTableActions: boolean;
withToolbar: boolean;
}) { }) {
// Column visibility // Column visibility
const [visibleColumns, setVisibleColumns] = useState<string[]>(defaultColumns); const [visibleColumns, setVisibleColumns] = useState<string[]>(defaultColumns);
@ -75,7 +79,7 @@ export function FieldsTable({
canDragAndDropColumns: false, canDragAndDropColumns: false,
}} }}
sorting={{ columns: sortingColumns, onSort: setSortingColumns }} sorting={{ columns: sortingColumns, onSort: setSortingColumns }}
toolbarVisibility={true} toolbarVisibility={withToolbar}
rowCount={filteredFields.length} rowCount={filteredFields.length}
renderCellValue={RenderCellValue} renderCellValue={RenderCellValue}
trailingControlColumns={trailingColumns} trailingControlColumns={trailingColumns}
@ -100,7 +104,26 @@ const createCellRenderer =
const { parent, status } = field; const { parent, status } = field;
if (columnId === 'type') { if (columnId === 'type') {
if (!field.type) return EMPTY_CONTENT; if (!field.type) {
if (field.status === 'unmapped' && field.esType) {
return (
<EuiFlexGroup alignItems="center" gutterSize="xs" responsive={false}>
{field.esType}
<EuiIconTip
content={i18n.translate(
'xpack.streams.streamDetailSchemaEditorFieldsTableTypeEsTypeTooltip',
{
defaultMessage:
'This field is not managed by Streams, but is defined in Elasticsearch. It can be controlled via the underlying index template and component templates available in the "Advanced" tab.',
}
)}
position="right"
/>
</EuiFlexGroup>
);
}
return EMPTY_CONTENT;
}
return <FieldType type={field.type} aliasFor={field.alias_for} />; return <FieldType type={field.type} aliasFor={field.alias_for} />;
} }

View file

@ -31,6 +31,10 @@ export interface MappedSchemaField extends BaseSchemaField {
export interface UnmappedSchemaField extends BaseSchemaField { export interface UnmappedSchemaField extends BaseSchemaField {
status: 'unmapped'; status: 'unmapped';
type?: SchemaFieldType; type?: SchemaFieldType;
/**
* Elasticsearch-level type of the field - only available for fields of classic streams that are not mapped through streams but from the underlying index.
*/
esType?: string;
additionalParameters?: FieldDefinitionConfigAdvancedParameters; additionalParameters?: FieldDefinitionConfigAdvancedParameters;
} }
@ -47,6 +51,7 @@ export interface SchemaEditorProps {
withControls?: boolean; withControls?: boolean;
withFieldSimulation?: boolean; withFieldSimulation?: boolean;
withTableActions?: boolean; withTableActions?: boolean;
withToolbar?: boolean;
} }
export const isSchemaFieldTyped = (field: SchemaField): field is MappedSchemaField => { export const isSchemaFieldTyped = (field: SchemaField): field is MappedSchemaField => {

View file

@ -16,7 +16,7 @@ import { SchemaField } from '../schema_editor/types';
import { useStreamEnrichmentEvents } from './state_management/stream_enrichment_state_machine'; import { useStreamEnrichmentEvents } from './state_management/stream_enrichment_state_machine';
interface DetectedFieldsEditorProps { interface DetectedFieldsEditorProps {
definition: Streams.WiredStream.GetResponse; definition: Streams.ingest.all.GetResponse;
detectedFields: SchemaField[]; detectedFields: SchemaField[];
} }
@ -24,6 +24,7 @@ export const DetectedFieldsEditor = ({ definition, detectedFields }: DetectedFie
const { euiTheme } = useEuiTheme(); const { euiTheme } = useEuiTheme();
const { mapField, unmapField } = useStreamEnrichmentEvents(); const { mapField, unmapField } = useStreamEnrichmentEvents();
const isWiredStream = Streams.WiredStream.GetResponse.is(definition);
const hasFields = detectedFields.length > 0; const hasFields = detectedFields.length > 0;
@ -49,26 +50,32 @@ export const DetectedFieldsEditor = ({ definition, detectedFields }: DetectedFie
return ( return (
<> <>
<EuiText {isWiredStream && (
component="p" <EuiText
color="subdued" component="p"
size="xs" color="subdued"
css={css` size="xs"
margin-bottom: ${euiTheme.size.base}; css={css`
`} margin-bottom: ${euiTheme.size.base};
> `}
{i18n.translate( >
'xpack.streams.streamDetailView.managementTab.enrichment.simulationPlayground.detectedFieldsHeadline', {i18n.translate(
{ defaultMessage: 'You can review and adjust saved fields further in the Schema Editor.' } 'xpack.streams.streamDetailView.managementTab.enrichment.simulationPlayground.detectedFieldsHeadline',
)} {
</EuiText> defaultMessage:
'You can review and adjust saved fields further in the Schema Editor.',
}
)}
</EuiText>
)}
<SchemaEditor <SchemaEditor
defaultColumns={['name', 'type', 'format', 'status']} defaultColumns={isWiredStream ? ['name', 'type', 'format', 'status'] : ['name', 'type']}
fields={detectedFields} fields={detectedFields}
stream={definition.stream} stream={definition.stream}
onFieldUnmap={unmapField} onFieldUnmap={unmapField}
onFieldUpdate={mapField} onFieldUpdate={mapField}
withTableActions withTableActions={isWiredStream}
withToolbar={isWiredStream}
/> />
</> </>
); );

View file

@ -15,7 +15,6 @@ import {
EuiTab, EuiTab,
EuiTabs, EuiTabs,
} from '@elastic/eui'; } from '@elastic/eui';
import { Streams } from '@kbn/streams-schema';
import { ProcessorOutcomePreview } from './processor_outcome_preview'; import { ProcessorOutcomePreview } from './processor_outcome_preview';
import { import {
useSimulatorSelector, useSimulatorSelector,
@ -47,7 +46,6 @@ export const SimulationPlayground = () => {
); );
const definition = useStreamsEnrichmentSelector((state) => state.context.definition); const definition = useStreamsEnrichmentSelector((state) => state.context.definition);
const canViewDetectedFields = Streams.WiredStream.GetResponse.is(definition);
return ( return (
<> <>
@ -59,28 +57,26 @@ export const SimulationPlayground = () => {
{ defaultMessage: 'Data preview' } { defaultMessage: 'Data preview' }
)} )}
</EuiTab> </EuiTab>
{canViewDetectedFields && ( <EuiTab
<EuiTab isSelected={isViewingDetectedFields}
isSelected={isViewingDetectedFields} onClick={viewSimulationDetectedFields}
onClick={viewSimulationDetectedFields} append={
append={ detectedFields.length > 0 ? (
detectedFields.length > 0 ? ( <EuiNotificationBadge size="m">{detectedFields.length}</EuiNotificationBadge>
<EuiNotificationBadge size="m">{detectedFields.length}</EuiNotificationBadge> ) : undefined
) : undefined }
} >
> {i18n.translate(
{i18n.translate( 'xpack.streams.streamDetailView.managementTab.enrichment.simulationPlayground.detectedFields',
'xpack.streams.streamDetailView.managementTab.enrichment.simulationPlayground.detectedFields', { defaultMessage: 'Detected fields' }
{ defaultMessage: 'Detected fields' } )}
)} </EuiTab>
</EuiTab>
)}
</EuiTabs> </EuiTabs>
{isLoading && <EuiProgress size="xs" color="accent" position="absolute" />} {isLoading && <EuiProgress size="xs" color="accent" position="absolute" />}
</EuiFlexItem> </EuiFlexItem>
<EuiSpacer size="m" /> <EuiSpacer size="m" />
{isViewingDataPreview && <ProcessorOutcomePreview />} {isViewingDataPreview && <ProcessorOutcomePreview />}
{isViewingDetectedFields && canViewDetectedFields && ( {isViewingDetectedFields && (
<DetectedFieldsEditor definition={definition} detectedFields={detectedFields} /> <DetectedFieldsEditor definition={definition} detectedFields={detectedFields} />
)} )}
</> </>

View file

@ -111,6 +111,7 @@ export function getSchemaFieldsFromSimulation(
// Detected field still unmapped // Detected field still unmapped
return { return {
status: 'unmapped', status: 'unmapped',
esType: field.esType,
name: field.name, name: field.name,
parent: streamName, parent: streamName,
}; };