🌊 Streams: Manual ingest pipeline processor (#219522)

Adds a new processor to Streams that serves as an escape hatch to regular
Elasticsearch ingest pipeline processors, entered as JSON.

<img width="1139" alt="Screenshot 2025-05-28 at 15 54 41"
src="https://github.com/user-attachments/assets/67f1f4c4-982e-45d1-ae96-080545c5a0e2"
/>

Some details:
* If `on_failure` or `ignore_failure` is set on a nested processor via the JSON input, the "outer" `on_failure` definition or `ignore_failure` flag is ignored for that processor
* Expands to multiple processors in the generated ingest pipeline
* Does minimal validation (each nested processor must use one of the known Elasticsearch processor types, derived from the Elasticsearch API types), but doesn't otherwise validate the processor configuration (an example definition is sketched below)
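
To make the shape concrete, here is a minimal sketch of a processing step using the new processor. The field names and values are illustrative only, not taken from this change:

```ts
import type { ProcessorDefinition } from '@kbn/streams-schema';

// Illustrative only: one manual_ingest_pipeline step wrapping two raw
// Elasticsearch processors, entered as JSON in the UI.
const step: ProcessorDefinition = {
  manual_ingest_pipeline: {
    if: { always: {} },
    ignore_failure: true,
    processors: [
      { uppercase: { field: 'attributes.code' } },
      { set: { field: 'attributes.normalized', value: 'true' } },
    ],
  },
};
```

On the server, each entry in `processors` becomes its own processor in the generated ingest pipeline (see the `formatToIngestProcessors` change further down).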

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
Commit 78fe04ff7a (parent de8803a2bc)
Joe Reuter, 2025-06-05 15:25:56 +02:00 (committed by GitHub)
21 changed files with 6201 additions and 63 deletions

Diffs for four files are suppressed because they are too large.

@ -811,6 +811,7 @@ export const getDocLinks = ({ kibanaBranch, buildFlavor }: GetDocLinkOptions): D
pipelines: `${ELASTIC_DOCS}manage-data/ingest/transform-enrich/ingest-pipelines`,
csvPipelines: `${ELASTIC_DOCS}reference/ecs/ecs-converting`,
pipelineFailure: `${ELASTIC_DOCS}manage-data/ingest/transform-enrich/ingest-pipelines#handling-pipeline-failures`,
conditionalProcessor: `${ELASTIC_DOCS}manage-data/ingest/transform-enrich/ingest-pipelines#conditionally-run-processor`,
processors: `${ELASTIC_DOCS}reference/enrich-processor`,
arrayOrJson: `${ELASTIC_DOCS}reference/enrich-processor#ingest-process-category-array-json-handling`,
dataEnrichment: `${ELASTIC_DOCS}reference/enrich-processor#ingest-process-category-data-enrichment`,

@ -36,6 +36,8 @@ export {
type DissectProcessorDefinition,
type GrokProcessorConfig,
type GrokProcessorDefinition,
type ManualIngestPipelineProcessorConfig as ManualIngestPipelineProcessorConfig,
type ManualIngestPipelineProcessorDefinition as ManualIngestPipelineProcessorDefinition,
getProcessorConfig,
getProcessorType,
processorWithIdDefinitionSchema,
@ -56,6 +58,8 @@ export {
export { getAdvancedParameters } from './src/helpers/get_advanced_parameters';
export { getInheritedFieldsFromAncestors } from './src/helpers/get_inherited_fields_from_ancestors';
export * from './src/ingest_pipeline_processors';
export {
type SampleDocument,
type FlattenRecord,

@ -0,0 +1,64 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { IngestProcessorContainer } from '@elastic/elasticsearch/lib/api/types';
export type ElasticsearchProcessorType = keyof IngestProcessorContainer;
const ensureFullProcessorTypeList = <T extends readonly ElasticsearchProcessorType[]>(
types: ElasticsearchProcessorType extends T[number]
? T
: `Missing elements from union: "${Exclude<ElasticsearchProcessorType, T[number]>}"`
) => types as T;
export const elasticsearchProcessorTypes = ensureFullProcessorTypeList([
'append',
'attachment',
'bytes',
'circle',
'community_id',
'convert',
'csv',
'date',
'date_index_name',
'dissect',
'dot_expander',
'drop',
'enrich',
'fail',
'fingerprint',
'foreach',
'ip_location',
'geo_grid',
'geoip',
'grok',
'gsub',
'html_strip',
'inference',
'join',
'json',
'kv',
'lowercase',
'network_direction',
'pipeline',
'redact',
'registered_domain',
'remove',
'rename',
'reroute',
'script',
'set',
'set_security_user',
'sort',
'split',
'terminate',
'trim',
'uppercase',
'urldecode',
'uri_parts',
'user_agent',
] as const);
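
The `ensureFullProcessorTypeList` call above is a compile-time exhaustiveness check: if a key of `IngestProcessorContainer` is missing from the array, the parameter type collapses to an error string and the call stops type-checking. At runtime the exported list is a plain array, so it can back simple membership checks. The helper below is only a sketch of such a check and is not part of this change:

```ts
import {
  elasticsearchProcessorTypes,
  type ElasticsearchProcessorType,
} from '@kbn/streams-schema';

// Hypothetical helper: narrow an arbitrary string to a known processor type.
const isKnownProcessorType = (value: string): value is ElasticsearchProcessorType =>
  elasticsearchProcessorTypes.includes(value as ElasticsearchProcessorType);

isKnownProcessorType('set'); // true
isKnownProcessorType('does_not_exist'); // false
```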

@ -7,6 +7,10 @@
import { z } from '@kbn/zod';
import { NonEmptyString } from '@kbn/zod-helpers';
import {
ElasticsearchProcessorType,
elasticsearchProcessorTypes,
} from '../../../ingest_pipeline_processors';
import { Condition, conditionSchema } from '../../../conditions';
import { createIsNarrowSchema } from '../../../shared/type_guards';
@ -25,6 +29,33 @@ const processorBaseSchema = z.object({
ignore_failure: z.optional(z.boolean()),
});
/* Manual ingest pipeline processor */
// Not 100% accurate, but close enough for our use case to provide minimal safety
// without having to check all details
export type ElasticsearchProcessor = Partial<Record<ElasticsearchProcessorType, unknown>>;
export interface ManualIngestPipelineProcessorConfig extends ProcessorBase {
processors: ElasticsearchProcessor[];
ignore_failure?: boolean;
tag?: string;
on_failure?: Array<Record<string, unknown>>;
}
export interface ManualIngestPipelineProcessorDefinition {
manual_ingest_pipeline: ManualIngestPipelineProcessorConfig;
}
export const manualIngestPipelineProcessorDefinitionSchema = z.strictObject({
manual_ingest_pipeline: z.intersection(
processorBaseSchema,
z.object({
processors: z.array(z.record(z.enum(elasticsearchProcessorTypes), z.unknown())),
tag: z.optional(z.string()),
on_failure: z.optional(z.array(z.record(z.unknown()))),
})
),
}) satisfies z.Schema<ManualIngestPipelineProcessorDefinition>;
/**
* Grok processor
*/
@ -294,6 +325,7 @@ export type ProcessorDefinition =
| GeoIpProcessorDefinition
| RenameProcessorDefinition
| SetProcessorDefinition
| ManualIngestPipelineProcessorDefinition
| UrlDecodeProcessorDefinition
| UserAgentProcessorDefinition;
@ -313,6 +345,7 @@ export const processorDefinitionSchema: z.ZodType<ProcessorDefinition> = z.union
dateProcessorDefinitionSchema,
dissectProcessorDefinitionSchema,
grokProcessorDefinitionSchema,
manualIngestPipelineProcessorDefinitionSchema,
kvProcessorDefinitionSchema,
geoIpProcessorDefinitionSchema,
renameProcessorDefinitionSchema,
@ -325,6 +358,7 @@ export const processorWithIdDefinitionSchema: z.ZodType<ProcessorDefinitionWithI
dateProcessorDefinitionSchema.merge(z.object({ id: z.string() })),
dissectProcessorDefinitionSchema.merge(z.object({ id: z.string() })),
grokProcessorDefinitionSchema.merge(z.object({ id: z.string() })),
manualIngestPipelineProcessorDefinitionSchema.merge(z.object({ id: z.string() })),
kvProcessorDefinitionSchema.merge(z.object({ id: z.string() })),
geoIpProcessorDefinitionSchema.merge(z.object({ id: z.string() })),
renameProcessorDefinitionSchema.merge(z.object({ id: z.string() })),
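
A sketch of how the new schema behaves, assuming `manualIngestPipelineProcessorDefinitionSchema` is imported from this module; the documents are illustrative:

```ts
// Accepted: every nested processor key is a known Elasticsearch processor type.
manualIngestPipelineProcessorDefinitionSchema.parse({
  manual_ingest_pipeline: {
    if: { always: {} },
    processors: [{ rename: { field: 'message', target_field: 'body.text' } }],
  },
});

// Rejected: "not_a_processor" is not in elasticsearchProcessorTypes, so the
// z.record(z.enum(...), z.unknown()) entry fails validation.
const result = manualIngestPipelineProcessorDefinitionSchema.safeParse({
  manual_ingest_pipeline: {
    if: { always: {} },
    processors: [{ not_a_processor: { field: 'message' } }],
  },
});
// result.success === false
```

Note that the nested processor configs themselves are typed as `unknown`, so a typo inside, say, the `rename` config is not caught here; only the processor type names are checked.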

@ -5,21 +5,84 @@
* 2.0.
*/
import { ProcessorDefinition, getProcessorConfig, getProcessorType } from '@kbn/streams-schema';
import {
ManualIngestPipelineProcessorConfig,
ElasticsearchProcessorType,
ProcessorDefinition,
elasticsearchProcessorTypes,
getProcessorConfig,
getProcessorType,
} from '@kbn/streams-schema';
import { IngestProcessorContainer } from '@elastic/elasticsearch/lib/api/types';
import { conditionToPainless } from './condition_to_painless';
export function formatToIngestProcessors(
processing: ProcessorDefinition[]
processing: ProcessorDefinition[],
{
ignoreMalformedManualIngestPipeline,
}: {
ignoreMalformedManualIngestPipeline?: boolean;
} = {}
): IngestProcessorContainer[] {
return processing.map((processor) => {
return processing.flatMap((processor) => {
const config = getProcessorConfig(processor);
const type = getProcessorType(processor);
return {
[type]: {
...config,
...('if' in config && config.if ? { if: conditionToPainless(config.if) } : {}),
if (type === 'manual_ingest_pipeline') {
const manualIngestPipelineProcessorConfig = config as ManualIngestPipelineProcessorConfig;
// manual_ingest_pipeline processor is a special case, since it has nested Elasticsearch-level processors and doesn't support if
// directly - we need to add it to each nested processor
return manualIngestPipelineProcessorConfig.processors.flatMap((nestedProcessor) => {
const nestedType = Object.keys(nestedProcessor)[0];
if (!elasticsearchProcessorTypes.includes(nestedType as ElasticsearchProcessorType)) {
if (ignoreMalformedManualIngestPipeline) {
return [];
}
throw new Error(
`Invalid processor type "${nestedType}" in manual_ingest_pipeline processor. Supported types: ${elasticsearchProcessorTypes.join(
', '
)}`
);
}
const nestedConfig = nestedProcessor[nestedType as ElasticsearchProcessorType] as Record<
string,
unknown
>;
if (typeof nestedConfig !== 'object' || nestedConfig === null) {
if (ignoreMalformedManualIngestPipeline) {
return [];
}
throw new Error(
`Invalid processor config for "${nestedType}" in manual_ingest_pipeline processor. Expected an object.`
);
}
return {
[nestedType]: {
...nestedConfig,
tag: manualIngestPipelineProcessorConfig.tag ?? nestedConfig.tag,
ignore_failure:
nestedConfig.ignore_failure ?? manualIngestPipelineProcessorConfig.ignore_failure,
on_failure: nestedConfig.on_failure
? [
...(nestedConfig.on_failure as []),
...(manualIngestPipelineProcessorConfig.on_failure || []),
]
: manualIngestPipelineProcessorConfig.on_failure,
...(!nestedConfig.if && 'if' in config && config.if
? { if: conditionToPainless(config.if) }
: {}),
},
} as IngestProcessorContainer;
});
}
return [
{
[type]: {
...config,
...('if' in config && config.if ? { if: conditionToPainless(config.if) } : {}),
},
},
};
];
});
}
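
In short: each `manual_ingest_pipeline` step is flattened into one ingest processor per nested entry. The outer condition is compiled to Painless and copied onto nested processors that do not define their own `if`, the outer `tag` wins over a nested `tag`, a nested `ignore_failure` wins over the outer flag, and a nested `on_failure` list gets the outer handlers appended. A sketch of the resulting behaviour, with illustrative values (the import of `formatToIngestProcessors` from this module is omitted):

```ts
const ingestProcessors = formatToIngestProcessors([
  {
    manual_ingest_pipeline: {
      if: { always: {} },
      ignore_failure: true,
      processors: [
        { trim: { field: 'attributes.code' } },
        // Defines its own condition, so the outer one is not copied onto it.
        { lowercase: { field: 'attributes.level', if: 'ctx.attributes.level != null' } },
      ],
    },
  },
]);

// Roughly:
// [
//   { trim: { field: 'attributes.code', ignore_failure: true, if: '<painless for the outer condition>' } },
//   { lowercase: { field: 'attributes.level', if: 'ctx.attributes.level != null', ignore_failure: true } },
// ]
```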

@ -275,7 +275,9 @@ const prepareSimulationProcessors = (
},
];
const formattedProcessors = formatToIngestProcessors(processors);
const formattedProcessors = formatToIngestProcessors(processors, {
ignoreMalformedManualIngestPipeline: true,
});
return [...dotExpanderProcessors, ...formattedProcessors];
};

@ -0,0 +1,53 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export const serializeXJson = (v: unknown, defaultVal: string = '{}') => {
if (!v) {
return defaultVal;
}
if (typeof v === 'string') {
return formatXJsonString(v);
}
return JSON.stringify(v, null, 2);
};
export const deserializeJson = (input: string) => {
try {
return JSON.parse(input);
} catch (e) {
return input;
}
};
/**
* Format an XJson string input as parsed JSON. Replaces the invalid characters
* with a placeholder, parses the new string in a JSON format with the expected
* indentation and then replaces the placeholders with the original values.
*/
const formatXJsonString = (input: string) => {
let placeholder = 'PLACEHOLDER';
const INVALID_STRING_REGEX = /"""(.*?)"""/gs;
while (input.includes(placeholder)) {
placeholder += '_';
}
const modifiedInput = input.replace(INVALID_STRING_REGEX, () => `"${placeholder}"`);
let jsonObject;
try {
jsonObject = JSON.parse(modifiedInput);
} catch (error) {
return input;
}
let formattedJsonString = JSON.stringify(jsonObject, null, 2);
const invalidStrings = input.match(INVALID_STRING_REGEX);
if (invalidStrings) {
invalidStrings.forEach((invalidString) => {
formattedJsonString = formattedJsonString.replace(`"${placeholder}"`, invalidString);
});
}
return formattedJsonString;
};
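
These helpers are extracted verbatim from the Grok pattern-definitions editor (see the next file) so the new JSON editor can reuse them. A sketch of the round trip, with illustrative values:

```ts
// Triple-quoted segments would break JSON.parse, so they are swapped for a
// placeholder, the rest is pretty-printed, and the originals are restored.
serializeXJson('{"pattern": """%{IP:client} %{WORD:method}"""}');
// -> '{\n  "pattern": """%{IP:client} %{WORD:method}"""\n}'

// Valid JSON becomes a value; anything else is returned as the raw string so
// the form keeps whatever the user is still typing.
deserializeJson('{"a": 1}'); // { a: 1 }
deserializeJson('{ not yet valid'); // '{ not yet valid'
```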

@ -11,6 +11,7 @@ import { EuiFormRow } from '@elastic/eui';
import { CodeEditor } from '@kbn/code-editor';
import { i18n } from '@kbn/i18n';
import { ProcessorFormState } from '../../types';
import { deserializeJson, serializeXJson } from '../../helpers';
export const GrokPatternDefinition = () => {
const { field, fieldState } = useController<ProcessorFormState, 'pattern_definitions'>({
@ -34,8 +35,8 @@ export const GrokPatternDefinition = () => {
fullWidth
>
<CodeEditor
value={serialize(field.value)}
onChange={(value) => field.onChange(deserialize(value))}
value={serializeXJson(field.value)}
onChange={(value) => field.onChange(deserializeJson(value))}
languageId="xjson"
height={200}
aria-label={i18n.translate(
@ -46,50 +47,3 @@ export const GrokPatternDefinition = () => {
</EuiFormRow>
);
};
const serialize = (v: unknown) => {
if (!v) {
return '{}';
}
if (typeof v === 'string') {
return formatXJsonString(v);
}
return JSON.stringify(v, null, 2);
};
const deserialize = (input: string) => {
try {
return JSON.parse(input);
} catch (e) {
return input;
}
};
/**
* Format a XJson string input as parsed JSON. Replaces the invalid characters
* with a placeholder, parses the new string in a JSON format with the expected
* indentantion and then replaces the placeholders with the original values.
*/
const formatXJsonString = (input: string) => {
let placeholder = 'PLACEHOLDER';
const INVALID_STRING_REGEX = /"""(.*?)"""/gs;
while (input.includes(placeholder)) {
placeholder += '_';
}
const modifiedInput = input.replace(INVALID_STRING_REGEX, () => `"${placeholder}"`);
let jsonObject;
try {
jsonObject = JSON.parse(modifiedInput);
} catch (error) {
return input;
}
let formattedJsonString = JSON.stringify(jsonObject, null, 2);
const invalidStrings = input.match(INVALID_STRING_REGEX);
if (invalidStrings) {
invalidStrings.forEach((invalidString) => {
formattedJsonString = formattedJsonString.replace(`"${placeholder}"`, invalidString);
});
}
return formattedJsonString;
};

@ -24,7 +24,7 @@ import { useSelector } from '@xstate5/react';
import { i18n } from '@kbn/i18n';
import { isEmpty } from 'lodash';
import React, { useEffect, useMemo, useCallback } from 'react';
import { useForm, SubmitHandler, FormProvider, useWatch } from 'react-hook-form';
import { useForm, SubmitHandler, FormProvider, useWatch, DeepPartial } from 'react-hook-form';
import { css } from '@emotion/react';
import { DiscardPromptOptions, useDiscardConfirm } from '../../../../hooks/use_discard_confirm';
import { DissectProcessorForm } from './dissect';
@ -53,6 +53,7 @@ import { DateProcessorForm } from './date';
import { ConfigDrivenProcessorFields } from './config_driven/components/fields';
import { ConfigDrivenProcessorType } from './config_driven/types';
import { selectPreviewDocuments } from '../state_management/simulation_state_machine/selectors';
import { ManualIngestPipelineProcessorForm } from './manual_ingest_pipeline';
export function AddProcessorPanel() {
const { euiTheme } = useEuiTheme();
@ -82,7 +83,8 @@ export function AddProcessorPanel() {
const initialDefaultValues = useMemo(() => defaultValuesGetter(), [defaultValuesGetter]);
const methods = useForm<ProcessorFormState>({
defaultValues: initialDefaultValues,
// cast necessary because DeepPartial does not work with `unknown`
defaultValues: initialDefaultValues as DeepPartial<ProcessorFormState>,
mode: 'onChange',
});
@ -198,6 +200,7 @@ export function AddProcessorPanel() {
{type === 'date' && <DateProcessorForm />}
{type === 'dissect' && <DissectProcessorForm />}
{type === 'grok' && <GrokProcessorForm />}
{type === 'manual_ingest_pipeline' && <ManualIngestPipelineProcessorForm />}
{!SPECIALISED_TYPES.includes(type) && (
<ConfigDrivenProcessorFields type={type as ConfigDrivenProcessorType} />
)}
@ -254,7 +257,7 @@ export function EditProcessorPanel({ processorRef, processorMetrics }: EditProce
);
const methods = useForm<ProcessorFormState>({
defaultValues,
defaultValues: defaultValues as DeepPartial<ProcessorFormState>,
mode: 'onChange',
});
@ -408,6 +411,7 @@ export function EditProcessorPanel({ processorRef, processorMetrics }: EditProce
{type === 'date' && <DateProcessorForm />}
{type === 'grok' && <GrokProcessorForm />}
{type === 'dissect' && <DissectProcessorForm />}
{type === 'manual_ingest_pipeline' && <ManualIngestPipelineProcessorForm />}
{!SPECIALISED_TYPES.includes(type) && (
<ConfigDrivenProcessorFields type={type as ConfigDrivenProcessorType} />
)}

@ -0,0 +1,27 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import React from 'react';
import { EuiSpacer } from '@elastic/eui';
import { JsonEditor } from './json_editor';
import { FieldsAccordion } from '../optional_fields_accordion';
import { ProcessorConditionEditor } from '../processor_condition_editor';
import { IgnoreFailureToggle } from '../ignore_toggles';
export const ManualIngestPipelineProcessorForm = () => {
return (
<>
<JsonEditor />
<EuiSpacer size="m" />
<FieldsAccordion>
<ProcessorConditionEditor />
</FieldsAccordion>
<EuiSpacer size="m" />
<IgnoreFailureToggle />
</>
);
};

@ -0,0 +1,110 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import React from 'react';
import { useController } from 'react-hook-form';
import { EuiFormRow, EuiLink } from '@elastic/eui';
import { CodeEditor } from '@kbn/code-editor';
import { i18n } from '@kbn/i18n';
import { ElasticsearchProcessorType, elasticsearchProcessorTypes } from '@kbn/streams-schema';
import { FormattedMessage } from '@kbn/i18n-react';
import { useKibana } from '../../../../../hooks/use_kibana';
import { ProcessorFormState } from '../../types';
import { deserializeJson, serializeXJson } from '../../helpers';
export const JsonEditor = () => {
const {
core: { docLinks },
} = useKibana();
const { field, fieldState } = useController<ProcessorFormState, 'processors'>({
name: 'processors',
rules: {
validate: (value) => {
if (typeof value === 'string') {
return i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.processor.ingestPipelineProcessorsInvalidJSON',
{
defaultMessage: 'Invalid JSON format',
}
);
}
if (!Array.isArray(value)) {
return i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.processor.ingestPipelineProcessorsInvalidArray',
{
defaultMessage: 'Expected an array',
}
);
}
const invalidProcessor = value.find((processor) => {
const processorType = Object.keys(processor)[0];
return !elasticsearchProcessorTypes.includes(processorType as ElasticsearchProcessorType);
});
if (invalidProcessor) {
return i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.processor.ingestPipelineProcessorsInvalidProcessorType',
{
defaultMessage: 'Invalid processor type: {processorType}',
values: {
processorType: Object.keys(invalidProcessor)[0],
},
}
);
}
},
},
});
return (
<EuiFormRow
label={i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.processor.ingestPipelineProcessorsLabel',
{ defaultMessage: 'Ingest pipeline processors' }
)}
helpText={
<FormattedMessage
id="xpack.streams.streamDetailView.managementTab.enrichment.processor.ingestPipelineProcessorsHelpText"
defaultMessage={
'A JSON-encoded array of {ingestPipelineProcessors}. {conditions} defined in the processor JSON take precedence over conditions defined in "Optional fields".'
}
values={{
ingestPipelineProcessors: (
<EuiLink href={docLinks.links.ingest.processors} target="_blank" external>
{i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.processor.ingestPipelineProcessorsLabel',
{ defaultMessage: 'ingest pipeline processors' }
)}
</EuiLink>
),
conditions: (
<EuiLink href={docLinks.links.ingest.conditionalProcessor} target="_blank" external>
{i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.processor.ingestPipelineProcessorsConditionallyLabel',
{ defaultMessage: 'Conditions' }
)}
</EuiLink>
),
}}
/>
}
error={fieldState.error?.message}
isInvalid={fieldState.invalid}
fullWidth
>
<CodeEditor
value={serializeXJson(field.value, '[]')}
onChange={(value) => field.onChange(deserializeJson(value))}
languageId="xjson"
height={200}
aria-label={i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.processor.ingestPipelineProcessorsAriaLabel',
{ defaultMessage: 'Ingest pipeline processors editor' }
)}
/>
</EuiFormRow>
);
};

@ -138,6 +138,16 @@ const availableProcessors: TAvailableProcessors = {
),
},
...configDrivenProcessors,
manual_ingest_pipeline: {
type: 'manual_ingest_pipeline',
inputDisplay: 'Manual pipeline configuration',
getDocUrl: () => (
<FormattedMessage
id="xpack.streams.streamDetailView.managementTab.enrichment.processor.manualIngestPipelineHelpText"
defaultMessage="Specify an array of ingest pipeline processors using JSON."
/>
),
},
};
const getProcessorDescription = (esDocUrl: string) => (type: ProcessorType) =>

@ -36,7 +36,15 @@ export function composeSamplingCondition(
}
export function getSourceFields(processors: ProcessorDefinitionWithUIAttributes[]): string[] {
return processors.map((processor) => getProcessorConfig(processor).field.trim()).filter(Boolean);
return processors
.map((processor) => {
const config = getProcessorConfig(processor);
if ('field' in config) {
return config.field.trim();
}
return '';
})
.filter(Boolean);
}
export function getTableColumns(

@ -13,6 +13,7 @@ import {
ProcessorTypeOf,
} from '@kbn/streams-schema';
import { ManualIngestPipelineProcessorConfig } from '@kbn/streams-schema';
import { DraftGrokExpression } from '@kbn/grok-ui';
import { ConfigDrivenProcessorFormState } from './processors/config_driven/types';
@ -32,7 +33,15 @@ export type DissectFormState = DissectProcessorConfig & { type: 'dissect' };
export type DateFormState = DateProcessorConfig & { type: 'date' };
export type SpecialisedFormState = GrokFormState | DissectFormState | DateFormState;
export type ManualIngestPipelineFormState = ManualIngestPipelineProcessorConfig & {
type: 'manual_ingest_pipeline';
};
export type SpecialisedFormState =
| GrokFormState
| DissectFormState
| DateFormState
| ManualIngestPipelineFormState;
export type ProcessorFormState = SpecialisedFormState | ConfigDrivenProcessorFormState;

@ -24,6 +24,7 @@ import {
ProcessorFormState,
WithUIAttributes,
DateFormState,
ManualIngestPipelineFormState,
} from './types';
import { ALWAYS_CONDITION } from '../../../util/condition';
import { configDrivenProcessors } from './processors/config_driven';
@ -118,6 +119,13 @@ const defaultGrokProcessorFormState: (
if: ALWAYS_CONDITION,
});
const defaultManualIngestPipelineProcessorFormState = (): ManualIngestPipelineFormState => ({
type: 'manual_ingest_pipeline',
processors: [],
ignore_failure: true,
if: ALWAYS_CONDITION,
});
const configDrivenDefaultFormStates = mapValues(
configDrivenProcessors,
(config) => () => config.defaultFormState
@ -132,6 +140,7 @@ const defaultProcessorFormStateByType: Record<
date: defaultDateProcessorFormState,
dissect: defaultDissectProcessorFormState,
grok: defaultGrokProcessorFormState,
manual_ingest_pipeline: defaultManualIngestPipelineProcessorFormState,
...configDrivenDefaultFormStates,
};
@ -173,6 +182,15 @@ export const getFormStateFrom = (
});
}
if (isManualIngestPipelineJsonProcessor(processor)) {
const { manual_ingest_pipeline } = processor;
return structuredClone({
...manual_ingest_pipeline,
type: 'manual_ingest_pipeline',
});
}
if (isDateProcessor(processor)) {
const { date } = processor;
@ -236,6 +254,20 @@ export const convertFormStateToProcessor = (
};
}
if (formState.type === 'manual_ingest_pipeline') {
const { processors, ignore_failure } = formState;
return {
processorDefinition: {
manual_ingest_pipeline: {
if: formState.if,
processors,
ignore_failure,
},
},
};
}
if (formState.type === 'date') {
const { field, formats, locale, ignore_failure, target_field, timezone, output_format } =
formState;
@ -278,6 +310,8 @@ const createProcessorGuardByType =
export const isDateProcessor = createProcessorGuardByType('date');
export const isDissectProcessor = createProcessorGuardByType('dissect');
export const isManualIngestPipelineJsonProcessor =
createProcessorGuardByType('manual_ingest_pipeline');
export const isGrokProcessor = createProcessorGuardByType('grok');
const createId = htmlIdGenerator();
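
A sketch of how the new type guard is meant to be used, assuming the guard defined above is in scope and accepts the UI-attributed processor union as in `getFormStateFrom`; the processor value is illustrative:

```ts
// Illustrative: `processor` is one entry of the processors list shown in the UI.
declare const processor: Parameters<typeof isManualIngestPipelineJsonProcessor>[0];

if (isManualIngestPipelineJsonProcessor(processor)) {
  // Narrowed to the manual_ingest_pipeline variant: the raw Elasticsearch
  // processor array is now accessible.
  console.log(processor.manual_ingest_pipeline.processors.length);
}
```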

@ -177,5 +177,124 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
});
expect((response.hits.total as SearchTotalHits).value).to.eql(0);
});
describe('Elasticsearch ingest pipeline enrichment', () => {
before(async () => {
const body: Streams.WiredStream.UpsertRequest = {
dashboards: [],
queries: [],
stream: {
description: '',
ingest: {
lifecycle: { inherit: {} },
processing: [
{
manual_ingest_pipeline: {
processors: [
{
// apply custom processor
uppercase: {
field: 'attributes.abc',
},
},
{
// apply condition
lowercase: {
field: 'attributes.def',
if: "ctx.attributes.def == 'yes'",
},
},
{
fail: {
message: 'Failing',
on_failure: [
// execute on failure pipeline
{
set: {
field: 'attributes.fail_failed',
value: 'yes',
},
},
],
},
},
],
if: { always: {} },
},
},
],
wired: {
routing: [],
fields: {},
},
},
},
};
const response = await putStream(apiClient, 'logs.nginx', body);
expect(response).to.have.property('acknowledged', true);
});
it('Transforms doc on index', async () => {
const doc = {
'@timestamp': '2024-01-01T00:00:11.000Z',
abc: 'should become uppercase',
def: 'SHOULD NOT BECOME LOWERCASE',
['host.name']: 'routeme',
};
const response = await indexDocument(esClient, 'logs', doc);
expect(response.result).to.eql('created');
const result = await fetchDocument(esClient, 'logs.nginx', response._id);
expect(result._source).to.eql({
'@timestamp': '2024-01-01T00:00:11.000Z',
attributes: {
abc: 'SHOULD BECOME UPPERCASE',
def: 'SHOULD NOT BECOME LOWERCASE',
fail_failed: 'yes',
},
resource: {
attributes: {
'host.name': 'routeme',
},
},
stream: {
name: 'logs.nginx',
},
});
});
it('fails to store non-existing processor', async () => {
const body: Streams.WiredStream.UpsertRequest = {
dashboards: [],
queries: [],
stream: {
description: '',
ingest: {
lifecycle: { inherit: {} },
processing: [
{
manual_ingest_pipeline: {
processors: [
{
// apply custom processor
non_existing_processor: {
field: 'abc',
},
} as any,
],
if: { always: {} },
},
},
],
wired: {
routing: [],
fields: {},
},
},
},
};
await putStream(apiClient, 'logs.nginx', body, 400);
});
});
});
}

@ -443,6 +443,44 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
]);
});
it('should correctly associate nested processors within Elasticsearch ingest pipeline', async () => {
const response = await simulateProcessingForStream(apiClient, 'logs.test', {
processing: [
{
id: 'draft',
manual_ingest_pipeline: {
processors: [
{
set: {
field: 'attributes.test',
value: 'test',
},
},
{
fail: {
message: 'Failing',
},
},
],
if: { always: {} },
},
},
],
documents: [createTestDocument('test message')],
});
const processorsMetrics = response.body.processors_metrics;
const processorMetrics = processorsMetrics.draft;
expect(processorMetrics.errors).to.eql([
{
processor_id: 'draft',
type: 'generic_processor_failure',
message: 'Failing',
},
]);
});
it('should gracefully return non-additive simulation errors', async () => {
const response = await simulateProcessingForStream(apiClient, 'logs.test', {
processing: [