[Streams 🌊] Enrichment - Fix broken results due to condition and add skipped metric (#212757)

## 📓 Summary

When the condition is not met, the processing simulation reports wrong
metrics and fails on a unhandler error.

This work fix the issue and also update the document simulation metrics,
reporting how many documents are skipped by a processor during the
simulation.

A follow-up work will update the filters on the date to better reflect
the available states of the documents (parsed, partially parsed,
skipped, failed).

<img width="701" alt="Screenshot 2025-02-28 at 12 47 10"
src="https://github.com/user-attachments/assets/1b6979e4-78a1-4db3-af72-faaf06c0e249"
/>
This commit is contained in:
Marco Antonio Ghiani 2025-03-04 07:43:30 +01:00 committed by GitHub
parent 6b6eb43183
commit 6e2a1033b8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 85 additions and 15 deletions

View file

@ -62,7 +62,7 @@ export interface SimulationError {
| 'non_additive_processor_failure';
}
export type DocSimulationStatus = 'parsed' | 'partially_parsed' | 'failed';
export type DocSimulationStatus = 'parsed' | 'partially_parsed' | 'skipped' | 'failed';
export interface SimulationDocReport {
detected_fields: Array<{ processor_id: string; name: string }>;
@ -75,6 +75,7 @@ export interface ProcessorMetrics {
detected_fields: string[];
errors: SimulationError[];
failure_rate: number;
skipped_rate: number;
success_rate: number;
}
@ -113,7 +114,6 @@ export const simulateProcessing = async ({
/* 1. Prepare data for either simulation types (ingest, pipeline), prepare simulation body for the mandatory pipeline simulation */
const simulationData = prepareSimulationData(params);
const pipelineSimulationBody = preparePipelineSimulationBody(simulationData);
/**
* 2. Run both pipeline and ingest simulations in parallel.
* - The pipeline simulation is used to extract the documents reports and the processor metrics. This always runs.
@ -188,7 +188,16 @@ const prepareSimulationProcessors = (
} as ProcessorDefinition;
});
return formatToIngestProcessors(processors);
const dotExpanderProcessor: Pick<IngestProcessorContainer, 'dot_expander'> = {
dot_expander: {
field: '*',
override: true,
},
};
const formattedProcessors = formatToIngestProcessors(processors);
return [dotExpanderProcessor, ...formattedProcessors];
};
const prepareSimulationData = (params: ProcessingSimulationParams) => {
@ -351,10 +360,18 @@ const computePipelineSimulationResult = (
const processorsMap = initProcessorMetricsMap(processing);
const docReports = simulationResult.docs.map((docResult, id) => {
const { errors, status, value } = getLastDoc(docResult);
const { errors, status, value } = getLastDoc(docResult, sampleDocs[id]._source);
const diff = computeSimulationDocDiff(docResult, sampleDocs[id]._source);
docResult.processor_results.forEach((processor) => {
const procId = processor.tag;
if (procId && isSkippedProcessor(processor)) {
processorsMap[procId].skipped_rate++;
}
});
diff.detected_fields.forEach(({ processor_id, name }) => {
processorsMap[processor_id].detected_fields.push(name);
});
@ -392,6 +409,7 @@ const initProcessorMetricsMap = (
detected_fields: [],
errors: [],
failure_rate: 0,
skipped_rate: 0,
success_rate: 1,
},
]);
@ -408,7 +426,8 @@ const extractProcessorMetrics = ({
}) => {
return mapValues(processorsMap, (metrics) => {
const failureRate = metrics.failure_rate / sampleSize;
const successRate = 1 - failureRate;
const skippedRate = metrics.skipped_rate / sampleSize;
const successRate = 1 - skippedRate - failureRate;
const detected_fields = uniq(metrics.detected_fields);
const errors = uniqBy(metrics.errors, (error) => error.message);
@ -416,22 +435,38 @@ const extractProcessorMetrics = ({
detected_fields,
errors,
failure_rate: parseFloat(failureRate.toFixed(2)),
skipped_rate: parseFloat(skippedRate.toFixed(2)),
success_rate: parseFloat(successRate.toFixed(2)),
};
});
};
const getDocumentStatus = (doc: SuccessfulIngestSimulateDocumentResult): DocSimulationStatus => {
if (doc.processor_results.every(isSuccessfulProcessor)) return 'parsed';
// Remove the always present base processor for dot expander
const processorResults = doc.processor_results.slice(1);
if (doc.processor_results.some(isSuccessfulProcessor)) return 'partially_parsed';
if (processorResults.every(isSkippedProcessor)) {
return 'skipped';
}
if (processorResults.every((proc) => isSuccessfulProcessor(proc) || isSkippedProcessor(proc))) {
return 'parsed';
}
if (processorResults.some(isSuccessfulProcessor)) {
return 'partially_parsed';
}
return 'failed';
};
const getLastDoc = (docResult: SuccessfulIngestSimulateDocumentResult) => {
const getLastDoc = (docResult: SuccessfulIngestSimulateDocumentResult, sample: FlattenRecord) => {
const status = getDocumentStatus(docResult);
const lastDocSource = docResult.processor_results.at(-1)?.doc?._source ?? {};
const lastDocSource =
docResult.processor_results
.slice(1) // Remove the always present base processor for dot expander
.filter((proc) => !isSkippedProcessor(proc))
.at(-1)?.doc?._source ?? sample;
if (status === 'parsed') {
return {
@ -440,7 +475,7 @@ const getLastDoc = (docResult: SuccessfulIngestSimulateDocumentResult) => {
status,
};
} else {
const { _errors, ...value } = lastDocSource;
const { _errors = [], ...value } = lastDocSource;
return { value: flattenObjectNestedLast(value), errors: _errors as SimulationError[], status };
}
};
@ -459,7 +494,7 @@ const computeSimulationDocDiff = (
const successfulProcessors = docResult.processor_results.filter(isSuccessfulProcessor);
const comparisonDocs = [
{ processor_id: 'sample', value: sample },
{ processor_id: 'base', value: docResult.processor_results[0]!.doc!._source },
...successfulProcessors.map((proc) => ({
processor_id: proc.tag,
value: omit(proc.doc._source, ['_errors']),
@ -495,7 +530,7 @@ const computeSimulationDocDiff = (
// We might have updated fields that are not present in the original document because are generated by the previous processors.
// We exclude them from the list of fields that make the processor non-additive.
const originalUpdatedFields = updatedFields.filter((field) => field in sample);
const originalUpdatedFields = updatedFields.filter((field) => field in sample).sort();
if (!isEmpty(originalUpdatedFields)) {
diffResult.errors.push({
processor_id: nextDoc.processor_id,
@ -514,7 +549,8 @@ const prepareSimulationResponse = async (
detectedFields: DetectedField[]
) => {
const successRate = computeSuccessRate(docReports);
const failureRate = 1 - successRate;
const skippedRate = computeSkippedRate(docReports);
const failureRate = 1 - skippedRate - successRate;
const isNotAdditiveSimulation = some(processorsMetrics, (metrics) =>
metrics.errors.some(isNonAdditiveSimulationError)
);
@ -524,6 +560,7 @@ const prepareSimulationResponse = async (
documents: docReports,
processors_metrics: processorsMetrics,
failure_rate: parseFloat(failureRate.toFixed(2)),
skipped_rate: parseFloat(skippedRate.toFixed(2)),
success_rate: parseFloat(successRate.toFixed(2)),
is_non_additive_simulation: isNotAdditiveSimulation,
};
@ -538,10 +575,12 @@ const prepareSimulationFailureResponse = (error: SimulationError) => {
detected_fields: [],
errors: [error],
failure_rate: 1,
skipped_rate: 0,
success_rate: 0,
},
},
failure_rate: 1,
skipped_rate: 0,
success_rate: 0,
is_non_additive_simulation: isNonAdditiveSimulationError(error),
};
@ -597,6 +636,12 @@ const computeSuccessRate = (docs: SimulationDocReport[]) => {
return successfulCount / docs.length;
};
const computeSkippedRate = (docs: SimulationDocReport[]) => {
const skippedCount = docs.reduce((rate, doc) => (rate += doc.status === 'skipped' ? 1 : 0), 0);
return skippedCount / docs.length;
};
const computeMappingProperties = (detectedFields: NamedFieldDefinitionConfig[]) => {
return Object.fromEntries(detectedFields.map(({ name, type }) => [name, { type }]));
};
@ -609,6 +654,11 @@ const isSuccessfulProcessor = (
): processor is WithRequired<IngestPipelineSimulation, 'doc' | 'tag'> =>
processor.status === 'success' && !!processor.tag;
const isSkippedProcessor = (
processor: IngestPipelineSimulation
// @ts-expect-error Looks like the IngestPipelineSimulation.status is not typed correctly and misses the 'skipped' status
): processor is WithRequired<IngestPipelineSimulation, 'tag'> => processor.status === 'skipped';
// TODO: update type once Kibana updates to elasticsearch-js 8.17
const isMappingFailure = (entry: any) => entry.doc?.error?.type === 'document_parsing_exception';

View file

@ -67,6 +67,11 @@ export const ProcessorOutcomePreview = ({
}
}, [columns, selectedDocsFilter]);
const simulationFailureRate = simulation
? simulation?.failure_rate + simulation?.skipped_rate
: undefined;
const simulationSuccessRate = simulation?.success_rate;
return (
<>
<EuiFlexItem grow={false}>
@ -76,8 +81,8 @@ export const ProcessorOutcomePreview = ({
timeRange={timeRange}
onTimeRangeChange={setTimeRange}
onTimeRangeRefresh={onRefreshSamples}
simulationFailureRate={simulation?.failure_rate}
simulationSuccessRate={simulation?.success_rate}
simulationFailureRate={simulationFailureRate}
simulationSuccessRate={simulationSuccessRate}
/>
</EuiFlexItem>
<EuiSpacer size="m" />

View file

@ -32,10 +32,12 @@ const formatter = new Intl.NumberFormat('en-US', {
export const ProcessorMetricBadges = ({
detected_fields,
failure_rate,
skipped_rate,
success_rate,
}: ProcessorMetricBadgesProps) => {
const detectedFieldsCount = detected_fields.length;
const failureRate = failure_rate > 0 ? formatter.format(failure_rate) : null;
const skippedRate = skipped_rate > 0 ? formatter.format(skipped_rate) : null;
const successRate = success_rate > 0 ? formatter.format(success_rate) : null;
return (
@ -53,6 +55,19 @@ export const ProcessorMetricBadges = ({
{failureRate}
</EuiBadge>
)}
{skippedRate && (
<EuiBadge
color="hollow"
iconType="minus"
title={i18n.translate('xpack.streams.processorMetricBadges.euiBadge.skippedRate', {
defaultMessage:
'{skippedRate} of the sampled documents were skipped due to the set condition',
values: { skippedRate },
})}
>
{skippedRate}
</EuiBadge>
)}
{successRate && (
<EuiBadge
color="hollow"