[Automatic Import] Fixes the CSV header bug (#212513)

Fixes https://github.com/elastic/kibana/issues/211911

The CSV processing is now a three-stage process: 

1. Parse the samples with temporary column names of the form `column1`.
2. Test parsing with the actual pipeline, which parses into
`package.dataStream.columnName` fields and drops the header row.
3. Convert the samples into the JSON form `{"columnName": "value", ...}` for
further processing.

Now the pipeline works as expected:

```yaml
  - csv:
      tag: parse_csv
      field: message
      target_fields:
        - ai_202502211453.logs._timestamp
        - ai_202502211453.logs.message
      description: Parse CSV input
  - drop:
      ignore_failure: true
      if: >-
        ctx.ai_202502211453?.logs?._timestamp == '@timestamp' &&
        ctx.ai_202502211453?.logs?.message == 'message'
      tag: remove_csv_header
      description: Remove the CSV header line by comparing the values
```
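
To make the three stages concrete, here is a rough sketch of how a two-line sample (header plus one data row) would flow through them; the sample values are invented, while the field names follow the pipeline above:

```ts
// Hypothetical input: a header row followed by a single data row.
const samples = ['@timestamp,message', '2025-02-21T14:53:00Z,hello'];

// Stage 1 parses each line into temporary columns:
//   { column1: '@timestamp', column2: 'message' }
//   { column1: '2025-02-21T14:53:00Z', column2: 'hello' }
// Stage 2 parses into ai_202502211453.logs._timestamp / .message; the drop processor
//   removes the first document because its values equal the header names.
// Stage 3 converts the remaining row into a JSON sample for further processing:
//   '{"_timestamp":"2025-02-21T14:53:00Z","message":"hello"}'
```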

There are unit tests for the CSV functionality that include a mock CSV processing pipeline.
Commit ab46ddeef2 (parent 83f787ac24), authored by Ilya Nikokoshev on 2025-02-28 15:31:56 +01:00 and committed by GitHub. 6 changed files with 314 additions and 45 deletions.

@@ -30,22 +30,58 @@ export function toSafeColumnName(columnName: unknown): string | undefined {
const safeName = columnName.replace(/[^a-zA-Z0-9_]/g, '_');
return /^[0-9]/.test(safeName) ? `Column${safeName}` : safeName;
}
/**
* Extracts column names from the provided header doc by truncating unnecessary columns
* and converting each name into a normalized format.
*
* @param tempColumnNames - The list of temporary column names (integer-based).
* @param headerObject - The processed first document (corresponding to the header row).
* @returns A filtered array of valid column names in a safe format, or undefined where the value was neither a string nor a number.
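*
* @example
* // Illustrative: the header row mixes strings and a null value.
* columnsFromHeader(['column1', 'column2', 'column3'],
*   { column1: 'first column', column2: 'second col', column3: null });
* // => ['first_column', 'second_col', undefined]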
*/
export function columnsFromHeader(
tempColumnNames: string[],
headerObject: { [key: string]: unknown }
): Array<string | undefined> {
return valuesFromHeader(tempColumnNames, headerObject).map(toSafeColumnName);
}
/**
* Extracts values from a header object based on column names, converting non-string/numeric values to undefined.
* The function processes the array up to the last non-undefined value in the header object.
*
* @param tempColumnNames - Array of column names to look up in the header object
* @param headerObject - Object containing header values indexed by column names
* @returns Array of string/number values or undefined for non-string/number values, truncated at the last non-undefined entry
*
* @example
* const columns = ['col1', 'col2', 'col3', 'col4'];
* const header = { col1: 'value1', col2: 123, col3: 'value3', 'col4': null };
* valuesFromHeader(columns, header); // ['value1', 123, 'value3', undefined]
*/
export function valuesFromHeader(
tempColumnNames: string[],
headerObject: { [key: string]: unknown }
): Array<string | number | undefined> {
const maxIndex = tempColumnNames.findLastIndex(
(columnName) => headerObject[columnName] !== undefined
);
return tempColumnNames
.slice(0, maxIndex + 1)
.map((columnName) => headerObject[columnName])
.map((value) => (typeof value === 'string' || typeof value === 'number' ? value : undefined));
}
/**
* Calculates the total number of columns in a CSV by going through the processed
* documents to find the last defined value across all rows.
*
* @param tempColumnNames - An array of column names used to reference CSV row properties.
* @param csvRows - An array of row objects representing CSV data, where each key
* corresponds to a column name from `tempColumnNames`.
* @returns The total number of columns, determined by the position of the last
* defined value across all rows.
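*
* @example
* // Illustrative: the widest row determines the count.
* totalColumnCount(['column1', 'column2', 'column3'], [
*   { column1: 'a' },
*   { column1: 'b', column2: 'c' },
* ]); // => 2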
*/
export function totalColumnCount(
tempColumnNames: string[],
csvRows: Array<{ [key: string]: unknown }>


@@ -0,0 +1,155 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { IScopedClusterClient } from '@kbn/core/server';
import { handleCSV } from './csv';
import { ESProcessorItem } from '../../../common';
import { DocTemplate } from '../../util/pipeline';
interface SimpleCSVPipelineSimulationParams {
pipeline: { processors: ESProcessorItem[] };
docs: DocTemplate[];
}
/**
* Simulates processing a list of documents with a defined pipeline of processors,
* specifically handling 'csv' and 'drop' processors in the way they are used in our CSV processing.
*
* @param params - An object containing the pipeline of processors and the documents to be transformed.
* @returns An object containing the processed list of documents after all processors in the pipeline have been applied.
*/
export const simpleCSVPipelineSimulation = (
params: SimpleCSVPipelineSimulationParams
): { docs: Array<{ doc: DocTemplate }> } => {
const { pipeline, docs } = params;
for (const processor of pipeline.processors) {
if ('remove' in processor) {
// do nothing
} else if ('csv' in processor) {
// Not a real CSV parser, of course. It only handles the "json.*" field names.
const fields = processor.csv.target_fields as string[];
for (const doc of docs) {
const message = doc._source.message;
const values = message.split(',');
const unpacked: Record<string, unknown> = {};
for (let i = 0; i < fields.length; i++) {
const field = fields[i].startsWith('json.') ? fields[i].slice(5) : fields[i];
// The only error it handles is: CSV value starts with " and does not end with ".
if (values[i].startsWith('"') && !values[i].endsWith('"')) {
throw new Error('Mismatched quote');
}
unpacked[field] = values[i].startsWith('"') ? values[i].slice(1, -1) : values[i];
}
// eslint-disable-next-line dot-notation
doc._source['json'] = unpacked;
}
} else if ('drop' in processor) {
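// In the generated pipelines the drop processor only ever matches the header row,
// so removing the first document is a good-enough emulation here.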
docs.shift();
} else {
throw new Error('Unknown processor');
}
}
return { docs: docs.map((doc) => ({ doc })) };
};
describe('handleCSV', () => {
const mockClient = {
asCurrentUser: {
ingest: {
simulate: simpleCSVPipelineSimulation,
},
},
} as unknown as IScopedClusterClient;
it('should successfully parse valid CSV logs without header', async () => {
const mockParams = {
state: {
packageName: 'testPackage',
dataStreamName: 'testDataStream',
logSamples: ['123,"string",456', '"123",Some Value,"456"'],
samplesFormat: {
columns: [],
header: false,
},
additionalProcessors: [],
},
client: mockClient,
};
const result = await handleCSV(mockParams);
expect(result.jsonSamples).toBeDefined();
expect(result.additionalProcessors).toHaveLength(1); // Just the CSV processor: no header row to drop
if (!result.additionalProcessors) {
fail('additionalProcessors is undefined, logic error after expectation');
}
const csvProcessor = result.additionalProcessors[0].csv;
expect(csvProcessor).toBeDefined();
expect(csvProcessor.target_fields).toEqual([
'testPackage.testDataStream.column1',
'testPackage.testDataStream.column2',
'testPackage.testDataStream.column3',
]);
expect(result.jsonSamples).toEqual([
'{"column1":"123","column2":"string","column3":"456"}',
'{"column1":"123","column2":"Some Value","column3":"456"}',
]);
expect(result.lastExecutedChain).toBe('handleCSV');
});
it('should successfully parse valid CSV logs with header', async () => {
const mockParams = {
state: {
packageName: 'testPackage',
dataStreamName: 'testDataStream',
logSamples: ['header1,header2,header3', 'value1,value2,value3'],
samplesFormat: {
columns: ['first column', 'second column'],
header: true,
},
additionalProcessors: [],
},
client: mockClient,
};
const result = await handleCSV(mockParams);
expect(result.jsonSamples).toBeDefined();
expect(result.additionalProcessors).toHaveLength(2); // Must be CSV and drop processor
if (!result.additionalProcessors) {
fail('additionalProcessors is undefined, logic error after expectation');
}
const csvProcessor = result.additionalProcessors[0].csv;
expect(csvProcessor).toBeDefined();
expect(csvProcessor.target_fields).toEqual([
'testPackage.testDataStream.first_column',
'testPackage.testDataStream.second_column',
'testPackage.testDataStream.header3',
]);
const dropProcessor = result.additionalProcessors[1].drop;
expect(dropProcessor).toBeDefined();
expect(dropProcessor.if).toContain('header1'); // column value, not column name!
expect(result.lastExecutedChain).toBe('handleCSV');
});
it('should throw UnparseableCSVFormatError when CSV parsing fails', async () => {
const mockParams = {
state: {
packageName: 'testPackage',
dataStreamName: 'testDataStream',
// Intentionally malformed according to our simple CSV parser
logSamples: ['header1,header2', '"values...'],
samplesFormat: {
columns: ['col1', 'col2'],
header: true,
},
additionalProcessors: [],
},
client: mockClient,
};
await expect(handleCSV(mockParams)).rejects.toThrow('unparseable-csv-data');
});
});


@@ -4,15 +4,17 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { IScopedClusterClient } from '@kbn/core/server';
import type { LogFormatDetectionState } from '../../types';
import type { LogDetectionNodeParams } from '../log_type_detection/types';
import { createJSONInput } from '../../util';
import type { ESProcessorItem } from '../../../common';
import { createCSVProcessor, createDropProcessor } from '../../util/processors';
import { CSVParseError, UnparseableCSVFormatError } from '../../lib/errors/unparseable_csv_error';
import {
generateColumnNames,
upperBoundForColumnCount,
columnsFromHeader,
valuesFromHeader,
toSafeColumnName,
totalColumnCount,
yieldUniqueColumnNames,
@@ -22,11 +24,71 @@ import {
// We will only create the processor for the first MAX_CSV_COLUMNS columns.
const MAX_CSV_COLUMNS = 100;
// Converts CSV samples into JSON samples.
interface HandleCSVState {
packageName: string;
dataStreamName: string;
logSamples: string[];
samplesFormat: {
columns?: string[];
header?: boolean;
};
additionalProcessors: ESProcessorItem[];
}
interface HandleCSVParams {
state: HandleCSVState;
client: IScopedClusterClient;
}
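/**
* Builds the CSV parsing pipeline: a CSV processor that writes into `<prefix>.<column>`
* fields, plus (when header values are provided) a drop processor that removes the header row.
*
* @example (illustrative values; processor contents abbreviated)
* // createCSVPipeline(['json'], ['ip', 'port'], ['ip', 'port']) returns a CSV processor with
* // target_fields ['json.ip', 'json.port'] followed by a drop processor whose condition is
* // roughly "ctx.json?.ip == 'ip' && ctx.json?.port == 'port'". With headerValues = [],
* // only the CSV processor is returned.
*/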
function createCSVPipeline(
prefix: string[],
columns: string[],
headerValues: Array<string | number | undefined>
): ESProcessorItem[] {
const prefixedColumns = prefixColumns(columns, prefix);
const dropProcessors: ESProcessorItem[] = [];
if (headerValues.length !== 0) {
const dropValues = columns.reduce((acc, column, index) => {
const headerValue = headerValues[index];
if (headerValue !== undefined) {
acc[column] = headerValue;
}
return acc;
}, {} as Record<string, string | number>);
const dropProcessor = createDropProcessor(
dropValues,
prefix,
'remove_csv_header',
'Remove the CSV header by comparing row values to the header row.'
);
dropProcessors.push(dropProcessor);
}
return [createCSVProcessor('message', prefixedColumns), ...dropProcessors];
}
/**
* Processes CSV log data by parsing, testing, and converting to JSON format.
*
* The process follows three stages:
* 1. Initial parsing with temporary column names (column1, column2, etc.)
* 2. Testing with actual pipeline using package.dataStream.columnName format
* 3. Converting to JSON format for further processing
*
* Final column names are determined by combining LLM suggestions, header row parsing,
* and temporary columns as fallback. Includes header row handling and CSV-to-JSON conversion.
*
* @param param0 - Object containing state (log samples, format info) and Elasticsearch client
* @returns Promise with JSON samples, processors, and chain label
* @throws UnparseableCSVFormatError if CSV parsing fails
*/
export async function handleCSV({
state,
client,
}: HandleCSVParams): Promise<Partial<LogFormatDetectionState>> {
const jsonKey = 'json';
const packageName = state.packageName;
const dataStreamName = state.dataStreamName;
@@ -34,10 +96,10 @@ export async function handleCSV({
const temporaryColumns = generateColumnNames(
Math.min(upperBoundForColumnCount(samples), MAX_CSV_COLUMNS)
);
const temporaryPipeline = createCSVPipeline([jsonKey], temporaryColumns, []);
const { pipelineResults: tempResults, errors: tempErrors } = await createJSONInput(
temporaryPipeline,
samples,
client
);
@@ -46,51 +108,56 @@ export async function handleCSV({
throw new UnparseableCSVFormatError(tempErrors as CSVParseError[]);
}
// Some basic information we'll need later
const prefix = [packageName, dataStreamName];
// What columns does the LLM suggest?
const llmProvidedColumns = (state.samplesFormat.columns || []).map(toSafeColumnName);
// What columns do we get by parsing the header row, if any exists?
const headerColumns: Array<string | undefined> = [];
const headerValues: Array<string | number | undefined> = [];
const csvRows = tempResults.map((result) => result[jsonKey] as { [key: string]: unknown });
if (state.samplesFormat.header) {
const headerRow = csvRows[0];
headerValues.push(...valuesFromHeader(temporaryColumns, headerRow));
headerColumns.push(...columnsFromHeader(temporaryColumns, headerRow));
}
// Combine all that information into a single list of columns
const columns: string[] = Array.from(
yieldUniqueColumnNames(
totalColumnCount(temporaryColumns, csvRows),
[llmProvidedColumns, headerColumns],
temporaryColumns
)
);
// These processors extract CSV fields into the package- and data-stream-prefixed fields.
const csvHandlingProcessors = createCSVPipeline(prefix, columns, headerValues);
// Test the processors on the samples provided
const { errors } = await createJSONInput(csvHandlingProcessors, samples, client);
if (errors.length > 0) {
throw new UnparseableCSVFormatError(errors as CSVParseError[]);
}
// These processors extract CSV fields into the temporary "json" key.
const csvToJSONProcessors = createCSVPipeline([jsonKey], columns, headerValues);
const { pipelineResults: jsonResults, errors: jsonErrors } = await createJSONInput(
csvToJSONProcessors,
samples,
client
);
if (jsonErrors.length > 0) {
throw new UnparseableCSVFormatError(jsonErrors as CSVParseError[]);
}
// Convert each JSON object into a string to obtain the array of JSON samples.
const jsonSamples = jsonResults.map((log) => log[jsonKey]).map((log) => JSON.stringify(log));
return {
jsonSamples,


@@ -126,7 +126,7 @@ export async function getLogFormatDetectionGraph({ model, client }: LogDetection
'handleUnstructuredGraph',
(await getUnstructuredGraph({ model, client })).withConfig({ runName: 'Unstructured' })
)
.addNode('handleCSV', (state: LogFormatDetectionState) => handleCSV({ state, client }))
.addEdge(START, 'modelInput')
.addEdge('modelInput', 'handleLogFormatDetection')
.addEdge('handleKVGraph', 'modelOutput')


@@ -23,3 +23,13 @@ export type FieldPath = string[];
export function fieldPathToProcessorString(fieldPath: FieldPath): string {
return fieldPath.join('.');
}
/**
* Converts a string representing a processor's path into a field path array.
*
* @param processorString - The dotted string to convert
* @returns An array of path segments representing the field path
*/
export function processorStringToFieldPath(processorString: string): FieldPath {
return processorString.split('.');
}
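
The new helper is the inverse of `fieldPathToProcessorString` for dot-free path segments; for instance (values arbitrary):

```ts
processorStringToFieldPath('testPackage.testDataStream.column1');
// => ['testPackage', 'testDataStream', 'column1']
fieldPathToProcessorString(['json', 'column1']); // => 'json.column1'
```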


@@ -8,11 +8,12 @@ import type { IScopedClusterClient } from '@kbn/core-elasticsearch-server';
import { ESProcessorItem } from '../../common';
import { createPassthroughFailureProcessor, createRemoveProcessor } from './processors';
export interface DocTemplate {
_index: string;
_id: string;
_source: {
message: string;
[key: string]: unknown;
};
}