🌊 Streams: Use better default field (#217478)

This PR passes the current sample documents to the default form state
generation for new processors to pick a good default field.

The logic that's actually employed for `dissect` and `grok` is the
following:
* Go through all docs and rank the string fields that occur by how many
docs have a value for them
* Pick the top one from a list of "well known" fields that probably make
sense (in case of a tie, go by the ordering of the well known fields)
* If no field is found this way, just leave it empty - this still shows
the full table and the user can pick the field they care about

Especially for otel this should be helpful.
This commit is contained in:
Joe Reuter 2025-04-15 14:29:09 +02:00 committed by GitHub
parent dee4dfbe59
commit e6cdba65ed
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 233 additions and 28 deletions

View file

@ -23,7 +23,7 @@ import {
import { useSelector } from '@xstate5/react';
import { i18n } from '@kbn/i18n';
import { isEmpty } from 'lodash';
import React, { useEffect, useMemo } from 'react';
import React, { useEffect, useMemo, useCallback } from 'react';
import { useForm, SubmitHandler, FormProvider, useWatch } from 'react-hook-form';
import { css } from '@emotion/react';
import { DiscardPromptOptions, useDiscardConfirm } from '../../../../hooks/use_discard_confirm';
@ -46,11 +46,13 @@ import {
useStreamsEnrichmentSelector,
useSimulatorSelector,
StreamEnrichmentContextType,
useGetStreamEnrichmentState,
} from '../state_management/stream_enrichment_state_machine';
import { ProcessorMetrics } from '../state_management/simulation_state_machine';
import { DateProcessorForm } from './date';
import { ConfigDrivenProcessorFields } from './config_driven/components/fields';
import { ConfigDrivenProcessorType } from './config_driven/types';
import { selectPreviewDocuments } from '../state_management/simulation_state_machine/selectors';
export function AddProcessorPanel() {
const { euiTheme } = useEuiTheme();
@ -63,20 +65,31 @@ export function AddProcessorPanel() {
const processorMetrics = useSimulatorSelector(
(state) => processorRef && state.context.simulation?.processors_metrics[processorRef.id]
);
const getEnrichmentState = useGetStreamEnrichmentState();
const isOpen = Boolean(processorRef);
const defaultValuesGetter = useCallback(
() =>
getDefaultFormStateByType(
'grok',
selectPreviewDocuments(getEnrichmentState().context.simulatorRef?.getSnapshot().context)
),
[getEnrichmentState]
);
const initialDefaultValues = useMemo(() => defaultValuesGetter(), [defaultValuesGetter]);
const defaultValues = useMemo(() => getDefaultFormStateByType('grok'), []);
const methods = useForm<ProcessorFormState>({ defaultValues, mode: 'onChange' });
const methods = useForm<ProcessorFormState>({
defaultValues: initialDefaultValues,
mode: 'onChange',
});
const type = useWatch({ control: methods.control, name: 'type' });
useEffect(() => {
if (!processorRef) {
methods.reset(defaultValues);
methods.reset(defaultValuesGetter());
}
}, [defaultValues, methods, processorRef]);
}, [defaultValuesGetter, methods, processorRef]);
useEffect(() => {
if (processorRef) {
@ -99,6 +112,8 @@ export function AddProcessorPanel() {
};
const handleOpen = () => {
const defaultValues = defaultValuesGetter();
methods.reset(defaultValues);
const draftProcessor = createDraftProcessorFromForm(defaultValues);
addProcessor(draftProcessor);
};
@ -207,6 +222,7 @@ export interface EditProcessorPanelProps {
export function EditProcessorPanel({ processorRef, processorMetrics }: EditProcessorPanelProps) {
const { euiTheme } = useEuiTheme();
const state = useSelector(processorRef, (s) => s);
const getEnrichmentState = useGetStreamEnrichmentState();
const canEdit = useStreamsEnrichmentSelector((s) => s.context.definition.privileges.simulate);
const previousProcessor = state.context.previousProcessor;
const processor = state.context.processor;
@ -217,7 +233,14 @@ export function EditProcessorPanel({ processorRef, processorMetrics }: EditProce
const isNew = state.context.isNew;
const isUnsaved = isNew || state.context.isUpdated;
const defaultValues = useMemo(() => getFormStateFrom(processor), [processor]);
const defaultValues = useMemo(
() =>
getFormStateFrom(
selectPreviewDocuments(getEnrichmentState().context.simulatorRef?.getSnapshot().context),
processor
),
[getEnrichmentState, processor]
);
const methods = useForm<ProcessorFormState>({
defaultValues,
@ -239,11 +262,16 @@ export function EditProcessorPanel({ processorRef, processorMetrics }: EditProce
useEffect(() => {
const subscription = processorRef.on('processor.changesDiscarded', () => {
methods.reset(getFormStateFrom(previousProcessor));
methods.reset(
getFormStateFrom(
selectPreviewDocuments(getEnrichmentState().context.simulatorRef?.getSnapshot().context),
previousProcessor
)
);
});
return () => subscription.unsubscribe();
}, [methods, previousProcessor, processorRef]);
}, [getEnrichmentState, methods, previousProcessor, processorRef]);
const handleCancel = useDiscardConfirm(
() => processorRef?.send({ type: 'processor.cancel' }),

View file

@ -15,6 +15,8 @@ import { useKibana } from '../../../../hooks/use_kibana';
import { getDefaultFormStateByType } from '../utils';
import { ProcessorFormState } from '../types';
import { configDrivenProcessors } from './config_driven';
import { useGetStreamEnrichmentState } from '../state_management/stream_enrichment_state_machine';
import { selectPreviewDocuments } from '../state_management/simulation_state_machine/selectors';
interface TAvailableProcessor {
type: ProcessorType;
@ -29,6 +31,7 @@ export const ProcessorTypeSelector = ({
}: Pick<EuiSuperSelectProps, 'disabled'>) => {
const { core } = useKibana();
const esDocUrl = core.docLinks.links.elasticsearch.docsBase;
const getEnrichmentState = useGetStreamEnrichmentState();
const { reset } = useFormContext();
const { field, fieldState } = useController<ProcessorFormState, 'type'>({
@ -39,7 +42,10 @@ export const ProcessorTypeSelector = ({
const processorType = useWatch<{ type: ProcessorType }>({ name: 'type' });
const handleChange = (type: ProcessorType) => {
const formState = getDefaultFormStateByType(type);
const formState = getDefaultFormStateByType(
type,
selectPreviewDocuments(getEnrichmentState().context.simulatorRef?.getSnapshot().context)
);
reset(formState);
};

View file

@ -20,15 +20,15 @@ const EMPTY_ARRAY: [] = [];
*/
export const selectPreviewDocuments = createSelector(
[
(context: SimulationContext) => context.samples,
(context: SimulationContext) => context.previewDocsFilter,
(context: SimulationContext) => context.simulation?.documents,
(context: SimulationContext | undefined) => context?.samples,
(context: SimulationContext | undefined) => context?.previewDocsFilter,
(context: SimulationContext | undefined) => context?.simulation?.documents,
],
(samples, previewDocsFilter, documents) => {
return (
((previewDocsFilter && documents
? filterSimulationDocuments(documents, previewDocsFilter)
: samples.map(flattenObjectNestedLast)) as FlattenRecord[]) || EMPTY_ARRAY
: samples?.map(flattenObjectNestedLast)) as FlattenRecord[]) || EMPTY_ARRAY
);
}
);

View file

@ -5,7 +5,7 @@
* 2.0.
*/
import React, { useEffect, useMemo } from 'react';
import React, { useCallback, useEffect, useMemo } from 'react';
import { createActorContext, useSelector } from '@xstate5/react';
import { createConsoleInspector } from '@kbn/xstate-utils';
import {
@ -26,6 +26,11 @@ export const useStreamsEnrichmentSelector = StreamEnrichmentContext.useSelector;
export type StreamEnrichmentEvents = ReturnType<typeof useStreamEnrichmentEvents>;
/**
 * Hook returning a memoized getter for the stream enrichment actor's snapshot.
 *
 * The snapshot is read when the returned function is called, not when the
 * hook runs. The getter's identity only changes when the actor ref changes.
 */
export const useGetStreamEnrichmentState = () => {
  const enrichmentActorRef = StreamEnrichmentContext.useActorRef();
  const readSnapshot = useCallback(
    () => enrichmentActorRef.getSnapshot(),
    [enrichmentActorRef]
  );
  return readSnapshot;
};
export const useStreamEnrichmentEvents = () => {
const service = StreamEnrichmentContext.useActorRef();

View file

@ -0,0 +1,119 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { FlattenRecord } from '@kbn/streams-schema';
import { getDefaultFormStateByType } from './utils';
import { ALWAYS_CONDITION } from '../../../util/condition';
describe('utils', () => {
  describe('defaultGrokProcessorFormState', () => {
    // Every case expects the same grok defaults; only the selected field varies.
    const expectedGrokState = (field: string) => ({
      type: 'grok',
      field,
      patterns: [{ value: '' }],
      pattern_definitions: {},
      ignore_failure: true,
      ignore_missing: true,
      if: ALWAYS_CONDITION,
    });

    // Shorthand for the call under test.
    const grokDefaultsFor = (docs: FlattenRecord[]) => getDefaultFormStateByType('grok', docs);

    it('should return default form state with empty field when no well known text fields are present', () => {
      const docs: FlattenRecord[] = [
        {
          'unknown.field': 'some value',
          'another.field': 'another value',
        },
        {
          'random.field': 'random value',
        },
      ];

      expect(grokDefaultsFor(docs)).toEqual(expectedGrokState(''));
    });

    it('should select the only well known text field present', () => {
      const docs: FlattenRecord[] = [
        {
          'error.message': 'This is an error',
          'another.field': 'another value',
        },
        {
          'error.message': 'Another error',
          'random.field': 'random value',
        },
      ];

      expect(grokDefaultsFor(docs)).toEqual(expectedGrokState('error.message'));
    });

    it('should select the most common well known text field when multiple are present', () => {
      const docs: FlattenRecord[] = [
        {
          message: 'Log message 1',
          'error.message': 'Error message 1',
        },
        {
          message: 'Log message 2',
          'error.message': 'Error message 2',
        },
        {
          'error.message': 'Error message 3',
        },
      ];

      // 'error.message' occurs in 3 docs while 'message' occurs in only 2.
      expect(grokDefaultsFor(docs)).toEqual(expectedGrokState('error.message'));
    });

    it('should select based on WELL_KNOWN_TEXT_FIELDS order when frequencies are equal', () => {
      const docs: FlattenRecord[] = [
        {
          message: 'Log message 1',
          'error.message': 'Error message 1',
          'event.original': 'Original event 1',
        },
        {
          message: 'Log message 2',
          'error.message': 'Error message 2',
          'event.original': 'Original event 2',
        },
      ];

      // All three fields tie on count; 'message' is listed first in WELL_KNOWN_TEXT_FIELDS.
      expect(grokDefaultsFor(docs)).toEqual(expectedGrokState('message'));
    });
  });
});

View file

@ -8,6 +8,7 @@
/* eslint-disable @typescript-eslint/naming-convention */
import {
FlattenRecord,
ProcessorDefinition,
ProcessorDefinitionWithId,
ProcessorType,
@ -35,7 +36,7 @@ import {
*/
export const SPECIALISED_TYPES = ['date', 'dissect', 'grok'];
const defaultDateProcessorFormState: DateFormState = {
const defaultDateProcessorFormState: () => DateFormState = () => ({
type: 'date',
field: '',
formats: [],
@ -45,48 +46,94 @@ const defaultDateProcessorFormState: DateFormState = {
output_format: '',
ignore_failure: true,
if: ALWAYS_CONDITION,
});
/**
 * Candidate fields for the default `field` of text-parsing processors.
 * The order matters: it breaks ties when several fields occur equally often.
 */
const WELL_KNOWN_TEXT_FIELDS = [
  'message',
  'body.text',
  'error.message',
  'event.original',
  'attributes.exception.message',
];

/**
 * Picks the well-known text field that holds a non-empty string in the most
 * sample documents.
 *
 * @param sampleDocs flattened sample documents to inspect
 * @returns the winning field name; ties go to the field listed first in
 *          WELL_KNOWN_TEXT_FIELDS, and '' is returned when no well-known
 *          field has a non-empty string value in any document
 */
const getDefaultTextField = (sampleDocs: FlattenRecord[]) => {
  // Tally, per well-known field, the number of docs carrying a non-empty string.
  const occurrences = new Map<string, number>();
  for (const doc of sampleDocs) {
    for (const key of Object.keys(doc)) {
      const value = doc[key];
      const isNonEmptyString = typeof value === 'string' && value !== '';
      if (isNonEmptyString && WELL_KNOWN_TEXT_FIELDS.includes(key)) {
        occurrences.set(key, (occurrences.get(key) ?? 0) + 1);
      }
    }
  }

  // Walk candidates in priority order; the strict comparison keeps the
  // earlier candidate when counts are equal.
  let winner = '';
  let winnerCount = 0;
  for (const candidate of WELL_KNOWN_TEXT_FIELDS) {
    const count = occurrences.get(candidate) ?? 0;
    if (count > winnerCount) {
      winner = candidate;
      winnerCount = count;
    }
  }
  return winner;
};
const defaultDissectProcessorFormState: DissectFormState = {
const defaultDissectProcessorFormState: (sampleDocs: FlattenRecord[]) => DissectFormState = (
sampleDocs: FlattenRecord[]
) => ({
type: 'dissect',
field: 'message',
field: getDefaultTextField(sampleDocs),
pattern: '',
ignore_failure: true,
ignore_missing: true,
if: ALWAYS_CONDITION,
};
});
const defaultGrokProcessorFormState: GrokFormState = {
const defaultGrokProcessorFormState: (sampleDocs: FlattenRecord[]) => GrokFormState = (
sampleDocs: FlattenRecord[]
) => ({
type: 'grok',
field: 'message',
field: getDefaultTextField(sampleDocs),
patterns: [{ value: '' }],
pattern_definitions: {},
ignore_failure: true,
ignore_missing: true,
if: ALWAYS_CONDITION,
};
});
const configDrivenDefaultFormStates = mapValues(
configDrivenProcessors,
(config) => config.defaultFormState
(config) => () => config.defaultFormState
) as {
[TKey in ConfigDrivenProcessorType]: ConfigDrivenProcessors[TKey]['defaultFormState'];
[TKey in ConfigDrivenProcessorType]: () => ConfigDrivenProcessors[TKey]['defaultFormState'];
};
const defaultProcessorFormStateByType: Record<ProcessorType, ProcessorFormState> = {
const defaultProcessorFormStateByType: Record<
ProcessorType,
(sampleDocs: FlattenRecord[]) => ProcessorFormState
> = {
date: defaultDateProcessorFormState,
dissect: defaultDissectProcessorFormState,
grok: defaultGrokProcessorFormState,
...configDrivenDefaultFormStates,
};
export const getDefaultFormStateByType = (type: ProcessorType) =>
defaultProcessorFormStateByType[type];
export const getDefaultFormStateByType = (type: ProcessorType, sampleDocuments: FlattenRecord[]) =>
defaultProcessorFormStateByType[type](sampleDocuments);
export const getFormStateFrom = (
sampleDocuments: FlattenRecord[],
processor?: ProcessorDefinitionWithUIAttributes
): ProcessorFormState => {
if (!processor) return defaultGrokProcessorFormState;
if (!processor) return defaultGrokProcessorFormState(sampleDocuments);
if (isGrokProcessor(processor)) {
const { grok } = processor;