[Streams 🌊] Enrichment simulation behaviour improvements (#209985)

## 📓 Summary

Part of https://github.com/elastic/streams-program/issues/127
Closes https://github.com/elastic/streams-program/issues/114

This update overhauls the internal logic of our processing simulation
endpoint. It now runs parallel simulations (pipeline and, conditionally,
ingest) to extract detailed document reports and processor metrics,
while also handling a host of edge cases.

The key improvements include:

- **Parallel Simulation Execution**  
Executes both pipeline and ingest simulations concurrently (see the
sketch after this list). The pipeline simulation always runs to extract
per-document reports and metrics. The ingest simulation runs
conditionally when detected fields are provided, enabling fast failures
on mapping mismatches.

- **Document Reporting & Metrics**  
Extracts granular differences between source and simulated documents.
Reports include:
  - Field-level diffs indicating which processor added or updated fields.
  - Detailed error messages (e.g., generic processor failure, generic
simulation failure, non-additive processor failure).
  - Calculation of overall success and failure rates, as well as
per-processor metrics.

- **Sequential Processors & Field Overriding**  
Supports multiple sequential processors. In cases where later processors
override fields produced by earlier ones, the logic bypasses
non-additive checks to accept the new value.

- **Robust Handling of Partial & Failed Simulations**  
  Simulations now correctly mark documents as:
  - **Parsed** when all processors succeed.
  - **Partially parsed** when some processors fail.
  - **Failed** when none of the processors succeed on the document.

- **Mapping Validation & Non-Additive Detection**  
The simulation verifies that the detected field mappings are compatible.
If a processor introduces non-additive changes—updating an existing
field rather than appending—the simulation flags the error and sets a
dedicated `is_non_additive_simulation` flag. Additionally, a failed
ingest simulation (e.g., due to incompatible mapping types) results in
an immediate failure.
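
For orientation, here is a minimal sketch of that orchestration. The
helper names and signatures below are assumptions standing in for the
real ones; the actual handler is `simulateProcessing` in the diff below.

```typescript
// A sketch of the parallel execution flow, not the actual implementation.
type PipelineResult =
  | { status: 'success'; simulation: unknown }
  | { status: 'failure'; error: { message: string } };

declare function runPipelineSimulation(docs: unknown[]): Promise<PipelineResult>;
declare function runIngestSimulation(docs: unknown[]): Promise<unknown>; // throws on mapping failure
declare function buildResponse(simulation: unknown): unknown;
declare function buildFailureResponse(error: { message: string }): unknown;

async function simulate(docs: unknown[], detectedFields?: unknown[]) {
  const [pipelineResult] = await Promise.all([
    // Always runs: the source of per-document reports and processor metrics.
    runPipelineSimulation(docs),
    // Conditional: only validates mappings when detected fields are provided.
    detectedFields ? runIngestSimulation(docs) : Promise.resolve(null),
  ]);

  if (pipelineResult.status === 'failure') {
    return buildFailureResponse(pipelineResult.error);
  }
  return buildResponse(pipelineResult.simulation);
}
```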

The final returned API response adheres to the following TypeScript
type:

```typescript
interface SimulationResponse {
  detected_fields: DetectedField[];
  documents: SimulationDocReport[];
  processors_metrics: Record<string, ProcessorMetrics>;
  failure_rate: number;
  success_rate: number;
  is_non_additive_simulation: boolean;
}
```
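
For illustration, here is a hypothetical response for two sampled
documents, where a single processor (id `draft`) parsed the first
document and made a non-additive change to the second. All values are
invented; only the shape follows the type above.

```typescript
// Hypothetical example values; the shape follows SimulationResponse above.
// Per the PR description, a non-additive change surfaces both as a
// document-level error and in the per-processor metrics.
const exampleResponse = {
  detected_fields: [{ name: 'log.level', type: 'keyword' }],
  documents: [
    {
      detected_fields: [{ processor_id: 'draft', name: 'log.level' }],
      errors: [],
      status: 'parsed',
      value: { message: '2025-02-19 ERROR boom', 'log.level': 'ERROR' },
    },
    {
      detected_fields: [],
      errors: [
        {
          processor_id: 'draft',
          type: 'non_additive_processor_failure',
          message:
            'The processor is not additive to the documents. It might update fields [message]',
        },
      ],
      status: 'failed',
      value: { message: 'rewritten!' },
    },
  ],
  processors_metrics: {
    draft: {
      detected_fields: ['log.level'],
      errors: [], // the non-additive error above is repeated here in practice
      failure_rate: 0.5,
      success_rate: 0.5,
    },
  },
  failure_rate: 0.5,
  success_rate: 0.5,
  is_non_additive_simulation: true,
};
```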

## Updated tests
```
Processing Simulation
├── Successful simulations
│   ├── should simulate additive processing
│   ├── should simulate with detected fields
│   ├── should simulate multiple sequential processors
│   ├── should simulate partially parsed documents
│   ├── should return processor metrics
│   ├── should return accurate success/failure rates
│   ├── should allow overriding fields detected by previous simulation processors (skip non-additive check)
│   ├── should gracefully return the errors for each partially parsed or failed document
│   ├── should gracefully return failed simulation errors
│   ├── should gracefully return non-additive simulation errors
│   └── should return the is_non_additive_simulation simulation flag
└── Failed simulations
    └── should fail with incompatible detected field mappings
```

## 🚨 API Failure Conditions & Handler Corner Cases

The simulation API handles and reports the following corner cases:

- **Pipeline Simulation Failures** _(Gracefully reported)_
  - Syntax errors in processor configurations (e.g., malformed grok
patterns) trigger a pipeline-level failure with detailed error
information (processor ID, error type, and message).

- **Non-Additive Processor Behavior** _(Gracefully reported)_
  - If a processor modifies fields already present in the source document
rather than strictly appending new fields, the simulation flags this as
a non-additive change (a minimal sketch of this check follows this list).
  - The error is recorded both at the document level (resulting in a
"partially_parsed" or "failed" status) and within per-processor metrics,
with the global flag `is_non_additive_simulation` set to true.
  
- **Partial Document Processing** _(Gracefully reported)_
  - In scenarios with sequential processors where the first processor
succeeds (e.g., a dissect processor) and the subsequent grok processor
fails, documents are marked as "partially_parsed."
  - These cases are reflected in the overall success/failure rates and
detailed per-document error lists.

- **Field Overriding**
  - When a later processor intentionally overrides fields (for instance,
reassigning a previously calculated field), the simulation bypasses the
non-additive check, and detected fields are aggregated accordingly,
noting both the original and overridden values.

- **Mapping Inconsistencies** _(API failure, bad request)_
  - When the ingest simulation detects an incompatibility between the
provided detected field mappings (such as defining a field as a boolean
when it should be a date) and the source document, it immediately fails.
  - The failure response includes an error message explaining the
incompatibility.
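
Because both the sample and the simulated documents are flattened before
comparison, the core of the non-additive check reduces to a small diff
over shared keys. A minimal standalone sketch follows; the real
implementation uses `calculateObjectDiff` from `@kbn/object-utils` and
runs per processor, as the diff below shows.

```typescript
// Minimal sketch of the non-additive check over flattened documents.
// A field counts as non-additive only if it existed in the original
// sample and a processor changed its value; fields generated by earlier
// processors are excluded by the `field in sample` test.
type FlatDoc = Record<string, unknown>;

function findNonAdditiveFields(sample: FlatDoc, simulated: FlatDoc): string[] {
  return Object.keys(simulated).filter(
    (field) => field in sample && sample[field] !== simulated[field]
  );
}

// Example: the processor rewrote `message`, which already existed.
findNonAdditiveFields(
  { message: '2025-02-19 ERROR boom' },
  { message: 'rewritten!', 'log.level': 'ERROR' }
); // -> ['message'] => non_additive_processor_failure
```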

## 🔜 Follow-up Work

- **Integrate Schema Editor**  
Given the improved support for detected fields, a follow-up PR will
introduce the Schema Editor and allow mapping alongside the data
enrichment.
- **Granular filtering and reporting**  
Having access to more granular details such as status, errors, and
detected fields for each document, we could enhance the table with
additional information and better filters. cc @LucaWintergerst @patpscal

## 🎥 Demo recordings


https://github.com/user-attachments/assets/29f804eb-6dd4-4452-a798-9d48786cbb7f

---------

Co-authored-by: Jean-Louis Leysens <jloleysens@gmail.com>
Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
26 changed files with 1366 additions and 459 deletions

View file

@ -7,10 +7,10 @@
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { flattenObject } from './flatten_object';
import { flattenObject, flattenObjectNestedLast } from './flatten_object';
describe('flattenObject', () => {
it('should flat gamma object properties', () => {
it('should flatten nested object properties', () => {
const flattened = flattenObject({
alpha: {
gamma: {
@ -30,3 +30,28 @@ describe('flattenObject', () => {
});
});
});
describe('flattenObjectNestedLast', () => {
it('should give nested object properties precedence over already flattened entries', () => {
const flattened = flattenObjectNestedLast({
'alpha.beta': 99,
alpha: {
gamma: {
sigma: 1,
},
delta: {
sigma: 2,
},
},
beta: 3,
'alpha.gamma.sigma': 4,
});
expect(flattened).toEqual({
'alpha.beta': 99,
'alpha.gamma.sigma': 1,
'alpha.delta.sigma': 2,
beta: 3,
});
});
});

View file

@ -9,21 +9,28 @@
import { isPlainObject } from 'lodash';
type GetValuesTypes<T> = T extends Record<PropertyKey, any>
? { [K in keyof T]: GetValuesTypes<T[K]> }[keyof T]
: T;
/**
* Returns a flattened version of the input object also accounting for nested properties.
* @param obj - The input object.
* @param parentKey - The initial key used for recursive flattening.
* @returns An object containing all the flattened properties.
*/
export function flattenObject(obj: Record<PropertyKey, unknown>, parentKey: string = '') {
const result: Record<PropertyKey, unknown> = {};
export function flattenObject<TObj extends Record<PropertyKey, any>>(
obj: TObj,
parentKey: string = ''
) {
const result: Record<PropertyKey, GetValuesTypes<TObj>> = {};
for (const key in obj) {
if (Object.hasOwn(obj, key)) {
const value = obj[key];
const newKey = parentKey ? `${parentKey}.${key}` : key;
if (isPlainObject(value)) {
Object.assign(result, flattenObject(value as Record<PropertyKey, unknown>, newKey));
Object.assign(result, flattenObject(value, newKey));
} else {
result[newKey] = value;
}
@ -31,3 +38,26 @@ export function flattenObject(obj: Record<PropertyKey, unknown>, parentKey: stri
}
return result;
}
/**
* Returns a flattened version of the input object, giving higher priority to nested fields and flattening them after the other properties.
* @param obj - The input object.
* @returns An object containing all the flattened properties.
*/
export function flattenObjectNestedLast<TObj extends Record<PropertyKey, any>>(obj: TObj) {
const flattened: Record<PropertyKey, GetValuesTypes<TObj>> = {};
const nested: Record<PropertyKey, GetValuesTypes<TObj>> = {};
for (const key in obj) {
if (Object.hasOwn(obj, key)) {
const value = obj[key];
if (isPlainObject(value)) {
nested[key] = value;
} else {
flattened[key] = value;
}
}
}
return { ...flattened, ...flattenObject(nested) };
}

View file

@ -6,7 +6,7 @@
*/
import { omit } from 'lodash';
import { FieldDefinitionConfig } from '../models';
import { FieldDefinitionConfig, InheritedFieldDefinition, WiredStreamDefinition } from '../models';
// Parameters that we consider first class and provide a curated experience for
const FIRST_CLASS_PARAMETERS = ['type', 'format'];
@ -17,3 +17,12 @@ export const getAdvancedParameters = (fieldName: string, fieldConfig: FieldDefin
const additionalOmissions = fieldName === '@timestamp' ? ['ignore_malformed'] : [];
return omit(fieldConfig, FIRST_CLASS_PARAMETERS.concat(additionalOmissions));
};
export const getInheritedFieldsFromAncestors = (ancestors: WiredStreamDefinition[]) => {
return ancestors.reduce<InheritedFieldDefinition>((acc, def) => {
Object.entries(def.ingest.wired.fields).forEach(([key, fieldDef]) => {
acc[key] = { ...fieldDef, from: def.name };
});
return acc;
}, {});
};

View file

@ -33,6 +33,12 @@ export interface RecursiveRecord {
[key: PropertyKey]: Primitive | Primitive[] | RecursiveRecord;
}
export const recursiveRecord: z.ZodType<RecursiveRecord> = z.record(
z.union([primitive, z.array(primitive), z.lazy(() => recursiveRecord)])
export const recursiveRecord: z.ZodType<RecursiveRecord> = z.lazy(() =>
z.record(z.union([primitive, z.array(primitive), recursiveRecord]))
);
export type FlattenRecord = Record<PropertyKey, Primitive | Primitive[]>;
export const flattenRecord: z.ZodType<FlattenRecord> = z.record(
z.union([primitive, z.array(primitive)])
);

View file

@ -33,7 +33,7 @@ const processorBaseSchema = z.object({
ignore_failure: z.optional(z.boolean()),
});
export const grokProcessorDefinitionSchema: z.Schema<GrokProcessorDefinition> = z.strictObject({
export const grokProcessorDefinitionSchema = z.strictObject({
grok: z.intersection(
processorBaseSchema,
z.object({
@ -43,7 +43,7 @@ export const grokProcessorDefinitionSchema: z.Schema<GrokProcessorDefinition> =
ignore_missing: z.optional(z.boolean()),
})
),
});
}) satisfies z.Schema<GrokProcessorDefinition>;
export interface DissectProcessorConfig extends ProcessorBase {
field: string;
@ -56,20 +56,20 @@ export interface DissectProcessorDefinition {
dissect: DissectProcessorConfig;
}
export const dissectProcessorDefinitionSchema: z.Schema<DissectProcessorDefinition> =
z.strictObject({
dissect: z.intersection(
processorBaseSchema,
z.object({
field: NonEmptyString,
pattern: NonEmptyString,
append_separator: z.optional(NonEmptyString),
ignore_missing: z.optional(z.boolean()),
})
),
});
export const dissectProcessorDefinitionSchema = z.strictObject({
dissect: z.intersection(
processorBaseSchema,
z.object({
field: NonEmptyString,
pattern: NonEmptyString,
append_separator: z.optional(NonEmptyString),
ignore_missing: z.optional(z.boolean()),
})
),
}) satisfies z.Schema<DissectProcessorDefinition>;
export type ProcessorDefinition = DissectProcessorDefinition | GrokProcessorDefinition;
export type ProcessorDefinitionWithId = ProcessorDefinition & { id: string };
type UnionKeysOf<T extends Record<string, any>> = T extends T ? keyof T : never;
type BodyOf<T extends Record<string, any>> = T extends T ? T[keyof T] : never;
@ -86,6 +86,11 @@ export const processorDefinitionSchema: z.ZodType<ProcessorDefinition> = z.union
dissectProcessorDefinitionSchema,
]);
export const processorWithIdDefinitionSchema: z.ZodType<ProcessorDefinitionWithId> = z.union([
grokProcessorDefinitionSchema.merge(z.object({ id: z.string() })),
dissectProcessorDefinitionSchema.merge(z.object({ id: z.string() })),
]);
export const isGrokProcessorDefinition = createIsNarrowSchema(
processorDefinitionSchema,
grokProcessorDefinitionSchema

View file

@ -1,15 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { StatusError } from './status_error';
export class NonAdditiveProcessorError extends StatusError {
constructor(message: string) {
super(message, 400);
this.name = 'NonAdditiveProcessorError';
}
}

View file

@ -1,22 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { errors } from '@elastic/elasticsearch';
import { StatusError } from './status_error';
export class SimulationFailedError extends StatusError {
constructor(error: errors.ResponseError) {
super(
error.body?.error?.reason ||
error.body?.error?.caused_by?.reason ||
error.message ||
'Unknown error',
error.statusCode ?? 500
);
this.name = 'SimulationFailedError';
}
}

View file

@ -6,10 +6,10 @@
*/
import {
InheritedFieldDefinition,
StreamGetResponse,
WiredStreamGetResponse,
findInheritedLifecycle,
getInheritedFieldsFromAncestors,
isGroupStreamDefinition,
isUnwiredStreamDefinition,
} from '@kbn/streams-schema';
@ -79,12 +79,7 @@ export async function readStream({
stream: streamDefinition,
dashboards,
effective_lifecycle: findInheritedLifecycle(streamDefinition, ancestors),
inherited_fields: ancestors.reduce((acc, def) => {
Object.entries(def.ingest.wired.fields).forEach(([key, fieldDef]) => {
acc[key] = { ...fieldDef, from: def.name };
});
return acc;
}, {} as InheritedFieldDefinition),
inherited_fields: getInheritedFieldsFromAncestors(ancestors),
};
return body;

View file

@ -5,37 +5,25 @@
* 2.0.
*/
/* eslint-disable @typescript-eslint/naming-convention */
import { IScopedClusterClient } from '@kbn/core/server';
import { calculateObjectDiff, flattenObject } from '@kbn/object-utils';
import {
FieldDefinitionConfig,
RecursiveRecord,
flattenRecord,
namedFieldDefinitionConfigSchema,
processorDefinitionSchema,
recursiveRecord,
processorWithIdDefinitionSchema,
} from '@kbn/streams-schema';
import { z } from '@kbn/zod';
import { isEmpty } from 'lodash';
import { formatToIngestProcessors } from '../../../lib/streams/helpers/processing';
import { checkAccess } from '../../../lib/streams/stream_crud';
import { createServerRoute } from '../../create_server_route';
import { DefinitionNotFoundError } from '../../../lib/streams/errors/definition_not_found_error';
import { SimulationFailedError } from '../../../lib/streams/errors/simulation_failed_error';
import { DetectedMappingFailureError } from '../../../lib/streams/errors/detected_mapping_failure_error';
import { NonAdditiveProcessorError } from '../../../lib/streams/errors/non_additive_processor_error';
import { ProcessingSimulationParams, simulateProcessing } from './simulation_handler';
const paramsSchema = z.object({
path: z.object({ name: z.string() }),
body: z.object({
processing: z.array(processorDefinitionSchema),
documents: z.array(recursiveRecord),
processing: z.array(processorWithIdDefinitionSchema),
documents: z.array(flattenRecord),
detected_fields: z.array(namedFieldDefinitionConfigSchema).optional(),
}),
});
type ProcessingSimulateParams = z.infer<typeof paramsSchema>;
}) satisfies z.Schema<ProcessingSimulationParams>;
export const simulateProcessorRoute = createServerRoute({
endpoint: 'POST /api/streams/{name}/processing/_simulate',
@ -51,218 +39,17 @@ export const simulateProcessorRoute = createServerRoute({
},
params: paramsSchema,
handler: async ({ params, request, getScopedClients }) => {
const { scopedClusterClient } = await getScopedClients({ request });
const { scopedClusterClient, streamsClient } = await getScopedClients({ request });
const { read } = await checkAccess({ name: params.path.name, scopedClusterClient });
if (!read) {
throw new DefinitionNotFoundError(`Stream definition for ${params.path.name} not found.`);
}
const simulationBody = prepareSimulationBody(params);
const simulationResult = await executeSimulation(scopedClusterClient, simulationBody);
const simulationDiffs = prepareSimulationDiffs(simulationResult, simulationBody.docs);
assertSimulationResult(simulationResult, simulationDiffs);
return prepareSimulationResponse(
simulationResult,
simulationBody.docs,
simulationDiffs,
params.body.detected_fields
);
return simulateProcessing({ params, scopedClusterClient, streamsClient });
},
});
const prepareSimulationBody = (params: ProcessingSimulateParams) => {
const { path, body } = params;
const { processing, documents, detected_fields } = body;
const processors = formatToIngestProcessors(processing);
const docs = documents.map((doc, id) => ({
_index: path.name,
_id: id.toString(),
_source: doc,
}));
const simulationBody: any = {
docs,
pipeline_substitutions: {
[`${path.name}@stream.processing`]: {
processors,
},
},
};
if (detected_fields) {
const properties = computeMappingProperties(detected_fields);
simulationBody.component_template_substitutions = {
[`${path.name}@stream.layer`]: {
template: {
mappings: {
properties,
},
},
},
};
}
return simulationBody;
};
// TODO: update type once Kibana updates to elasticsearch-js 8.17
const executeSimulation = async (
scopedClusterClient: IScopedClusterClient,
simulationBody: ReturnType<typeof prepareSimulationBody>
): Promise<any> => {
try {
// TODO: We should be using scopedClusterClient.asCurrentUser.simulate.ingest() once Kibana updates to elasticsearch-js 8.17
return await scopedClusterClient.asCurrentUser.transport.request({
method: 'POST',
path: `_ingest/_simulate`,
body: simulationBody,
});
} catch (error) {
throw new SimulationFailedError(error);
}
};
const assertSimulationResult = (
simulationResult: Awaited<ReturnType<typeof executeSimulation>>,
simulationDiffs: ReturnType<typeof prepareSimulationDiffs>
) => {
// Assert mappings are compatible with the documents
const entryWithError = simulationResult.docs.find(isMappingFailure);
if (entryWithError) {
throw new DetectedMappingFailureError(
`The detected field types might not be compatible with these documents. ${entryWithError.doc.error.reason}`
);
}
// Assert that the processors are purely additive to the documents
const updatedFields = computeUpdatedFields(simulationDiffs);
if (!isEmpty(updatedFields)) {
throw new NonAdditiveProcessorError(
`The processor is not additive to the documents. It might update fields [${updatedFields.join()}]`
);
}
};
const prepareSimulationResponse = (
simulationResult: any,
docs: Array<{ _source: RecursiveRecord }>,
simulationDiffs: ReturnType<typeof prepareSimulationDiffs>,
detectedFields?: ProcessingSimulateParams['body']['detected_fields']
) => {
const confirmedValidDetectedFields = computeMappingProperties(detectedFields ?? []);
const documents = computeSimulationDocuments(simulationResult, docs);
const detectedFieldsResult = computeDetectedFields(simulationDiffs, confirmedValidDetectedFields);
const successRate = computeSuccessRate(simulationResult);
const failureRate = 1 - successRate;
return {
documents,
success_rate: parseFloat(successRate.toFixed(2)),
failure_rate: parseFloat(failureRate.toFixed(2)),
detected_fields: detectedFieldsResult,
};
};
// TODO: update type once Kibana updates to elasticsearch-js 8.17
const prepareSimulationDiffs = (
simulation: any,
sampleDocs: Array<{ _source: RecursiveRecord }>
) => {
// Since we filter out failed documents, we need to map the simulation docs to the sample docs for later retrieval
const samplesToSimulationMap = new Map<any, { _source: RecursiveRecord }>(
simulation.docs.map((entry: any, id: number) => [entry.doc, sampleDocs[id]])
);
const diffs = simulation.docs.filter(isSuccessfulDocument).map((entry: any) => {
const sample = samplesToSimulationMap.get(entry.doc);
if (sample) {
return calculateObjectDiff(sample._source, entry.doc._source);
}
return calculateObjectDiff({});
});
return diffs;
};
// TODO: update type once Kibana updates to elasticsearch-js 8.17
const computeUpdatedFields = (simulationDiff: ReturnType<typeof prepareSimulationDiffs>) => {
const diffs = simulationDiff
.map((simulatedDoc: any) => flattenObject(simulatedDoc.updated))
.flatMap(Object.keys);
const uniqueFields = [...new Set(diffs)];
return uniqueFields;
};
// TODO: update type once Kibana updates to elasticsearch-js 8.17
const computeSimulationDocuments = (
simulation: any,
sampleDocs: Array<{ _source: RecursiveRecord }>
): Array<{ isMatch: boolean; value: RecursiveRecord }> => {
return simulation.docs.map((entry: any, id: number) => {
// If every processor was successful, return and flatten the simulation doc from the last processor
if (isSuccessfulDocument(entry)) {
return {
value: flattenObject(entry.doc._source ?? sampleDocs[id]._source),
isMatch: true,
};
}
return {
value: flattenObject(sampleDocs[id]._source),
isMatch: false,
};
});
};
const computeDetectedFields = (
simulationDiff: ReturnType<typeof prepareSimulationDiffs>,
confirmedValidDetectedFields: Record<string, { type: FieldDefinitionConfig['type'] | 'unmapped' }>
): Array<{
name: string;
type: FieldDefinitionConfig['type'] | 'unmapped';
}> => {
const diffs: string[] = simulationDiff
.map((simulatedDoc: any) => flattenObject(simulatedDoc.added))
.flatMap(Object.keys);
const uniqueFields = [...new Set(diffs)];
return uniqueFields.map((name: string) => ({
name,
type: confirmedValidDetectedFields[name]?.type || 'unmapped',
}));
};
// TODO: update type once Kibana updates to elasticsearch-js 8.17
const computeSuccessRate = (simulation: any) => {
const successfulCount = simulation.docs.reduce((rate: number, entry: any) => {
return (rate += isSuccessfulDocument(entry) ? 1 : 0);
}, 0);
return successfulCount / simulation.docs.length;
};
const computeMappingProperties = (
detectedFields: NonNullable<ProcessingSimulateParams['body']['detected_fields']>
) => {
return Object.fromEntries(detectedFields.map(({ name, type }) => [name, { type }]));
};
// TODO: update type once Kibana updates to elasticsearch-js 8.17
const isSuccessfulDocument = (entry: any) => entry.doc.error === undefined;
// TODO: update type once Kibana updates to elasticsearch-js 8.17
const isMappingFailure = (entry: any) =>
!isSuccessfulDocument(entry) && entry.doc.error.type === 'document_parsing_exception';
export const processingRoutes = {
...simulateProcessorRoute,
};

View file

@ -0,0 +1,616 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
/* eslint-disable @typescript-eslint/naming-convention */
import { errors as esErrors } from '@elastic/elasticsearch';
import {
IngestSimulateDocument,
IngestProcessorContainer,
IngestSimulateRequest,
IngestPipelineConfig,
ClusterComponentTemplateNode,
ErrorCauseKeys,
IngestSimulatePipelineSimulation,
IngestSimulateSimulateDocumentResult,
} from '@elastic/elasticsearch/lib/api/types';
import { IScopedClusterClient } from '@kbn/core/server';
import { flattenObjectNestedLast, calculateObjectDiff } from '@kbn/object-utils';
import {
FlattenRecord,
ProcessorDefinitionWithId,
getProcessorType,
ProcessorDefinition,
isWiredStreamDefinition,
getInheritedFieldsFromAncestors,
NamedFieldDefinitionConfig,
FieldDefinitionConfig,
InheritedFieldDefinitionConfig,
} from '@kbn/streams-schema';
import { mapValues, uniq, omit, isEmpty, uniqBy, some } from 'lodash';
import { StreamsClient } from '../../../lib/streams/client';
import { DetectedMappingFailureError } from '../../../lib/streams/errors/detected_mapping_failure_error';
import { formatToIngestProcessors } from '../../../lib/streams/helpers/processing';
export interface ProcessingSimulationParams {
path: {
name: string;
};
body: {
processing: ProcessorDefinitionWithId[];
documents: FlattenRecord[];
detected_fields?: NamedFieldDefinitionConfig[];
};
}
export interface SimulateProcessingDeps {
params: ProcessingSimulationParams;
scopedClusterClient: IScopedClusterClient;
streamsClient: StreamsClient;
}
export interface SimulationError {
message: string;
processor_id: string;
type:
| 'generic_processor_failure'
| 'generic_simulation_failure'
| 'non_additive_processor_failure';
}
export type DocSimulationStatus = 'parsed' | 'partially_parsed' | 'failed';
export interface SimulationDocReport {
detected_fields: Array<{ processor_id: string; name: string }>;
errors: SimulationError[];
status: DocSimulationStatus;
value: FlattenRecord;
}
export interface ProcessorMetrics {
detected_fields: string[];
errors: SimulationError[];
failure_rate: number;
success_rate: number;
}
// Narrow down the type to only successful processor results
export type SuccessfulIngestSimulateDocumentResult = WithRequired<
IngestSimulateSimulateDocumentResult,
'processor_results'
>;
export interface SuccessfulIngestSimulateResponse {
docs: SuccessfulIngestSimulateDocumentResult[];
}
export type PipelineSimulationResult =
| {
status: 'success';
simulation: SuccessfulIngestSimulateResponse;
}
| {
status: 'failure';
error: SimulationError;
};
export type DetectedField =
| WithName
| WithName<FieldDefinitionConfig | InheritedFieldDefinitionConfig>;
export type WithName<TObj = {}> = TObj & { name: string };
export type WithRequired<TObj, TKey extends keyof TObj> = TObj & { [TProp in TKey]-?: TObj[TProp] };
export const simulateProcessing = async ({
params,
scopedClusterClient,
streamsClient,
}: SimulateProcessingDeps) => {
/* 1. Prepare data for either simulation types (ingest, pipeline), prepare simulation body for the mandatory pipeline simulation */
const simulationData = prepareSimulationData(params);
const pipelineSimulationBody = preparePipelineSimulationBody(simulationData);
/**
* 2. Run both pipeline and ingest simulations in parallel.
* - The pipeline simulation is used to extract the documents reports and the processor metrics. This always runs.
* - The ingest simulation is used to fail fast on mapping failures. This runs only if `detected_fields` is provided.
*/
const [pipelineSimulationResult] = await Promise.all([
executePipelineSimulation(scopedClusterClient, pipelineSimulationBody),
conditionallyExecuteIngestSimulation(scopedClusterClient, simulationData, params),
]);
/* 3. Fail fast on pipeline simulation errors and return the generic error response gracefully */
if (pipelineSimulationResult.status === 'failure') {
return prepareSimulationFailureResponse(pipelineSimulationResult.error);
}
/* 4. Extract all the documents reports and processor metrics from the pipeline simulation */
const { docReports, processorsMetrics } = computePipelineSimulationResult(
pipelineSimulationResult.simulation,
simulationData.docs,
params.body.processing
);
/* 5. Extract valid detected fields asserting existing mapped fields from stream and ancestors */
const detectedFields = await computeDetectedFields(processorsMetrics, streamsClient, params);
/* 6. Derive general insights and process final response body */
return prepareSimulationResponse(docReports, processorsMetrics, detectedFields);
};
const prepareSimulationDocs = (
documents: FlattenRecord[],
streamName: string
): IngestSimulateDocument[] => {
return documents.map((doc, id) => ({
_index: streamName,
_id: id.toString(),
_source: doc,
}));
};
const prepareSimulationProcessors = (
processing: ProcessorDefinitionWithId[]
): IngestProcessorContainer[] => {
//
/**
* We want to simulate the processors' logic and collect data independently from the user config for simulation purposes.
* 1. Force each processor to not ignore failures to collect all errors
* 2. Append the error message to the `_errors` field on failure
*/
const processors = processing.map((processor) => {
const { id, ...processorConfig } = processor;
const type = getProcessorType(processorConfig);
return {
[type]: {
...(processorConfig as any)[type], // Safe to use any here due to type structure
ignore_failure: false,
tag: id,
on_failure: [
{
append: {
field: '_errors',
value: {
message: '{{{ _ingest.on_failure_message }}}',
processor_id: '{{{ _ingest.on_failure_processor_tag }}}',
type: 'generic_processor_failure',
},
},
},
],
},
} as ProcessorDefinition;
});
return formatToIngestProcessors(processors);
};
const prepareSimulationData = (params: ProcessingSimulationParams) => {
const { path, body } = params;
const { processing, documents } = body;
return {
docs: prepareSimulationDocs(documents, path.name),
processors: prepareSimulationProcessors(processing),
};
};
const preparePipelineSimulationBody = (
simulationData: ReturnType<typeof prepareSimulationData>
): IngestSimulateRequest => {
const { docs, processors } = simulationData;
return {
docs,
pipeline: { processors },
verbose: true,
};
};
const prepareIngestSimulationBody = (
simulationData: ReturnType<typeof prepareSimulationData>,
params: ProcessingSimulationParams
) => {
const { path, body } = params;
const { detected_fields } = body;
const { docs, processors } = simulationData;
// TODO: update type once Kibana updates to elasticsearch-js 8.17
const simulationBody: {
docs: IngestSimulateDocument[];
pipeline_substitutions: Record<string, IngestPipelineConfig>;
component_template_substitutions?: Record<string, ClusterComponentTemplateNode>;
} = {
docs,
pipeline_substitutions: {
[`${path.name}@stream.processing`]: {
processors,
},
},
};
if (detected_fields) {
const properties = computeMappingProperties(detected_fields);
simulationBody.component_template_substitutions = {
[`${path.name}@stream.layer`]: {
template: {
mappings: {
properties,
},
},
},
};
}
return simulationBody;
};
/**
* When running a pipeline simulation, we want to fail fast on syntax failures, such as grok patterns.
* If the simulation fails, we won't be able to extract the documents reports and the processor metrics.
* In case any other error occurs, we delegate the error handling to the processor currently in draft.
*/
const executePipelineSimulation = async (
scopedClusterClient: IScopedClusterClient,
simulationBody: IngestSimulateRequest
): Promise<PipelineSimulationResult> => {
try {
const simulation = await scopedClusterClient.asCurrentUser.ingest.simulate(simulationBody);
return {
status: 'success',
simulation: simulation as SuccessfulIngestSimulateResponse,
};
} catch (error) {
if (error instanceof esErrors.ResponseError) {
const { processor_tag, reason } = error.body?.error;
return {
status: 'failure',
error: {
message: reason,
processor_id: processor_tag,
type: 'generic_simulation_failure',
},
};
}
return {
status: 'failure',
error: {
message: error.message,
processor_id: 'draft',
type: 'generic_simulation_failure',
},
};
}
};
// TODO: update type to built-in once Kibana updates to elasticsearch-js 8.17
interface IngestSimulationResult {
docs: Array<{ doc: IngestSimulateDocument & { error?: ErrorCauseKeys } }>;
}
const conditionallyExecuteIngestSimulation = async (
scopedClusterClient: IScopedClusterClient,
simulationData: ReturnType<typeof prepareSimulationData>,
params: ProcessingSimulationParams
): Promise<IngestSimulationResult | null> => {
if (!params.body.detected_fields) return null;
const simulationBody = prepareIngestSimulationBody(simulationData, params);
let simulationResult: IngestSimulationResult;
try {
// TODO: We should be using scopedClusterClient.asCurrentUser.simulate.ingest() once Kibana updates to elasticsearch-js 8.17
simulationResult = await scopedClusterClient.asCurrentUser.transport.request({
method: 'POST',
path: `_ingest/_simulate`,
body: simulationBody,
});
} catch (error) {
// To prevent a race condition on simulation errors, this returns early and delegates the error handling to the pipeline simulation
return null;
}
const entryWithError = simulationResult.docs.find(isMappingFailure);
if (entryWithError) {
throw new DetectedMappingFailureError(
`The detected field types might not be compatible with these documents. ${entryWithError.doc.error?.reason}`
);
}
return simulationResult;
};
/**
* Computing simulation insights for each document and processor takes a few steps:
* 1. Extract the last document source and the status of the simulation.
* 2. Compute the diff between the sample document and the simulation document to detect added fields and non-additive changes.
* 3. Track the detected fields and errors for each processor.
*
* To keep this process at O(n) complexity, we iterate over the documents and processors only once.
* This requires a closure on the processor metrics map to keep track of the processor state while iterating over the documents.
*/
const computePipelineSimulationResult = (
simulationResult: SuccessfulIngestSimulateResponse,
sampleDocs: Array<{ _source: FlattenRecord }>,
processing: ProcessorDefinitionWithId[]
): {
docReports: SimulationDocReport[];
processorsMetrics: Record<string, ProcessorMetrics>;
} => {
const processorsMap = initProcessorMetricsMap(processing);
const docReports = simulationResult.docs.map((docResult, id) => {
const { errors, status, value } = getLastDoc(docResult);
const diff = computeSimulationDocDiff(docResult, sampleDocs[id]._source);
diff.detected_fields.forEach(({ processor_id, name }) => {
processorsMap[processor_id].detected_fields.push(name);
});
errors.push(...diff.errors);
errors.forEach((error) => {
const procId = error.processor_id;
processorsMap[procId].errors.push(error);
processorsMap[procId].failure_rate++;
});
return {
detected_fields: diff.detected_fields,
errors,
status,
value,
};
});
const processorsMetrics = extractProcessorMetrics({
processorsMap,
sampleSize: docReports.length,
});
return { docReports, processorsMetrics };
};
const initProcessorMetricsMap = (
processing: ProcessorDefinitionWithId[]
): Record<string, ProcessorMetrics> => {
const processorMetricsEntries = processing.map((processor) => [
processor.id,
{
detected_fields: [],
errors: [],
failure_rate: 0,
success_rate: 1,
},
]);
return Object.fromEntries(processorMetricsEntries);
};
const extractProcessorMetrics = ({
processorsMap,
sampleSize,
}: {
processorsMap: Record<string, ProcessorMetrics>;
sampleSize: number;
}) => {
return mapValues(processorsMap, (metrics) => {
const failureRate = metrics.failure_rate / sampleSize;
const successRate = 1 - failureRate;
const detected_fields = uniq(metrics.detected_fields);
const errors = uniqBy(metrics.errors, (error) => error.message);
return {
detected_fields,
errors,
failure_rate: parseFloat(failureRate.toFixed(2)),
success_rate: parseFloat(successRate.toFixed(2)),
};
});
};
const getDocumentStatus = (doc: SuccessfulIngestSimulateDocumentResult): DocSimulationStatus => {
if (doc.processor_results.every(isSuccessfulProcessor)) return 'parsed';
if (doc.processor_results.some(isSuccessfulProcessor)) return 'partially_parsed';
return 'failed';
};
const getLastDoc = (docResult: SuccessfulIngestSimulateDocumentResult) => {
const status = getDocumentStatus(docResult);
const lastDocSource = docResult.processor_results.at(-1)?.doc?._source ?? {};
if (status === 'parsed') {
return {
value: flattenObjectNestedLast(lastDocSource),
errors: [] as SimulationError[],
status,
};
} else {
const { _errors, ...value } = lastDocSource;
return { value: flattenObjectNestedLast(value), errors: _errors as SimulationError[], status };
}
};
/**
* Computing how a simulation document differs from the sample document is not enough
* to determine whether a processor introduced non-additive changes.
* To attribute the errors and the detected fields to the individual processor,
* this function computes the detected fields and the errors for each processor.
*/
const computeSimulationDocDiff = (
docResult: SuccessfulIngestSimulateDocumentResult,
sample: FlattenRecord
) => {
// Keep only the successful processors defined from the user, skipping the on_failure processors from the simulation
const successfulProcessors = docResult.processor_results.filter(isSuccessfulProcessor);
const comparisonDocs = [
{ processor_id: 'sample', value: sample },
...successfulProcessors.map((proc) => ({
processor_id: proc.tag,
value: omit(proc.doc._source, ['_errors']),
})),
];
const diffResult: Pick<SimulationDocReport, 'detected_fields' | 'errors'> = {
detected_fields: [],
errors: [],
};
// Compare each document outcome with the previous one, flattening for standard comparison and detecting added/updated fields.
// When updated fields are detected compared to the original document, the processor is not additive to the documents, and an error is added to the diff result.
while (comparisonDocs.length > 1) {
const currentDoc = comparisonDocs.shift()!; // Safe to use ! here since we check the length
const nextDoc = comparisonDocs[0];
const { added, updated } = calculateObjectDiff(
flattenObjectNestedLast(currentDoc.value),
flattenObjectNestedLast(nextDoc.value)
);
const addedFields = Object.keys(flattenObjectNestedLast(added));
const updatedFields = Object.keys(flattenObjectNestedLast(updated));
// Sort list to have deterministic list of results
const processorDetectedFields = [...addedFields, ...updatedFields].sort().map((name) => ({
processor_id: nextDoc.processor_id,
name,
}));
diffResult.detected_fields.push(...processorDetectedFields);
// We might have updated fields that are not present in the original document because they are generated by the previous processors.
// We exclude them from the list of fields that make the processor non-additive.
const originalUpdatedFields = updatedFields.filter((field) => field in sample);
if (!isEmpty(originalUpdatedFields)) {
diffResult.errors.push({
processor_id: nextDoc.processor_id,
type: 'non_additive_processor_failure',
message: `The processor is not additive to the documents. It might update fields [${originalUpdatedFields.join()}]`,
});
}
}
return diffResult;
};
const prepareSimulationResponse = async (
docReports: SimulationDocReport[],
processorsMetrics: Record<string, ProcessorMetrics>,
detectedFields: DetectedField[]
) => {
const successRate = computeSuccessRate(docReports);
const failureRate = 1 - successRate;
const isNotAdditiveSimulation = some(processorsMetrics, (metrics) =>
metrics.errors.some(isNonAdditiveSimulationError)
);
return {
detected_fields: detectedFields,
documents: docReports,
processors_metrics: processorsMetrics,
failure_rate: parseFloat(failureRate.toFixed(2)),
success_rate: parseFloat(successRate.toFixed(2)),
is_non_additive_simulation: isNotAdditiveSimulation,
};
};
const prepareSimulationFailureResponse = (error: SimulationError) => {
return {
detected_fields: [],
documents: [],
processors_metrics: {
[error.processor_id]: {
detected_fields: [],
errors: [error],
failure_rate: 1,
success_rate: 0,
},
},
failure_rate: 1,
success_rate: 0,
is_non_additive_simulation: isNonAdditiveSimulationError(error),
};
};
const getStreamFields = async (streamsClient: StreamsClient, streamName: string) => {
const [stream, ancestors] = await Promise.all([
streamsClient.getStream(streamName),
streamsClient.getAncestors(streamName),
]);
if (isWiredStreamDefinition(stream)) {
return { ...stream.ingest.wired.fields, ...getInheritedFieldsFromAncestors(ancestors) };
}
return {};
};
/**
* In case new fields have been detected, we want to tell the user which ones are inherited and already mapped.
*/
const computeDetectedFields = async (
processorsMetrics: Record<string, ProcessorMetrics>,
streamsClient: StreamsClient,
params: ProcessingSimulationParams
): Promise<DetectedField[]> => {
const streamName = params.path.name;
const fields = Object.values(processorsMetrics).flatMap((metrics) => metrics.detected_fields);
const uniqueFields = uniq(fields);
// Short-circuit to avoid fetching stream fields if none are detected
if (isEmpty(uniqueFields)) {
return [];
}
const streamFields = await getStreamFields(streamsClient, streamName);
const confirmedValidDetectedFields = computeMappingProperties(params.body.detected_fields ?? []);
return uniqueFields.map((name) => {
const existingField = streamFields[name];
if (existingField) {
return { name, ...existingField };
}
return { name, type: confirmedValidDetectedFields[name]?.type };
});
};
const computeSuccessRate = (docs: SimulationDocReport[]) => {
const successfulCount = docs.reduce((rate, doc) => (rate += doc.status === 'parsed' ? 1 : 0), 0);
return successfulCount / docs.length;
};
const computeMappingProperties = (detectedFields: NamedFieldDefinitionConfig[]) => {
return Object.fromEntries(detectedFields.map(({ name, type }) => [name, { type }]));
};
/**
* Guard helpers
*/
const isSuccessfulProcessor = (
processor: IngestSimulatePipelineSimulation
): processor is WithRequired<IngestSimulatePipelineSimulation, 'doc' | 'tag'> =>
processor.status === 'success' && !!processor.tag;
// TODO: update type once Kibana updates to elasticsearch-js 8.17
const isMappingFailure = (entry: any) => entry.doc?.error?.type === 'document_parsing_exception';
const isNonAdditiveSimulationError = (error: SimulationError) =>
error.type === 'non_additive_processor_failure';

View file

@ -12,15 +12,15 @@ import { useState } from 'react';
const imageSets = {
welcome: {
light: import('./welcome_light.png'),
dark: import('./welcome_dark.png'),
light: () => import('./welcome_light.png'),
dark: () => import('./welcome_dark.png'),
alt: i18n.translate('xpack.streams.streamDetailView.welcomeImage', {
defaultMessage: 'Welcome image for the streams app',
}),
},
noResults: {
light: import('./no_results_light.png'),
dark: import('./no_results_dark.png'),
light: () => import('./no_results_light.png'),
dark: () => import('./no_results_dark.png'),
alt: i18n.translate('xpack.streams.streamDetailView.noResultsImage', {
defaultMessage: 'No results image for the streams app',
}),
@ -38,7 +38,7 @@ export function AssetImage({ type = 'welcome', ...props }: AssetImageProps) {
const [imageSrc, setImageSrc] = useState<string>();
useEffect(() => {
const dynamicImageImport = colorMode === 'LIGHT' ? light : dark;
const dynamicImageImport = colorMode === 'LIGHT' ? light() : dark();
dynamicImageImport.then((module) => setImageSrc(module.default));
}, [colorMode, dark, light]);

View file

@ -6,12 +6,13 @@
*/
import React from 'react';
import { EuiButton, EuiButtonEmpty, EuiFlexGroup } from '@elastic/eui';
import { EuiButton, EuiButtonEmpty, EuiFlexGroup, EuiToolTip, EuiToolTipProps } from '@elastic/eui';
import { i18n } from '@kbn/i18n';
import { useDiscardConfirm } from '../../hooks/use_discard_confirm';
interface ManagementBottomBarProps {
confirmButtonText?: string;
confirmTooltip?: Partial<EuiToolTipProps>;
disabled?: boolean;
isLoading?: boolean;
onCancel: () => void;
@ -20,6 +21,7 @@ interface ManagementBottomBarProps {
export function ManagementBottomBar({
confirmButtonText = defaultConfirmButtonText,
confirmTooltip,
disabled = false,
isLoading = false,
onCancel,
@ -32,6 +34,27 @@ export function ManagementBottomBar({
cancelButtonText: keepEditingLabel,
});
const confirmButtonContent = (
<EuiButton
data-test-subj="streamsAppManagementBottomBarButton"
disabled={disabled}
color="primary"
fill
size="s"
iconType="check"
onClick={onConfirm}
isLoading={isLoading}
>
{confirmButtonText}
</EuiButton>
);
const confirmButton = confirmTooltip ? (
<EuiToolTip {...confirmTooltip}>{confirmButtonContent}</EuiToolTip>
) : (
confirmButtonContent
);
return (
<EuiFlexGroup justifyContent="flexEnd" alignItems="center" responsive={false} gutterSize="s">
<EuiButtonEmpty
@ -40,24 +63,12 @@ export function ManagementBottomBar({
size="s"
iconType="cross"
onClick={handleCancel}
disabled={disabled}
>
{i18n.translate('xpack.streams.streamDetailView.managementTab.bottomBar.cancel', {
defaultMessage: 'Cancel changes',
})}
</EuiButtonEmpty>
<EuiButton
data-test-subj="streamsAppManagementBottomBarButton"
disabled={disabled}
color="primary"
fill
size="s"
iconType="check"
onClick={onConfirm}
isLoading={isLoading}
>
{confirmButtonText}
</EuiButton>
{confirmButton}
</EuiFlexGroup>
);
}

View file

@ -15,7 +15,11 @@ import { useStreamsAppParams } from '../../hooks/use_streams_app_params';
export function RedirectTo<
TPath extends PathsOf<StreamsAppRoutes>,
TParams extends TypeOf<StreamsAppRoutes, TPath, false>
>({ path, params }: { path: TPath; params?: DeepPartial<TParams> }) {
>({
children,
path,
params,
}: React.PropsWithChildren<{ path: TPath; params?: DeepPartial<TParams> }>) {
const router = useStreamsAppRouter();
const currentParams = useStreamsAppParams('/*');
useLayoutEffect(() => {
@ -23,5 +27,5 @@ export function RedirectTo<
// eslint-disable-next-line react-hooks/exhaustive-deps
}, []);
return <></>;
return children ?? null;
}

View file

@ -209,7 +209,7 @@ const mergeFields = (
if (
!(field.name in currentFields) &&
!(field.name in definition.inherited_fields) &&
field.type !== 'unmapped'
field.type !== undefined
) {
acc[field.name] = { type: field.type };
}

View file

@ -14,17 +14,20 @@ import {
Condition,
processorDefinitionSchema,
isSchema,
RecursiveRecord,
FlattenRecord,
} from '@kbn/streams-schema';
import { IHttpFetchError, ResponseErrorBody } from '@kbn/core/public';
import { useDateRange } from '@kbn/observability-utils-browser/hooks/use_date_range';
import { APIReturnType } from '@kbn/streams-plugin/public/api';
import { flattenObjectNestedLast } from '@kbn/object-utils';
import { useStreamsAppFetch } from '../../../hooks/use_streams_app_fetch';
import { useKibana } from '../../../hooks/use_kibana';
import { DetectedField, ProcessorDefinitionWithUIAttributes } from '../types';
import { processorConverter } from '../utils';
type Simulation = APIReturnType<'POST /api/streams/{name}/processing/_simulate'>;
export type Simulation = APIReturnType<'POST /api/streams/{name}/processing/_simulate'>;
export type ProcessorMetrics =
Simulation['processors_metrics'][keyof Simulation['processors_metrics']];
export interface TableColumn {
name: string;
@ -40,7 +43,7 @@ export interface UseProcessingSimulatorReturn {
hasLiveChanges: boolean;
error?: IHttpFetchError<ResponseErrorBody>;
isLoading: boolean;
samples: RecursiveRecord[];
samples: FlattenRecord[];
simulation?: Simulation | null;
tableColumns: TableColumn[];
refreshSamples: () => void;
@ -105,7 +108,7 @@ export const useProcessingSimulator = ({
});
}
},
500
800
),
[]
);
@ -117,15 +120,15 @@ export const useProcessingSimulator = ({
const {
loading: isLoadingSamples,
value: samples,
value: sampleDocs,
refresh: refreshSamples,
} = useStreamsAppFetch(
({ signal }) => {
async ({ signal }) => {
if (!definition) {
return { documents: [] };
return [];
}
return streamsRepositoryClient.fetch('POST /api/streams/{name}/_sample', {
const samplesBody = await streamsRepositoryClient.fetch('POST /api/streams/{name}/_sample', {
signal,
params: {
path: { name: definition.stream.name },
@ -137,21 +140,22 @@ export const useProcessingSimulator = ({
},
},
});
return samplesBody.documents.map((doc) => flattenObjectNestedLast(doc)) as FlattenRecord[];
},
[definition, streamsRepositoryClient, start, end, samplingCondition],
{ disableToastOnError: true }
);
const sampleDocs = samples?.documents;
const {
loading: isLoadingSimulation,
value: simulation,
error: simulationError,
} = useStreamsAppFetch(
({ signal }) => {
if (!definition || isEmpty<RecursiveRecord[]>(sampleDocs) || isEmpty(liveDraftProcessors)) {
return Promise.resolve(null);
({ signal }): Promise<Simulation> => {
if (!definition || isEmpty<FlattenRecord[]>(sampleDocs) || isEmpty(liveDraftProcessors)) {
// This is a hack to avoid losing the previous value of the simulation once the conditions are not met. The state management refactor will fix this.
return Promise.resolve(simulation!);
}
const processing = liveDraftProcessors.map(processorConverter.toAPIDefinition);
@ -162,7 +166,8 @@ export const useProcessingSimulator = ({
// Each processor should meet the minimum schema requirements to run the simulation
if (!hasValidProcessors) {
return Promise.resolve(null);
// This is a hack to avoid losing the previous value of the simulation once the conditions are not met. The state management refactor will fix this.
return Promise.resolve(simulation!);
}
return streamsRepositoryClient.fetch('POST /api/streams/{name}/processing/_simulate', {
@ -171,7 +176,7 @@ export const useProcessingSimulator = ({
path: { name: definition.stream.name },
body: {
documents: sampleDocs,
processing: liveDraftProcessors.map(processorConverter.toAPIDefinition),
processing: liveDraftProcessors.map(processorConverter.toSimulateDefinition),
},
},
});

View file

@ -82,20 +82,39 @@ export function StreamDetailEnrichmentContent({
return <RootStreamEmptyPrompt />;
}
const isNonAdditiveSimulation = simulation && simulation.is_non_additive_simulation;
const isSubmitDisabled = Boolean(!hasChanges || isNonAdditiveSimulation);
const confirmTooltip = isNonAdditiveSimulation
? {
title: i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.nonAdditiveProcessorsTooltip.title',
{ defaultMessage: 'Non additive simulation detected' }
),
content: i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.nonAdditiveProcessorsTooltip.content',
{
defaultMessage:
'We currently prevent adding processors that change/remove existing data. Please update your processor configurations to continue.',
}
),
}
: undefined;
return (
<EuiSplitPanel.Outer grow hasBorder hasShadow={false}>
<EuiSplitPanel.Inner
paddingSize="none"
css={css`
display: flex;
overflow: auto;
overflow: hidden auto;
`}
>
<EuiResizableContainer>
{(EuiResizablePanel, EuiResizableButton) => (
<>
<EuiResizablePanel
initialSize={25}
initialSize={30}
minSize="400px"
tabIndex={0}
paddingSize="none"
@ -109,13 +128,12 @@ export function StreamDetailEnrichmentContent({
onWatchProcessor={watchProcessor}
onAddProcessor={addProcessor}
onReorderProcessor={reorderProcessors}
simulation={simulation}
/>
</EuiResizablePanel>
<EuiResizableButton indicator="border" accountForScrollbars="both" />
<EuiResizablePanel
initialSize={75}
initialSize={70}
minSize="300px"
tabIndex={0}
paddingSize="s"
@ -136,10 +154,11 @@ export function StreamDetailEnrichmentContent({
</EuiSplitPanel.Inner>
<EuiSplitPanel.Inner grow={false} color="subdued">
<ManagementBottomBar
confirmTooltip={confirmTooltip}
onCancel={resetChanges}
onConfirm={saveChanges}
isLoading={isSavingChanges}
disabled={!hasChanges}
disabled={isSubmitDisabled}
/>
</EuiSplitPanel.Inner>
</EuiSplitPanel.Outer>
@ -154,6 +173,7 @@ interface ProcessorsEditorProps {
onReorderProcessor: UseDefinitionReturn['reorderProcessors'];
onUpdateProcessor: UseDefinitionReturn['updateProcessor'];
onWatchProcessor: UseProcessingSimulatorReturn['watchProcessor'];
simulation: UseProcessingSimulatorReturn['simulation'];
}
const ProcessorsEditor = React.memo(
@ -165,6 +185,7 @@ const ProcessorsEditor = React.memo(
onReorderProcessor,
onUpdateProcessor,
onWatchProcessor,
simulation,
}: ProcessorsEditorProps) => {
const { euiTheme } = useEuiTheme();
@ -228,6 +249,7 @@ const ProcessorsEditor = React.memo(
onDeleteProcessor={onDeleteProcessor}
onUpdateProcessor={onUpdateProcessor}
onWatchProcessor={onWatchProcessor}
processorMetrics={simulation?.processors_metrics[processor.id]}
/>
))}
</SortableList>
@ -237,6 +259,7 @@ const ProcessorsEditor = React.memo(
definition={definition}
onAddProcessor={onAddProcessor}
onWatchProcessor={onWatchProcessor}
processorMetrics={simulation?.processors_metrics.draft}
/>
</EuiPanel>
</>

View file

@ -18,7 +18,7 @@ import {
} from '@elastic/eui';
import { i18n } from '@kbn/i18n';
import { TimeRange } from '@kbn/es-query';
import { flattenObject } from '@kbn/object-utils';
import { flattenObjectNestedLast } from '@kbn/object-utils';
import { isEmpty } from 'lodash';
import { RecursiveRecord } from '@kbn/streams-schema';
import { useKibana } from '../../hooks/use_kibana';
@ -52,15 +52,15 @@ export const ProcessorOutcomePreview = ({
const simulationDocuments = useMemo(() => {
if (!simulation?.documents) {
return samples.map((doc) => flattenObject(doc)) as RecursiveRecord[];
return samples.map((doc) => flattenObjectNestedLast(doc)) as RecursiveRecord[];
}
const filterDocuments = (filter: DocsFilterOption) => {
switch (filter) {
case 'outcome_filter_matched':
return simulation.documents.filter((doc) => doc.isMatch);
return simulation.documents.filter((doc) => doc.status === 'parsed');
case 'outcome_filter_unmatched':
return simulation.documents.filter((doc) => !doc.isMatch);
return simulation.documents.filter((doc) => doc.status !== 'parsed');
case 'outcome_filter_all':
default:
return simulation.documents;

View file

@ -65,7 +65,7 @@ export const GrokPatternsEditor = () => {
{ defaultMessage: 'Grok patterns editor' }
)}
>
<EuiPanel color="subdued" paddingSize="s">
<EuiPanel color="subdued" paddingSize="none">
<SortableList onDragItem={handlerPatternDrag}>
{fieldsWithError.map((field, idx) => (
<DraggablePatternInput
@ -132,7 +132,7 @@ const DraggablePatternInput = ({
<EuiFlexGroup gutterSize="s" responsive={false} alignItems="center">
<EuiPanel
color="transparent"
paddingSize="xs"
paddingSize="s"
{...provided.dragHandleProps}
aria-label={i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.processor.grokEditor.dragHandleLabel',

View file

@ -22,7 +22,7 @@ import {
} from '@elastic/eui';
import { i18n } from '@kbn/i18n';
import { ProcessorType, IngestStreamGetResponse } from '@kbn/streams-schema';
import { isEqual } from 'lodash';
import { isEmpty, isEqual } from 'lodash';
import React, { useEffect, useMemo, useState } from 'react';
import { useForm, SubmitHandler, FormProvider, useWatch } from 'react-hook-form';
import { css } from '@emotion/react';
@ -39,10 +39,12 @@ import {
} from '../utils';
import { useDiscardConfirm } from '../../../hooks/use_discard_confirm';
import { UseDefinitionReturn } from '../hooks/use_definition';
import { UseProcessingSimulatorReturn } from '../hooks/use_processing_simulator';
import { ProcessorMetrics, UseProcessingSimulatorReturn } from '../hooks/use_processing_simulator';
import { ProcessorErrors, ProcessorMetricBadges } from './processor_metrics';
export interface ProcessorPanelProps {
definition: IngestStreamGetResponse;
processorMetrics?: ProcessorMetrics;
onWatchProcessor: UseProcessingSimulatorReturn['watchProcessor'];
}
@ -57,7 +59,11 @@ export interface EditProcessorPanelProps extends ProcessorPanelProps {
onUpdateProcessor: UseDefinitionReturn['updateProcessor'];
}
export function AddProcessorPanel({ onAddProcessor, onWatchProcessor }: AddProcessorPanelProps) {
export function AddProcessorPanel({
onAddProcessor,
onWatchProcessor,
processorMetrics,
}: AddProcessorPanelProps) {
const { euiTheme } = useEuiTheme();
const [hasChanges, setHasChanges] = useState(false);
@ -151,6 +157,7 @@ export function AddProcessorPanel({ onAddProcessor, onWatchProcessor }: AddProce
<EuiButton
data-test-subj="streamsAppAddProcessorPanelAddProcessorButton"
size="s"
fill
onClick={methods.handleSubmit(handleSubmit)}
disabled={!methods.formState.isValid && methods.formState.isSubmitted}
>
@ -165,12 +172,16 @@ export function AddProcessorPanel({ onAddProcessor, onWatchProcessor }: AddProce
>
<EuiSpacer size="s" />
<FormProvider {...methods}>
<ProcessorMetricsHeader metrics={processorMetrics} />
<EuiForm component="form" fullWidth onSubmit={methods.handleSubmit(handleSubmit)}>
<ProcessorTypeSelector />
<EuiSpacer size="m" />
{type === 'grok' && <GrokProcessorForm />}
{type === 'dissect' && <DissectProcessorForm />}
</EuiForm>
{processorMetrics && !isEmpty(processorMetrics.errors) && (
<ProcessorErrors metrics={processorMetrics} />
)}
</FormProvider>
</EuiAccordion>
</EuiPanel>
@ -195,6 +206,7 @@ export function EditProcessorPanel({
onUpdateProcessor,
onWatchProcessor,
processor,
processorMetrics,
}: EditProcessorPanelProps) {
const { euiTheme } = useEuiTheme();
@ -307,6 +319,7 @@ export function EditProcessorPanel({
<EuiButton
data-test-subj="streamsAppEditProcessorPanelUpdateProcessorButton"
size="s"
fill
onClick={methods.handleSubmit(handleSubmit)}
disabled={!methods.formState.isValid}
>
@ -317,7 +330,8 @@ export function EditProcessorPanel({
</EuiButton>
</EuiFlexGroup>
) : (
<EuiFlexGroup alignItems="center" gutterSize="s">
<EuiFlexGroup alignItems="center" gutterSize="xs">
{processorMetrics && <ProcessorMetricBadges {...processorMetrics} />}
{isUnsaved && (
<EuiBadge>
{i18n.translate(
@ -343,6 +357,7 @@ export function EditProcessorPanel({
>
<EuiSpacer size="s" />
<FormProvider {...methods}>
<ProcessorMetricsHeader metrics={processorMetrics} />
<EuiForm component="form" fullWidth onSubmit={methods.handleSubmit(handleSubmit)}>
<ProcessorTypeSelector disabled />
<EuiSpacer size="m" />
@@ -357,12 +372,26 @@ export function EditProcessorPanel({
{deleteProcessorLabel}
</EuiButton>
</EuiForm>
{processorMetrics && !isEmpty(processorMetrics.errors) && (
<ProcessorErrors metrics={processorMetrics} />
)}
</FormProvider>
</EuiAccordion>
</EuiPanel>
);
}
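// Shared by both panels: renders the per-processor metric badges above the form
// whenever simulation metrics are available.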
const ProcessorMetricsHeader = ({ metrics }: { metrics?: ProcessorMetrics }) => {
if (!metrics) return null;
return (
<>
<ProcessorMetricBadges {...metrics} />
<EuiSpacer size="m" />
</>
);
};
const deleteProcessorLabel = i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.deleteProcessorLabel',
{ defaultMessage: 'Delete processor' }


@@ -0,0 +1,161 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
/* eslint-disable @typescript-eslint/naming-convention */
import {
EuiBadge,
EuiBadgeGroup,
EuiButtonEmpty,
EuiCallOut,
EuiCallOutProps,
EuiFlexGroup,
useEuiTheme,
} from '@elastic/eui';
import React from 'react';
import { i18n } from '@kbn/i18n';
import useToggle from 'react-use/lib/useToggle';
import { css } from '@emotion/react';
import { ProcessorMetrics } from '../hooks/use_processing_simulator';
type ProcessorMetricBadgesProps = ProcessorMetrics;
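// Formats fractional rates (e.g. 0.25) as whole percentages ("25%").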
const formatter = new Intl.NumberFormat('en-US', {
style: 'percent',
maximumFractionDigits: 0,
});
export const ProcessorMetricBadges = ({
detected_fields,
failure_rate,
success_rate,
}: ProcessorMetricBadgesProps) => {
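  // A zero rate renders no badge at all instead of a noisy "0%".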
const detectedFieldsCount = detected_fields.length;
const failureRate = failure_rate > 0 ? formatter.format(failure_rate) : null;
const successRate = success_rate > 0 ? formatter.format(success_rate) : null;
return (
<EuiBadgeGroup gutterSize="xs">
{failureRate && (
<EuiBadge
color="hollow"
iconType="warning"
title={i18n.translate('xpack.streams.processorMetricBadges.euiBadge.failureRate', {
defaultMessage:
'{failureRate} of the sampled documents were not parsed due to an error',
values: { failureRate },
})}
>
{failureRate}
</EuiBadge>
)}
{successRate && (
<EuiBadge
color="hollow"
iconType="check"
title={i18n.translate('xpack.streams.processorMetricBadges.euiBadge.successRate', {
defaultMessage:
'{successRate} of the sampled documents were successfully parsed by this processor',
values: { successRate },
})}
>
{successRate}
</EuiBadge>
)}
{detectedFieldsCount > 0 && (
<EuiBadge
color="hollow"
title={i18n.translate('xpack.streams.processorMetricBadges.euiBadge.detectedFields', {
defaultMessage:
'{detectedFieldsCount, plural, one {# field was parsed on the sampled documents: } other {# fields were parsed on the sampled documents:\n}}{detectedFields}',
values: { detectedFieldsCount, detectedFields: detected_fields.join('\n') },
})}
>
{i18n.translate('xpack.streams.processorMetricBadges.fieldsBadgeLabel', {
          defaultMessage: '{detectedFieldsCount, plural, one {# field} other {# fields}}',
values: { detectedFieldsCount },
})}
</EuiBadge>
)}
</EuiBadgeGroup>
);
};
const errorTitle = i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.processorErrors.title',
{ defaultMessage: "Processor configuration invalid or doesn't match." }
);
export const ProcessorErrors = ({ metrics }: { metrics: ProcessorMetrics }) => {
const { errors, success_rate } = metrics;
const { euiTheme } = useEuiTheme();
const [isErrorListExpanded, toggleErrorListExpanded] = useToggle(false);
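  // When collapsed, only the first two errors are shown; the toggle below reveals the rest.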
const visibleErrors = isErrorListExpanded ? errors : errors.slice(0, 2);
const remainingCount = errors.length - 2;
const shouldDisplayErrorToggle = remainingCount > 0;
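  // A generic processor failure that still parses some documents is only a warning;
  // every other error type is rendered as danger.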
const getCalloutProps = (type: ProcessorMetrics['errors'][number]['type']): EuiCallOutProps => {
const isWarningError = type === 'generic_processor_failure' && success_rate > 0;
return {
color: isWarningError ? 'warning' : 'danger',
};
};
return (
<>
<EuiFlexGroup
gutterSize="xs"
direction="column"
css={css`
margin-top: ${euiTheme.size.m};
`}
>
{visibleErrors.map((error, id) => (
<EuiCallOut
key={id}
{...getCalloutProps(error.type)}
iconType="warning"
size="s"
title={errorTitle}
>
{error.message}
</EuiCallOut>
))}
</EuiFlexGroup>
{shouldDisplayErrorToggle && !isErrorListExpanded && (
<EuiButtonEmpty
data-test-subj="streamsAppProcessorErrorsShowMoreButton"
onClick={toggleErrorListExpanded}
size="xs"
>
{i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.processorErrors.showMore',
{
defaultMessage: 'Show {remainingCount} similar errors...',
values: { remainingCount },
}
)}
</EuiButtonEmpty>
)}
{shouldDisplayErrorToggle && isErrorListExpanded && (
<EuiButtonEmpty
data-test-subj="streamsAppProcessorErrorsShowLessButton"
onClick={toggleErrorListExpanded}
size="xs"
>
{i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.processorErrors.showLess',
            { defaultMessage: 'Show fewer errors' }
)}
</EuiButtonEmpty>
)}
</>
);
};


@@ -23,7 +23,7 @@ export type ProcessorDefinitionWithUIAttributes = WithUIAttributes<ProcessorDefi
export interface DetectedField {
name: string;
-  type: FieldDefinitionType | 'unmapped';
type?: FieldDefinitionType;
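  // An omitted type now stands in for the previous 'unmapped' literal.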
}
interface BaseFormState {


@@ -7,7 +7,12 @@
/* eslint-disable @typescript-eslint/naming-convention */
-import { ProcessorDefinition, ProcessorType, getProcessorType } from '@kbn/streams-schema';
import {
ProcessorDefinition,
ProcessorDefinitionWithId,
ProcessorType,
getProcessorType,
} from '@kbn/streams-schema';
import { htmlIdGenerator } from '@elastic/eui';
import { isEmpty } from 'lodash';
import {
@@ -134,7 +139,15 @@ const toAPIDefinition = (processor: ProcessorDefinitionWithUIAttributes): Proces
return processorConfig;
};
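// Unlike toAPIDefinition, this keeps the UI-generated id so simulation reports
// and metrics can be tied back to each processor.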
const toSimulateDefinition = (
processor: ProcessorDefinitionWithUIAttributes
): ProcessorDefinitionWithId => {
const { status, type, ...processorConfig } = processor;
return processorConfig;
};
export const processorConverter = {
toAPIDefinition,
toSimulateDefinition,
toUIDefinition,
};


@@ -84,7 +84,6 @@ export function PreviewPanel({
)}
</EuiText>
</>
-        ;
</PreviewPanelIllustration>
);
}


@@ -59,6 +59,7 @@ export function StreamsAppSearchBar({
showQueryMenu={false}
showDatePicker={Boolean(dateRangeFrom && dateRangeTo)}
showSubmitButton={true}
submitButtonStyle="iconOnly"
dateRangeFrom={dateRangeFrom}
dateRangeTo={dateRangeTo}
onRefresh={onRefresh}


@@ -34,16 +34,17 @@ const streamsAppRoutes = {
),
children: {
'/{key}': {
-          element: <Outlet />,
element: (
<RedirectTo path="/{key}/{tab}" params={{ path: { tab: 'overview' } }}>
<Outlet />
</RedirectTo>
),
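          // Bare /{key} visits now redirect to the overview tab here, which makes
          // the nested '/{key}' redirect child below redundant.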
params: t.type({
path: t.type({
key: t.string,
}),
}),
children: {
-          '/{key}': {
-            element: <RedirectTo path="/{key}/{tab}" params={{ path: { tab: 'overview' } }} />,
-          },
'/{key}/management': {
element: (
<RedirectTo


@@ -5,10 +5,12 @@
* 2.0.
*/
/* eslint-disable @typescript-eslint/naming-convention */
import expect from '@kbn/expect';
import { ClientRequestParamsOf } from '@kbn/server-route-repository-utils';
import { StreamsRouteRepository } from '@kbn/streams-plugin/server';
-import { errors } from '@elastic/elasticsearch';
import { errors as esErrors } from '@elastic/elasticsearch';
import { disableStreams, enableStreams, forkStream, indexDocument } from './helpers/requests';
import { DeploymentAgnosticFtrProviderContext } from '../../../ftr_provider_context';
import {
@@ -53,7 +55,17 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
'log.level': 'error',
};
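  // Fixture: a dissect processor that splits the message into
  // parsed_timestamp, parsed_level, and parsed_message.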
const basicDissectProcessor = {
id: 'dissect-uuid',
dissect: {
field: 'message',
pattern: '%{parsed_timestamp} %{parsed_level} %{parsed_message}',
if: { always: {} },
},
};
const basicGrokProcessor = {
id: 'draft',
grok: {
field: 'message',
patterns: [
@@ -94,109 +106,338 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
});
describe('Successful simulations', () => {
-      describe('with valid documents', () => {
-        it('should simulate additive processing', async () => {
-          const response = await simulateProcessingForStream(apiClient, 'logs.test', {
-            processing: [basicGrokProcessor],
-            documents: [createTestDocument()],
-          });
-
-          expect(response.body.success_rate).to.be(1);
-          expect(response.body.failure_rate).to.be(0);
-
-          const { isMatch, value } = response.body.documents[0];
-          expect(isMatch).to.be(true);
-          expect(value).to.have.property('parsed_timestamp', TEST_TIMESTAMP);
-          expect(value).to.have.property('parsed_level', 'error');
-          expect(value).to.have.property('parsed_message', 'test');
-        });

      it('should simulate additive processing', async () => {
        const response = await simulateProcessingForStream(apiClient, 'logs.test', {
          processing: [basicGrokProcessor],
          documents: [createTestDocument()],
        });

        const { detected_fields, errors, status, value } = response.body.documents[0];
        expect(status).to.be('parsed');
        expect(errors).to.eql([]);
        expect(detected_fields).to.eql([
          { processor_id: 'draft', name: 'parsed_level' },
          { processor_id: 'draft', name: 'parsed_message' },
          { processor_id: 'draft', name: 'parsed_timestamp' },
        ]);
        expect(value).to.have.property('parsed_level', 'error');
        expect(value).to.have.property('parsed_message', 'test');
        expect(value).to.have.property('parsed_timestamp', TEST_TIMESTAMP);
      });

-        it('should simulate with detected fields', async () => {
-          const response = await simulateProcessingForStream(apiClient, 'logs.test', {
-            processing: [basicGrokProcessor],
-            documents: [createTestDocument()],
-            detected_fields: [
-              { name: 'parsed_timestamp', type: 'date' },
-              { name: 'parsed_level', type: 'keyword' },
-            ],
-          });
-
-          expect(response.body.success_rate).to.be(1);
-          expect(response.body.failure_rate).to.be(0);
-
-          const findField = (name: string) =>
-            response.body.detected_fields.find((f: { name: string }) => f.name === name);
-
-          expect(response.body.detected_fields).to.have.length(3); // Including parsed_message
-          expect(findField('parsed_timestamp')).to.have.property('type', 'date');
-          expect(findField('parsed_level')).to.have.property('type', 'keyword');
-        });
-      });

      it('should simulate with detected fields', async () => {
        const response = await simulateProcessingForStream(apiClient, 'logs.test', {
          processing: [basicGrokProcessor],
          documents: [createTestDocument()],
          detected_fields: [
            { name: 'parsed_timestamp', type: 'date' },
            { name: 'parsed_level', type: 'keyword' },
          ],
        });

        const findField = (name: string) =>
          response.body.detected_fields.find((f: { name: string }) => f.name === name);

        expect(response.body.detected_fields).to.have.length(3); // Including parsed_message
        expect(findField('parsed_timestamp')).to.have.property('type', 'date');
        expect(findField('parsed_level')).to.have.property('type', 'keyword');
      });

-      describe('with mixed success/failure documents', () => {
-        it('should provide accurate success/failure rates', async () => {
-          const response = await simulateProcessingForStream(apiClient, 'logs.test', {
-            processing: [basicGrokProcessor],
-            documents: [
-              createTestDocument(),
-              createTestDocument('invalid format'),
-              createTestDocument(`${TEST_TIMESTAMP} info test`),
-            ],
-          });
-
-          expect(response.body.success_rate).to.be(0.67);
-          expect(response.body.failure_rate).to.be(0.33);
-          expect(response.body.documents).to.have.length(3);
-          expect(response.body.documents[0].isMatch).to.be(true);
-          expect(response.body.documents[1].isMatch).to.be(false);
-          expect(response.body.documents[2].isMatch).to.be(true);
-        });
-      });
it('should simulate multiple sequential processors', async () => {
const response = await simulateProcessingForStream(apiClient, 'logs.test', {
processing: [
basicDissectProcessor,
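            // The grok processor below reads parsed_message, the field produced
            // by the dissect processor above.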
{
id: 'draft',
grok: {
field: 'parsed_message',
patterns: ['%{IP:parsed_ip}'],
if: { always: {} },
},
},
],
documents: [createTestDocument(`${TEST_MESSAGE} 127.0.0.1`)],
});
expect(response.body.success_rate).to.be(1);
expect(response.body.failure_rate).to.be(0);
const { detected_fields, status, value } = response.body.documents[0];
expect(status).to.be('parsed');
expect(detected_fields).to.eql([
{ processor_id: 'dissect-uuid', name: 'parsed_level' },
{ processor_id: 'dissect-uuid', name: 'parsed_message' },
{ processor_id: 'dissect-uuid', name: 'parsed_timestamp' },
{ processor_id: 'draft', name: 'parsed_ip' },
]);
expect(value).to.have.property('parsed_level', 'error');
expect(value).to.have.property('parsed_message', 'test 127.0.0.1');
expect(value).to.have.property('parsed_timestamp', TEST_TIMESTAMP);
expect(value).to.have.property('parsed_ip', '127.0.0.1');
});
it('should simulate partially parsed documents', async () => {
const response = await simulateProcessingForStream(apiClient, 'logs.test', {
processing: [
basicDissectProcessor, // This processor will correctly extract fields
{
id: 'draft',
grok: {
field: 'parsed_message',
              patterns: ['%{TIMESTAMP_ISO8601:other_date}'], // This processor will fail, since it won't match another date in the remaining message
if: { always: {} },
},
},
],
documents: [createTestDocument(`${TEST_MESSAGE} 127.0.0.1`)],
});
expect(response.body.success_rate).to.be(0);
expect(response.body.failure_rate).to.be(1);
const { detected_fields, status, value } = response.body.documents[0];
expect(status).to.be('partially_parsed');
expect(detected_fields).to.eql([
{ processor_id: 'dissect-uuid', name: 'parsed_level' },
{ processor_id: 'dissect-uuid', name: 'parsed_message' },
{ processor_id: 'dissect-uuid', name: 'parsed_timestamp' },
]);
expect(value).to.have.property('parsed_level', 'error');
expect(value).to.have.property('parsed_message', 'test 127.0.0.1');
expect(value).to.have.property('parsed_timestamp', TEST_TIMESTAMP);
});
it('should return processor metrics', async () => {
const response = await simulateProcessingForStream(apiClient, 'logs.test', {
processing: [
basicDissectProcessor, // This processor will correctly extract fields
{
id: 'draft',
grok: {
field: 'parsed_message',
              patterns: ['%{TIMESTAMP_ISO8601:other_date}'], // This processor will fail, since it won't match another date in the remaining message
if: { always: {} },
},
},
],
documents: [createTestDocument(`${TEST_MESSAGE} 127.0.0.1`)],
});
const processorsMetrics = response.body.processors_metrics;
const dissectMetrics = processorsMetrics['dissect-uuid'];
const grokMetrics = processorsMetrics.draft;
expect(dissectMetrics.detected_fields).to.eql([
'parsed_level',
'parsed_message',
'parsed_timestamp',
]);
expect(dissectMetrics.errors).to.eql([]);
expect(dissectMetrics.failure_rate).to.be(0);
expect(dissectMetrics.success_rate).to.be(1);
expect(grokMetrics.detected_fields).to.eql([]);
expect(grokMetrics.errors).to.eql([
{
processor_id: 'draft',
type: 'generic_processor_failure',
message: 'Provided Grok expressions do not match field value: [test 127.0.0.1]',
},
]);
expect(grokMetrics.failure_rate).to.be(1);
expect(grokMetrics.success_rate).to.be(0);
});
it('should return accurate success/failure rates', async () => {
const response = await simulateProcessingForStream(apiClient, 'logs.test', {
processing: [
basicDissectProcessor,
{
id: 'draft',
grok: {
field: 'parsed_message',
patterns: ['%{IP:parsed_ip}'],
if: { always: {} },
},
},
],
documents: [
createTestDocument(`${TEST_MESSAGE} 127.0.0.1`),
createTestDocument(),
createTestDocument(`${TEST_TIMESTAMP} info test`),
createTestDocument('invalid format'),
],
});
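        // Only the first document parses fully; the second and third pass dissect but
        // fail the grok IP match, and the last fails dissect as well.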
expect(response.body.success_rate).to.be(0.25);
expect(response.body.failure_rate).to.be(0.75);
expect(response.body.documents).to.have.length(4);
expect(response.body.documents[0].status).to.be('parsed');
expect(response.body.documents[1].status).to.be('partially_parsed');
expect(response.body.documents[2].status).to.be('partially_parsed');
expect(response.body.documents[3].status).to.be('failed');
const processorsMetrics = response.body.processors_metrics;
const dissectMetrics = processorsMetrics['dissect-uuid'];
const grokMetrics = processorsMetrics.draft;
expect(dissectMetrics.failure_rate).to.be(0.25);
expect(dissectMetrics.success_rate).to.be(0.75);
expect(grokMetrics.failure_rate).to.be(0.75);
expect(grokMetrics.success_rate).to.be(0.25);
});
it('should allow overriding fields detected by previous simulation processors (skip non-additive check)', async () => {
const response = await simulateProcessingForStream(apiClient, 'logs.test', {
processing: [
basicDissectProcessor,
{
id: 'draft',
grok: {
field: 'parsed_message',
              patterns: ['%{WORD:ignored_field} %{IP:parsed_ip} %{GREEDYDATA:parsed_message}'], // Tries to override the parsed_message previously computed by dissect
if: { always: {} },
},
},
],
documents: [createTestDocument(`${TEST_MESSAGE} 127.0.0.1 greedy data message`)],
});
expect(response.body.success_rate).to.be(1);
expect(response.body.failure_rate).to.be(0);
const { detected_fields, status, value } = response.body.documents[0];
expect(status).to.be('parsed');
expect(detected_fields).to.eql([
{ processor_id: 'dissect-uuid', name: 'parsed_level' },
{ processor_id: 'dissect-uuid', name: 'parsed_message' },
{ processor_id: 'dissect-uuid', name: 'parsed_timestamp' },
{ processor_id: 'draft', name: 'ignored_field' },
{ processor_id: 'draft', name: 'parsed_ip' },
{ processor_id: 'draft', name: 'parsed_message' },
]);
expect(value).to.have.property('parsed_message', 'greedy data message');
});
it('should gracefully return the errors for each partially parsed or failed document', async () => {
const response = await simulateProcessingForStream(apiClient, 'logs.test', {
processing: [
basicDissectProcessor, // This processor will correctly extract fields
{
id: 'draft',
grok: {
field: 'parsed_message',
              patterns: ['%{TIMESTAMP_ISO8601:other_date}'], // This processor will fail, since it won't match another date in the remaining message
if: { always: {} },
},
},
],
documents: [createTestDocument(`${TEST_MESSAGE} 127.0.0.1`)],
});
const { errors, status } = response.body.documents[0];
expect(status).to.be('partially_parsed');
expect(errors).to.eql([
{
processor_id: 'draft',
type: 'generic_processor_failure',
message: 'Provided Grok expressions do not match field value: [test 127.0.0.1]',
},
]);
});
it('should gracefully return failed simulation errors', async () => {
const response = await simulateProcessingForStream(apiClient, 'logs.test', {
processing: [
{
id: 'draft',
grok: {
field: 'message',
patterns: ['%{INVALID_PATTERN:field}'],
if: { always: {} },
},
},
],
documents: [createTestDocument('test message')],
});
const processorsMetrics = response.body.processors_metrics;
const grokMetrics = processorsMetrics.draft;
expect(grokMetrics.errors).to.eql([
{
processor_id: 'draft',
type: 'generic_simulation_failure',
message:
"[patterns] Invalid regex pattern found in: [%{INVALID_PATTERN:field}]. Unable to find pattern [INVALID_PATTERN] in Grok's pattern dictionary",
},
]);
});
it('should gracefully return non-additive simulation errors', async () => {
const response = await simulateProcessingForStream(apiClient, 'logs.test', {
processing: [
{
id: 'draft',
grok: {
field: 'message',
patterns: [
                // This overwrites the existing log.level and message values
'%{TIMESTAMP_ISO8601:parsed_timestamp} %{LOGLEVEL:log.level} %{GREEDYDATA:message}',
],
if: { always: {} },
},
},
],
documents: [{ ...createTestDocument(), 'log.level': 'info' }],
});
const processorsMetrics = response.body.processors_metrics;
const grokMetrics = processorsMetrics.draft;
expect(grokMetrics.errors).to.eql([
{
processor_id: 'draft',
type: 'non_additive_processor_failure',
message:
'The processor is not additive to the documents. It might update fields [log.level,message]',
},
]);
});
it('should return the is_non_additive_simulation simulation flag', async () => {
const [additiveParsingResponse, nonAdditiveParsingResponse] = await Promise.all([
simulateProcessingForStream(apiClient, 'logs.test', {
processing: [basicGrokProcessor],
documents: [createTestDocument()],
}),
simulateProcessingForStream(apiClient, 'logs.test', {
processing: [
{
id: 'draft',
grok: {
field: 'message',
patterns: [
                  // This overwrites the existing log.level and message values
'%{TIMESTAMP_ISO8601:parsed_timestamp} %{LOGLEVEL:log.level} %{GREEDYDATA:message}',
],
if: { always: {} },
},
},
],
documents: [{ ...createTestDocument(), 'log.level': 'info' }],
}),
]);
expect(additiveParsingResponse.body.is_non_additive_simulation).to.be(false);
expect(nonAdditiveParsingResponse.body.is_non_additive_simulation).to.be(true);
});
});
describe('Failed simulations', () => {
-      it('should fail with invalid processor configurations', async () => {
-        await simulateProcessingForStream(
-          apiClient,
-          'logs.test',
-          {
-            processing: [
-              {
-                grok: {
-                  field: 'message',
-                  patterns: ['%{INVALID_PATTERN:field}'],
-                  if: { always: {} },
-                },
-              },
-            ],
-            documents: [createTestDocument('test message')],
-          },
-          // this should be a 400, but ES reports this as a 500
-          500
-        );
-      });
-
-      it('should fail when attempting to update existing fields', async () => {
-        const response = await simulateProcessingForStream(
-          apiClient,
-          'logs.test',
-          {
-            processing: [
-              {
-                grok: {
-                  field: 'message',
-                  patterns: ['%{TIMESTAMP_ISO8601:parsed_timestamp} %{GREEDYDATA:message}'], // Overwrites existing message field
-                  if: { always: {} },
-                },
-              },
-            ],
-            documents: [createTestDocument(`${TEST_TIMESTAMP} original message`)],
-          },
-          400
-        );
-
-        expect((response.body as errors.ResponseError['body']).message).to.contain(
-          'The processor is not additive to the documents. It might update fields [message]'
-        );
-      });
it('should fail with incompatible detected field mappings', async () => {
const response = await simulateProcessingForStream(
apiClient,
@@ -211,27 +452,10 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
400
);
-        expect((response.body as errors.ResponseError['body']).message).to.contain(
expect((response.body as esErrors.ResponseError['body']).message).to.contain(
'The detected field types might not be compatible with these documents.'
);
});
});
-    describe('Partial success simulations', () => {
-      it('should handle mixed success/failure documents', async () => {
-        const response = await simulateProcessingForStream(apiClient, 'logs.test', {
-          processing: [basicGrokProcessor],
-          documents: [
-            createTestDocument(), // Will succeed
-            createTestDocument('invalid format'), // Will fail
-          ],
-        });
-
-        expect(response.body.success_rate).to.be(0.5);
-        expect(response.body.failure_rate).to.be(0.5);
-        expect(response.body.documents[0].isMatch).to.be(true);
-        expect(response.body.documents[1].isMatch).to.be(false);
-      });
-    });
});
}