[Streams 🌊] Report ignored_fields error for enrichment simulation docs (#216877)

## 📓 Summary

Closes https://github.com/elastic/streams-program/issues/101

These changes update the simulation to detect ignored fields and
mapping failures.
Since these failures are detected while simulating ingestion, the
`_ingest/_simulate` API won't tell us which processor caused the
failure, but we can associate them with the document.
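
For reference, a minimal sketch (helper name hypothetical) of how these document-level failures surface in a simulate-ingest response — both the mapping error and the ignored fields hang off each `doc`, with no processor attribution:

```ts
import type { SimulateIngestResponse } from '@elastic/elasticsearch/lib/api/types';

// Hypothetical helper: collect document-level failures from a simulate-ingest response.
function collectDocFailures(response: SimulateIngestResponse) {
  return response.docs.map(({ doc }) => ({
    // A document_parsing_exception means the document would be rejected at index time.
    mappingFailure:
      doc?.error?.type === 'document_parsing_exception' ? doc?.error?.reason : undefined,
    // Fields silently dropped during ingestion, e.g. values exceeding `ignore_above`.
    ignoredFields: doc?.ignored_fields?.map(({ field }) => field) ?? [],
  }));
}
```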

### Wired streams

- `agent.name` is mapped with `ignore_above: 4`, so even short strings exceed the limit and end up as ignored fields
- `long_number` is mapped as `long`

I tried parsing a string longer than the limit into `agent.name` to
reproduce the ignored_fields, and similarly a non-numeric string into
`long_number` to reproduce the mapping failure.
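
The mappings behind this test look roughly like the following sketch (assuming `keyword` for `agent.name`, since `ignore_above` applies to keyword fields):

```ts
// Sketch of the wired-stream test mappings: a tiny `ignore_above` makes even
// modest strings land in ignored_fields, while `long` rejects non-numeric values.
const testMappings = {
  properties: {
    'agent.name': { type: 'keyword', ignore_above: 4 },
    long_number: { type: 'long' },
  },
} as const;
```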


https://github.com/user-attachments/assets/09b604da-ae45-43a6-a30a-737061ff0f90

### Classic streams

Tried ingesting a word into the `nginx.error.connection_id` field, which is
mapped as `long`.
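
For instance, a sample document like this (value hypothetical) should surface a `field_mapping_failure` in the simulation:

```ts
// A non-numeric string cannot be indexed into a `long` field.
const sampleDoc = { 'nginx.error.connection_id': 'not-a-number' };
```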


https://github.com/user-attachments/assets/e6a1d47d-3080-452c-896a-2074f2f0c920
9 changed files with 408 additions and 168 deletions

View file

@@ -1,15 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { StatusError } from './status_error';
export class DetectedMappingFailureError extends StatusError {
constructor(message: string) {
super(message, 400);
this.name = 'DetectedMappingFailureError';
}
}

View file

@@ -12,11 +12,12 @@ import {
IngestDocument,
IngestProcessorContainer,
IngestSimulateRequest,
IngestPipelineConfig,
ClusterComponentTemplateNode,
ErrorCauseKeys,
IngestPipelineSimulation,
IngestSimulateDocumentResult,
SimulateIngestRequest,
IndicesIndexState,
SimulateIngestResponse,
SimulateIngestSimulateIngestDocumentResult,
} from '@elastic/elasticsearch/lib/api/types';
import { IScopedClusterClient } from '@kbn/core/server';
import { flattenObjectNestedLast, calculateObjectDiff } from '@kbn/object-utils';
@@ -34,7 +35,6 @@ import {
} from '@kbn/streams-schema';
import { mapValues, uniq, omit, isEmpty, uniqBy, some } from 'lodash';
import { StreamsClient } from '../../../../lib/streams/client';
import { DetectedMappingFailureError } from '../../../../lib/streams/errors/detected_mapping_failure_error';
import { formatToIngestProcessors } from '../../../../lib/streams/helpers/processing';
export interface ProcessingSimulationParams {
@@ -54,16 +54,37 @@ export interface SimulateProcessingDeps {
streamsClient: StreamsClient;
}
export interface SimulationError {
export interface BaseSimulationError {
message: string;
processor_id: string;
type:
| 'generic_processor_failure'
| 'generic_simulation_failure'
| 'reserved_field_failure'
| 'non_additive_processor_failure';
}
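// Processor-scoped failures carry a processor_id; document-level failures (field mapping, ignored fields) do not.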
export type SimulationError = BaseSimulationError &
(
| {
type: 'field_mapping_failure';
}
| {
type: 'generic_processor_failure';
processor_id: string;
}
| {
type: 'generic_simulation_failure';
processor_id?: string;
}
| {
type: 'ignored_fields_failure';
ignored_fields: Array<Record<string, string>>;
}
| {
type: 'non_additive_processor_failure';
processor_id: string;
}
| {
type: 'reserved_field_failure';
processor_id: string;
}
);
export type DocSimulationStatus = 'parsed' | 'partially_parsed' | 'skipped' | 'failed';
export interface SimulationDocReport {
@@ -82,19 +103,29 @@ export interface ProcessorMetrics {
}
// Narrow down the type to only successful processor results
export type SuccessfulIngestSimulateDocumentResult = WithRequired<
export type SuccessfulPipelineSimulateDocumentResult = WithRequired<
IngestSimulateDocumentResult,
'processor_results'
>;
export interface SuccessfulIngestSimulateResponse {
docs: SuccessfulIngestSimulateDocumentResult[];
export interface SuccessfulPipelineSimulateResponse {
docs: SuccessfulPipelineSimulateDocumentResult[];
}
export type PipelineSimulationResult =
| {
status: 'success';
simulation: SuccessfulIngestSimulateResponse;
simulation: SuccessfulPipelineSimulateResponse;
}
| {
status: 'failure';
error: SimulationError;
};
export type IngestSimulationResult =
| {
status: 'success';
simulation: SimulateIngestResponse;
}
| {
status: 'failure';
@@ -113,41 +144,43 @@ export const simulateProcessing = async ({
scopedClusterClient,
streamsClient,
}: SimulateProcessingDeps) => {
/* 0. Retrieve required data to prepare the simulation */
const streamIndex = await getStreamIndex(scopedClusterClient, streamsClient, params.path.name);
/* 1. Prepare data for either simulation types (ingest, pipeline), prepare simulation body for the mandatory pipeline simulation */
const simulationData = prepareSimulationData(params);
const pipelineSimulationBody = preparePipelineSimulationBody(simulationData);
const ingestSimulationBody = prepareIngestSimulationBody(simulationData, streamIndex, params);
/**
* 2. Run both pipeline and ingest simulations in parallel.
* - The pipeline simulation is used to extract the document reports and the processor metrics. This always runs.
* - The ingest simulation is used to detect mapping failures and ignored fields on the documents. This now always runs too.
*/
const [pipelineSimulationResult] = await Promise.all([
const [pipelineSimulationResult, ingestSimulationResult] = await Promise.all([
executePipelineSimulation(scopedClusterClient, pipelineSimulationBody),
conditionallyExecuteIngestSimulation(scopedClusterClient, simulationData, params),
executeIngestSimulation(scopedClusterClient, ingestSimulationBody),
]);
/* 3. Fail fast on pipeline simulation errors and return the generic error response gracefully */
/* 3. Fail fast on pipeline simulations errors and return the generic error response gracefully */
if (pipelineSimulationResult.status === 'failure') {
return prepareSimulationFailureResponse(pipelineSimulationResult.error);
} else if (ingestSimulationResult.status === 'failure') {
return prepareSimulationFailureResponse(ingestSimulationResult.error);
}
const streamFields = await getStreamFields(streamsClient, params.path.name);
/* 4. Extract all the documents reports and processor metrics from the pipeline simulation */
/* 4. Extract all the documents reports and processor metrics from the simulations */
const { docReports, processorsMetrics } = computePipelineSimulationResult(
pipelineSimulationResult.simulation,
ingestSimulationResult.simulation,
simulationData.docs,
params.body.processing,
streamFields
);
/* 5. Extract valid detected fields asserting existing mapped fields from stream and ancestors */
const detectedFields = await computeDetectedFields(
processorsMetrics,
streamsClient,
params,
streamFields
);
const detectedFields = await computeDetectedFields(processorsMetrics, params, streamFields);
/* 6. Derive general insights and process final response body */
return prepareSimulationResponse(docReports, processorsMetrics, detectedFields);
@@ -234,40 +267,39 @@ const preparePipelineSimulationBody = (
const prepareIngestSimulationBody = (
simulationData: ReturnType<typeof prepareSimulationData>,
streamIndex: IndicesIndexState,
params: ProcessingSimulationParams
) => {
const { path, body } = params;
): SimulateIngestRequest => {
const { body } = params;
const { detected_fields } = body;
const { docs, processors } = simulationData;
// TODO: update type once Kibana updates to elasticsearch-js 8.17
const simulationBody: {
docs: IngestDocument[];
pipeline_substitutions: Record<string, IngestPipelineConfig>;
component_template_substitutions?: Record<string, ClusterComponentTemplateNode>;
} = {
const defaultPipelineName = streamIndex.settings?.index?.default_pipeline;
const mappings = streamIndex.mappings;
const simulationBody: SimulateIngestRequest = {
docs,
pipeline_substitutions: {
[`${path.name}@stream.processing`]: {
processors,
...(defaultPipelineName && {
pipeline_substitutions: {
[defaultPipelineName]: {
processors,
},
},
}),
// Ideally we should not need to retrieve and merge the mappings from the stream index.
// But the ingest simulation API does not correctly validate the mappings unless they are specified in the simulation body.
// So we need to merge the mappings from the stream index with the detected fields.
// This is a workaround until the ingest simulation API works as expected.
mapping_addition: {
...mappings,
properties: {
...(mappings && mappings.properties),
...(detected_fields && computeMappingProperties(detected_fields)),
},
},
};
if (detected_fields) {
const properties = computeMappingProperties(detected_fields);
simulationBody.component_template_substitutions = {
[`${path.name}@stream.layer`]: {
template: {
mappings: {
properties,
},
},
},
};
}
return simulationBody;
};
@@ -285,7 +317,7 @@ const executePipelineSimulation = async (
return {
status: 'success',
simulation: simulation as SuccessfulIngestSimulateResponse,
simulation: simulation as SuccessfulPipelineSimulateResponse,
};
} catch (error) {
if (error instanceof esErrors.ResponseError) {
@@ -305,49 +337,45 @@ const executePipelineSimulation = async (
status: 'failure',
error: {
message: error.message,
processor_id: 'draft',
type: 'generic_simulation_failure',
},
};
}
};
// TODO: update type to built-in once Kibana updates to elasticsearch-js 8.17
interface IngestSimulationResult {
docs: Array<{ doc: IngestDocument & { error?: ErrorCauseKeys } }>;
}
const conditionallyExecuteIngestSimulation = async (
const executeIngestSimulation = async (
scopedClusterClient: IScopedClusterClient,
simulationData: ReturnType<typeof prepareSimulationData>,
params: ProcessingSimulationParams
): Promise<IngestSimulationResult | null> => {
if (!params.body.detected_fields) return null;
const simulationBody = prepareIngestSimulationBody(simulationData, params);
let simulationResult: IngestSimulationResult;
simulationBody: SimulateIngestRequest
): Promise<IngestSimulationResult> => {
try {
// TODO: We should be using scopedClusterClient.asCurrentUser.simulate.ingest() once Kibana updates to elasticsearch-js 8.17
simulationResult = await scopedClusterClient.asCurrentUser.transport.request({
method: 'POST',
path: `_ingest/_simulate`,
body: simulationBody,
});
const simulation = await scopedClusterClient.asCurrentUser.simulate.ingest(simulationBody);
return {
status: 'success',
simulation: simulation as SimulateIngestResponse,
};
} catch (error) {
// To prevent a race condition on simulation errors, this returns early and delegates the error handling to the pipeline simulation
return null;
if (error instanceof esErrors.ResponseError) {
const { processor_tag, reason } = error.body?.error ?? {};
return {
status: 'failure',
error: {
message: reason,
processor_id: processor_tag,
type: 'generic_simulation_failure',
},
};
}
return {
status: 'failure',
error: {
message: error.message,
type: 'generic_simulation_failure',
},
};
}
const entryWithError = simulationResult.docs.find(isMappingFailure);
if (entryWithError) {
throw new DetectedMappingFailureError(
`The detected field types might not be compatible with these documents. ${entryWithError.doc.error?.reason}`
);
}
return simulationResult;
};
/**
@@ -360,7 +388,8 @@ const conditionallyExecuteIngestSimulation = async (
* This requires a closure on the processor metrics map to keep track of the processor state while iterating over the documents.
*/
const computePipelineSimulationResult = (
simulationResult: SuccessfulIngestSimulateResponse,
pipelineSimulationResult: SuccessfulPipelineSimulateResponse,
ingestSimulationResult: SimulateIngestResponse,
sampleDocs: Array<{ _source: FlattenRecord }>,
processing: ProcessorDefinitionWithId[],
streamFields: FieldDefinition
@@ -374,12 +403,23 @@ const computePipelineSimulationResult = (
.filter(([, { type }]) => type === 'system')
.map(([name]) => name);
const docReports = simulationResult.docs.map((docResult, id) => {
const { errors, status, value } = getLastDoc(docResult, sampleDocs[id]._source);
const docReports = pipelineSimulationResult.docs.map((pipelineDocResult, id) => {
const ingestDocResult = ingestSimulationResult.docs[id];
const ingestDocErrors = collectIngestDocumentErrors(ingestDocResult);
const diff = computeSimulationDocDiff(docResult, sampleDocs[id]._source, forbiddenFields);
const { errors, status, value } = getLastDoc(
pipelineDocResult,
sampleDocs[id]._source,
ingestDocErrors
);
docResult.processor_results.forEach((processor) => {
const diff = computeSimulationDocDiff(
pipelineDocResult,
sampleDocs[id]._source,
forbiddenFields
);
pipelineDocResult.processor_results.forEach((processor) => {
const procId = processor.tag;
if (procId && isSkippedProcessor(processor)) {
@@ -391,13 +431,16 @@
processorsMap[processor_id].detected_fields.push(name);
});
errors.push(...diff.errors);
errors.push(...diff.errors); // Add diffing errors to the document errors list, such as reserved fields or non-additive changes
errors.push(...ingestDocErrors); // Add ingestion errors to the document errors list, such as ignored_fields or mapping errors
errors.forEach((error) => {
const procId = error.processor_id;
const procId = 'processor_id' in error && error.processor_id;
processorsMap[procId].errors.push(error);
if (error.type !== 'non_additive_processor_failure') {
processorsMap[procId].failed_rate++;
if (procId && processorsMap[procId]) {
processorsMap[procId].errors.push(error);
if (error.type !== 'non_additive_processor_failure') {
processorsMap[procId].failed_rate++;
}
}
});
@@ -458,7 +501,14 @@ const extractProcessorMetrics = ({
});
};
const getDocumentStatus = (doc: SuccessfulIngestSimulateDocumentResult): DocSimulationStatus => {
const getDocumentStatus = (
doc: SuccessfulPipelineSimulateDocumentResult,
ingestDocErrors: SimulationError[]
): DocSimulationStatus => {
// If there is an ingestion mapping error, the document parsing should be considered failed
if (ingestDocErrors.some((error) => error.type === 'field_mapping_failure')) {
return 'failed';
}
// Remove the always present base processor for dot expander
const processorResults = doc.processor_results.slice(1);
@@ -477,8 +527,12 @@ const getDocumentStatus = (doc: SuccessfulIngestSimulateDocumentResult): DocSimu
return 'failed';
};
const getLastDoc = (docResult: SuccessfulIngestSimulateDocumentResult, sample: FlattenRecord) => {
const status = getDocumentStatus(docResult);
const getLastDoc = (
docResult: SuccessfulPipelineSimulateDocumentResult,
sample: FlattenRecord,
ingestDocErrors: SimulationError[]
) => {
const status = getDocumentStatus(docResult, ingestDocErrors);
const lastDocSource =
docResult.processor_results
.slice(1) // Remove the always present base processor for dot expander
@@ -504,7 +558,7 @@ const getLastDoc = (docResult: SuccessfulIngestSimulateDocumentResult, sample: F
* this function computes the detected fields and the errors for each processor.
*/
const computeSimulationDocDiff = (
docResult: SuccessfulIngestSimulateDocumentResult,
docResult: SuccessfulPipelineSimulateDocumentResult,
sample: FlattenRecord,
forbiddenFields: string[]
) => {
@@ -570,6 +624,27 @@
return diffResult;
};
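// Collects document-level errors reported by the ingest simulation: mapping failures and ignored fields.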
const collectIngestDocumentErrors = (docResult: SimulateIngestSimulateIngestDocumentResult) => {
const errors: SimulationError[] = [];
if (isMappingFailure(docResult)) {
errors.push({
type: 'field_mapping_failure',
message: `Some field types might not be compatible with this document: ${docResult.doc?.error?.reason}`,
});
}
if (docResult.doc?.ignored_fields) {
errors.push({
type: 'ignored_fields_failure',
message: 'Some fields were ignored while simulating this document ingestion.',
ignored_fields: docResult.doc.ignored_fields,
});
}
return errors;
};
const prepareSimulationResponse = async (
docReports: SimulationDocReport[],
processorsMetrics: Record<string, ProcessorMetrics>,
@@ -605,13 +680,16 @@ const prepareSimulationFailureResponse = (error: SimulationError) => {
detected_fields: [],
documents: [],
processors_metrics: {
[error.processor_id]: {
detected_fields: [],
errors: [error],
failed_rate: 1,
skipped_rate: 0,
parsed_rate: 0,
},
...('processor_id' in error &&
error.processor_id && {
[error.processor_id]: {
detected_fields: [],
errors: [error],
failed_rate: 1,
skipped_rate: 0,
parsed_rate: 0,
},
}),
},
documents_metrics: {
failed_rate: 1,
@@ -623,6 +701,24 @@
};
};
const getStreamIndex = async (
scopedClusterClient: IScopedClusterClient,
streamsClient: StreamsClient,
streamName: string
): Promise<IndicesIndexState> => {
const dataStream = await streamsClient.getDataStream(streamName);
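// The most recent backing index of the data stream is its write index.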
const lastIndex = dataStream.indices.at(-1);
if (!lastIndex) {
throw new Error(`No write index found for stream ${streamName}`);
}
const lastIndexMapping = await scopedClusterClient.asCurrentUser.indices.get({
index: lastIndex.index_name,
});
return lastIndexMapping[lastIndex.index_name];
};
const getStreamFields = async (
streamsClient: StreamsClient,
streamName: string
@@ -644,7 +740,6 @@
*/
const computeDetectedFields = async (
processorsMetrics: Record<string, ProcessorMetrics>,
streamsClient: StreamsClient,
params: ProcessingSimulationParams,
streamFields: FieldDefinition
): Promise<DetectedField[]> => {
@@ -699,8 +794,8 @@ const isSkippedProcessor = (
// @ts-expect-error Looks like the IngestPipelineSimulation.status is not typed correctly and misses the 'skipped' status
): processor is WithRequired<IngestPipelineSimulation, 'tag'> => processor.status === 'skipped';
// TODO: update type once Kibana updates to elasticsearch-js 8.17
const isMappingFailure = (entry: any) => entry.doc?.error?.type === 'document_parsing_exception';
const isMappingFailure = (entry: SimulateIngestSimulateIngestDocumentResult) =>
entry.doc?.error?.type === 'document_parsing_exception';
const isNonAdditiveSimulationError = (error: SimulationError) =>
error.type === 'non_additive_processor_failure';

View file

@@ -154,7 +154,7 @@ describe('handleProcessingSuggestion', () => {
});
});
it('filters out simulation when simulateProcessing returns an unsuccessful result', async () => {
it('returns non-matching simulations only when there are no matching simulations at all', async () => {
const messages = [{ message: 'Error 999: failed' }, { message: 'Error 999: failed duplicate' }];
const newBody = {
field: 'message',
@@ -184,9 +184,8 @@
streamsClientMock
);
// Expect that unsuccessful simulation is filtered, so no simulation is returned.
expect(result.simulations.length).toBe(0);
expect(result.patterns).toEqual([]);
expect(result.simulations.length).toBe(1);
expect(result.patterns).toEqual(['%{common:message}']);
});
});

View file

@@ -6,7 +6,7 @@
*/
import { IScopedClusterClient } from '@kbn/core/server';
import { get, groupBy, mapValues, orderBy, shuffle, uniq, uniqBy } from 'lodash';
import { get, groupBy, isEmpty, mapValues, orderBy, shuffle, uniq, uniqBy } from 'lodash';
import { InferenceClient } from '@kbn/inference-plugin/server';
import { FlattenRecord } from '@kbn/streams-schema';
import { StreamsClient } from '../../../../lib/streams/client';
@@ -194,10 +194,6 @@ async function processPattern(
streamsClient,
});
if (simulationResult.documents_metrics.parsed_rate === 0) {
return null;
}
// TODO if success rate is zero, try to strip out the date part and try again
return {
@@ -208,8 +204,13 @@
)
).filter((simulation): simulation is SimulationWithPattern => simulation !== null);
const matchingSimulations = simulations.filter(
(simulation) => simulation.documents_metrics.parsed_rate > 0
);
return {
chatResponse,
simulations,
// When no simulation is successful, we return all of them, otherwise we return only the successful ones
simulations: isEmpty(matchingSimulations) ? simulations : matchingSimulations,
};
}

View file

@@ -5,16 +5,18 @@
* 2.0.
*/
import React from 'react';
import React, { useMemo } from 'react';
import {
DragDropContextProps,
EuiAccordion,
EuiCode,
EuiFlexGroup,
EuiPanel,
EuiResizableContainer,
EuiSplitPanel,
EuiText,
EuiTitle,
euiDragDropReorder,
useEuiShadow,
useEuiTheme,
} from '@elastic/eui';
import { i18n } from '@kbn/i18n';
@@ -22,6 +24,7 @@ import { IngestStreamGetResponse } from '@kbn/streams-schema';
import { useUnsavedChangesPrompt } from '@kbn/unsaved-changes-prompt';
import { css } from '@emotion/react';
import { isEmpty } from 'lodash';
import { FormattedMessage } from '@kbn/i18n-react';
import { useKibana } from '../../../hooks/use_kibana';
import { DraggableProcessorListItem } from './processors_list';
import { SortableList } from './sortable_list';
@@ -138,7 +141,35 @@ const ProcessorsEditor = React.memo(() => {
)
);
const simulationSnapshot = useSimulatorSelector((s) => s);
const simulation = useSimulatorSelector((snapshot) => snapshot.context.simulation);
const errors = useMemo(() => {
if (!simulation) {
return { ignoredFields: [], mappingFailures: [] };
}
const ignoredFieldsSet = new Set<string>();
const mappingFailuresSet = new Set<string>();
simulation.documents.forEach((doc) => {
doc.errors.forEach((error) => {
if (error.type === 'ignored_fields_failure') {
error.ignored_fields.forEach((ignored) => {
ignoredFieldsSet.add(ignored.field);
});
}
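// Cap the list at two distinct mapping-failure messages to keep the callout readable.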
if (error.type === 'field_mapping_failure' && mappingFailuresSet.size < 2) {
mappingFailuresSet.add(error.message);
}
});
});
return {
ignoredFields: Array.from(ignoredFieldsSet),
mappingFailures: Array.from(mappingFailuresSet),
};
}, [simulation]);
const handlerItemDrag: DragDropContextProps['onDragEnd'] = ({ source, destination }) => {
if (source && destination) {
@@ -157,8 +188,7 @@
borderRadius="none"
grow={false}
css={css`
z-index: ${euiTheme.levels.maskBelowHeader};
${useEuiShadow('xs')};
border-bottom: ${euiTheme.border.thin};
`}
>
<EuiTitle size="xxs">
@@ -195,15 +225,95 @@
key={processorRef.id}
idx={idx}
processorRef={processorRef}
processorMetrics={
simulationSnapshot.context.simulation?.processors_metrics[processorRef.id]
}
processorMetrics={simulation?.processors_metrics[processorRef.id]}
/>
))}
</SortableList>
)}
<AddProcessorPanel />
</EuiPanel>
<EuiPanel paddingSize="m" hasShadow={false} grow={false}>
{!isEmpty(errors.ignoredFields) && (
<EuiPanel paddingSize="s" hasShadow={false} grow={false} color="danger">
<EuiAccordion
id="ignored-fields-failures-accordion"
initialIsOpen
buttonContent={
<strong>
{i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.ignoredFieldsFailure.title',
{ defaultMessage: 'Some fields were ignored during the simulation.' }
)}
</strong>
}
>
<EuiText component="p" size="s">
<p>
{i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.ignoredFieldsFailure.description',
{
defaultMessage:
'Some fields in these documents were ignored during the ingestion simulation. Review the fields mapping limits.',
}
)}
</p>
<p>
<FormattedMessage
id="xpack.streams.streamDetailView.managementTab.enrichment.ignoredFieldsFailure.fieldsList"
defaultMessage="The ignored fields are: {fields}"
values={{
fields: (
<EuiFlexGroup
gutterSize="s"
css={css`
margin-top: ${euiTheme.size.s};
`}
>
{errors.ignoredFields.map((field) => (
<EuiCode key={field}>{field}</EuiCode>
))}
</EuiFlexGroup>
),
}}
/>
</p>
</EuiText>
</EuiAccordion>
</EuiPanel>
)}
{!isEmpty(errors.mappingFailures) && (
<EuiPanel paddingSize="s" hasShadow={false} grow={false} color="danger">
<EuiAccordion
id="mapping-failures-accordion"
initialIsOpen
buttonContent={i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.fieldMappingsFailure.title',
{
defaultMessage: 'Field conflicts during simulation',
}
)}
>
<EuiText size="s">
<p>
<FormattedMessage
id="xpack.streams.streamDetailView.managementTab.enrichment.fieldMappingsFailure.fieldsList"
defaultMessage="These are some mapping failures that occurred during the simulation:"
/>
</p>
<ul>
{errors.mappingFailures.map((failureMessage, id) => (
<li key={id}>
<EuiText css={clampTwoLines} size="s">
{failureMessage}
</EuiText>
</li>
))}
</ul>
</EuiText>
</EuiAccordion>
</EuiPanel>
)}
</EuiPanel>
</>
);
});
@@ -212,3 +322,11 @@ const verticalFlexCss = css`
display: flex;
flex-direction: column;
`;
const clampTwoLines = css`
display: -webkit-box;
-webkit-line-clamp: 2;
-webkit-box-orient: vertical;
overflow: hidden;
text-overflow: ellipsis;
`;

View file

@@ -18,7 +18,7 @@ export const DraggableProcessorListItem = ({
spacing="m"
draggableId={props.processorRef.id}
hasInteractiveChildren
style={{
css={{
paddingLeft: 0,
paddingRight: 0,
}}

View file

@@ -9,6 +9,7 @@ import { createSelector } from 'reselect';
import { FlattenRecord, SampleDocument } from '@kbn/streams-schema';
import { isPlainObject, uniq } from 'lodash';
import { flattenObjectNestedLast } from '@kbn/object-utils';
import { SimulationContext } from './types';
import { filterSimulationDocuments } from './utils';
@@ -27,7 +28,7 @@ export const selectPreviewDocuments = createSelector(
return (
((previewDocsFilter && documents
? filterSimulationDocuments(documents, previewDocsFilter)
: samples) as FlattenRecord[]) || EMPTY_ARRAY
: samples.map(flattenObjectNestedLast)) as FlattenRecord[]) || EMPTY_ARRAY
);
}
);

View file

@@ -202,6 +202,7 @@ export const simulationMachine = setup({
actions: [{ type: 'mapField', params: ({ event }) => event }],
},
'simulation.fields.unmap': {
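// Re-enter the simulation flow so unmapping a field triggers a fresh simulation.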
target: 'assertingSimulationRequirements',
actions: [{ type: 'unmapField', params: ({ event }) => event }],
},
},
@@ -237,10 +238,20 @@
streamName: context.streamName,
absoluteTimeRange: context.dateRangeRef.getSnapshot().context.absoluteTimeRange,
}),
onDone: {
target: 'assertingSimulationRequirements',
actions: [{ type: 'storeSamples', params: ({ event }) => ({ samples: event.output }) }],
},
onDone: [
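// After sampling: run the simulation only when processors are configured; otherwise store the samples and stay idle.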
{
guard: {
type: 'hasProcessors',
params: ({ context }) => ({ processors: context.processors }),
},
target: 'assertingSimulationRequirements',
actions: [{ type: 'storeSamples', params: ({ event }) => ({ samples: event.output }) }],
},
{
target: 'idle',
actions: [{ type: 'storeSamples', params: ({ event }) => ({ samples: event.output }) }],
},
],
onError: {
target: 'idle',
actions: [

View file

@@ -10,7 +10,6 @@
import expect from '@kbn/expect';
import { ClientRequestParamsOf } from '@kbn/server-route-repository-utils';
import { StreamsRouteRepository } from '@kbn/streams-plugin/server';
import { errors as esErrors } from '@elastic/elasticsearch';
import { disableStreams, enableStreams, forkStream, indexDocument } from './helpers/requests';
import { DeploymentAgnosticFtrProviderContext } from '../../../ftr_provider_context';
import {
@@ -448,6 +447,58 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
expect(grokMetrics.failed_rate).to.be(0);
});
it('should gracefully return mappings simulation errors', async () => {
const response = await simulateProcessingForStream(apiClient, 'logs.test', {
processing: [
{
id: 'draft',
grok: {
field: 'message',
patterns: ['%{TIMESTAMP_ISO8601:@timestamp}'],
if: { always: {} },
},
},
],
documents: [createTestDocument('2025-04-04 00:00:00,000')], // This date doesn't exactly match the mapping for @timestamp
});
expect(response.body.documents[0].errors).to.eql([
{
message:
'The processor is not additive to the documents. It might update fields [@timestamp]',
processor_id: 'draft',
type: 'non_additive_processor_failure',
},
{
message:
"Some field types might not be compatible with this document: [1:15] failed to parse field [@timestamp] of type [date] in document with id '0'. Preview of field's value: '2025-04-04 00:00:00,000'",
type: 'field_mapping_failure',
},
]);
expect(response.body.documents[0].status).to.be('failed');
// Simulate detected fields mapping issue
const detectedFieldsFailureResponse = await simulateProcessingForStream(
apiClient,
'logs.test',
{
processing: [basicGrokProcessor],
documents: [createTestDocument()],
detected_fields: [
{ name: 'parsed_timestamp', type: 'boolean' }, // Incompatible type
],
}
);
expect(detectedFieldsFailureResponse.body.documents[0].errors).to.eql([
{
type: 'field_mapping_failure',
message: `Some field types might not be compatible with this document: [1:44] failed to parse field [parsed_timestamp] of type [boolean] in document with id '0'. Preview of field's value: '${TEST_TIMESTAMP}'`,
},
]);
expect(detectedFieldsFailureResponse.body.documents[0].status).to.be('failed');
});
it('should return the is_non_additive_simulation simulation flag', async () => {
const [additiveParsingResponse, nonAdditiveParsingResponse] = await Promise.all([
simulateProcessingForStream(apiClient, 'logs.test', {
@@ -476,26 +527,5 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
expect(nonAdditiveParsingResponse.body.is_non_additive_simulation).to.be(true);
});
});
describe('Failed simulations', () => {
it('should fail with incompatible detected field mappings', async () => {
const response = await simulateProcessingForStream(
apiClient,
'logs.test',
{
processing: [basicGrokProcessor],
documents: [createTestDocument()],
detected_fields: [
{ name: 'parsed_timestamp', type: 'boolean' }, // Incompatible type
],
},
400
);
expect((response.body as esErrors.ResponseError['body']).message).to.contain(
'The detected field types might not be compatible with these documents.'
);
});
});
});
}