[8.x] [Auto Import] CSV format support (#194386) (#196090)

# Backport

This will backport the following commits from `main` to `8.x`:
- [[Auto Import] CSV format support
(#194386)](https://github.com/elastic/kibana/pull/194386)

<!--- Backport version: 9.4.3 -->

### Questions?
Please refer to the [Backport tool
documentation](https://github.com/sqren/backport)

<!--BACKPORT [{"author":{"name":"Ilya
Nikokoshev","email":"ilya.nikokoshev@elastic.co"},"sourceCommit":{"committedDate":"2024-10-14T10:24:58Z","message":"[Auto
Import] CSV format support (#194386)\n\n## Release
Notes\r\n\r\nAutomatic Import can now create integrations for logs in
the CSV format.\r\nOwing to the maturity of log format support, we thus
remove the verbiage\r\nabout requiring the JSON/NDJSON format.\r\n\r\n##
Summary\r\n\r\n**Added: the CSV feature**\r\n\r\nThe issue is
https://github.com/elastic/kibana/issues/194342 \r\n\r\nWhen the user
adds a log sample whose format is recognized as CSV by the\r\nLLM, we
now parse the samples and insert
the\r\n[csv](https://www.elastic.co/guide/en/elasticsearch/reference/current/csv-processor.html)\r\nprocessor
into the generated pipeline.\r\n\r\nIf the header is present, we use it
for the field names and add
a\r\n[drop](https://www.elastic.co/guide/en/elasticsearch/reference/current/drop-processor.html)\r\nprocessor
that removes a header from the document stream by comparing\r\nthe
values to the header values.\r\n\r\nIf the header is missing, we ask the
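
For illustration only, the pair of processors generated for a headered two-column sample might look roughly like the following sketch (the `mypackage.mydatastream.*` field names and the sample values are hypothetical, not taken from the PR):

```ts
// Illustrative sketch of the generated processors; the exact options differ per integration.
const csvProcessor = {
  csv: {
    field: 'message',
    target_fields: ['mypackage.mydatastream.source_ip', 'mypackage.mydatastream.status'],
    description: 'Parse the CSV-formatted message into named columns',
  },
};

// Drops the header line by comparing the parsed values against the known header values.
const dropHeaderProcessor = {
  drop: {
    if: "ctx.mypackage?.mydatastream?.source_ip == 'source_ip' && ctx.mypackage?.mydatastream?.status == 'status'",
    tag: 'remove_csv_header',
    description: 'Remove the CSV header line by comparing the values',
  },
};
```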
If the header is missing, we ask the LLM to generate a list of column names, providing some context such as the package and data stream titles.

Should the header or the LLM suggestion prove unsuitable for a specific column, we use `column1`, `column2`, and so on as a fallback. To avoid duplicate column names, we add postfixes like `_2` as necessary.
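
As a sketch of how the naming rules combine (inputs invented, import path assumed), the helpers added in this PR behave roughly like this:

```ts
import { toSafeColumnName, yieldUniqueColumnNames } from './columns'; // path assumed

// Names are sanitized into safe Painless identifiers; unusable names become undefined.
toSafeColumnName('source ip'); // 'source_ip'
toSafeColumnName('');          // undefined

// Preferred names (LLM suggestions, then header-derived names) win over the
// columnN fallbacks; duplicates get a numeric postfix such as `_2`.
const names = Array.from(
  yieldUniqueColumnNames(
    4,
    [
      ['source_ip', 'source_ip', undefined, undefined], // LLM-suggested names
      [undefined, undefined, 'status', undefined],      // header-derived names
    ],
    ['column1', 'column2', 'column3', 'column4']        // temporary fallbacks
  )
);
// names === ['source_ip', 'source_ip_2', 'status', 'column4']
```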
If the format appears to be CSV but the `csv` processor fails, we bubble up an error using the recently introduced `ErrorThatHandlesItsOwnResponse` class. We also provide the first example of passing additional attributes of an error (in this case, the original CSV error) back to the client; the error message is composed on the client side.
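
For reference, a minimal sketch of the error body the client now receives on a failed CSV parse (HTTP 422; the underlying message below is invented and the import paths are assumed):

```ts
import { GenerationErrorCode } from '../constants'; // path assumed
import type { GenerationErrorBody } from '../api/generation_error'; // path assumed

// Approximate shape of the body produced by UnparseableCSVFormatError.sendResponse();
// the client composes the user-facing message from `attributes`.
const exampleBody: GenerationErrorBody = {
  message: GenerationErrorCode.UNPARSEABLE_CSV_DATA,
  attributes: {
    errorCode: GenerationErrorCode.UNPARSEABLE_CSV_DATA,
    underlyingMessages: ['unexpected number of columns on line 3'], // invented example
  },
};
```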
**Removed: supported formats message**

The message that asks the user to upload the logs in `JSON/NDJSON format` is removed in this PR:

<img width="741" alt="image" src="https://github.com/user-attachments/assets/34d571c3-b12c-44a1-98e3-d7549160be12">

**Refactoring**

The refactoring makes the "→JSON" conversion process more uniform across different chains and centralizes processor definitions in `.../server/util/processors.ts`.
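
The centralized factory functions themselves are not visible in this diff; judging by the constants they replace, they presumably look roughly like this sketch (signatures and defaults are assumptions, not the actual file):

```ts
// Hypothetical sketch of .../server/util/processors.ts, inferred from the removed
// `onFailure` and `removeProcessor` constants and from the call sites in this PR.
export function createPassthroughFailureProcessor() {
  return {
    append: {
      field: 'error.message',
      value:
        '{% raw %}Processor {{{_ingest.on_failure_processor_type}}} with tag {{{_ingest.on_failure_processor_tag}}} in pipeline {{{_ingest.on_failure_pipeline}}} failed with message: {{{_ingest.on_failure_message}}}{% endraw %}',
    },
  };
}

export function createRemoveProcessor() {
  return { remove: { field: 'message', ignore_missing: true } };
}

export function createCSVProcessor(field: string, targetFields: string[]) {
  return { csv: { field, target_fields: targetFields, ignore_missing: true } };
}
```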
The log format chain now expects the LLM to follow the `SamplesFormat` schema when providing the information, rather than an ad-hoc format.
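
Concretely, the detection node now parses the LLM reply with the `SamplesFormat` Zod schema and falls back to `unsupported` if parsing throws; a valid reply for header-less CSV samples mirrors the `EX_ANSWER_LOG_TYPE` example added in this PR:

```ts
import { SamplesFormat } from '../../../common'; // import path as used in the PR

// A header-less CSV answer: column names proposed by the LLM, with an empty
// string where it could not find a good candidate.
const llmReply: unknown = {
  name: 'csv',
  header: false,
  columns: ['ip', 'timestamp', 'request', 'status', '', 'bytes'],
};

const samplesFormat = SamplesFormat.parse(llmReply); // throws if the reply is malformed
```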
When testing, the `fail` method is [not supported in `jest`](https://stackoverflow.com/a/54244479/23968144), so it is removed.

See the PR for examples and follow-up.

---------

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
<elasticmachine@users.noreply.github.com>","sha":"6a72037007d8f71504f444911c9fa25adfb1bb89","branchLabelMapping":{"^v9.0.0$":"main","^v8.16.0$":"8.x","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["v9.0.0","release_note:feature","backport:prev-minor","Team:Security-Scalability","Feature:AutomaticImport"],"title":"[Auto
Import] CSV format
support","number":194386,"url":"https://github.com/elastic/kibana/pull/194386","mergeCommit":{"message":"[Auto
Import] CSV format support (#194386)\n\n## Release
Notes\r\n\r\nAutomatic Import can now create integrations for logs in
the CSV format.\r\nOwing to the maturity of log format support, we thus
remove the verbiage\r\nabout requiring the JSON/NDJSON format.\r\n\r\n##
Summary\r\n\r\n**Added: the CSV feature**\r\n\r\nThe issue is
https://github.com/elastic/kibana/issues/194342 \r\n\r\nWhen the user
adds a log sample whose format is recognized as CSV by the\r\nLLM, we
now parse the samples and insert
the\r\n[csv](https://www.elastic.co/guide/en/elasticsearch/reference/current/csv-processor.html)\r\nprocessor
into the generated pipeline.\r\n\r\nIf the header is present, we use it
for the field names and add
a\r\n[drop](https://www.elastic.co/guide/en/elasticsearch/reference/current/drop-processor.html)\r\nprocessor
that removes a header from the document stream by comparing\r\nthe
values to the header values.\r\n\r\nIf the header is missing, we ask the
LLM to generate a list of column\r\nnames, providing some context like
package and data stream title.\r\n\r\nShould the header or LLM
suggestion provide unsuitable for a specific\r\ncolumn, we use
`column1`, `column2` and so on as a fallback. To avoid\r\nduplicate
column names, we can add postfixes like `_2` as necessary.\r\n\r\nIf the
format appears to be CSV, but the `csv` processor returns fails,\r\nwe
bubble up an error using the recently
introduced\r\n`ErrorThatHandlesItsOwnResponse` class. We also provide
the first\r\nexample of passing the additional attributes of an error
(in this case,\r\nthe original CSV error) back to the client. The error
message is\r\ncomposed on the client side.\r\n\r\n**Removed: supported
formats message**\r\n \r\nThe message that asks the user to upload the
logs in `JSON/NDJSON\r\nformat` is removed in this PR:\r\n\r\n<img
width=\"741\"
alt=\"image\"\r\nsrc=\"https://github.com/user-attachments/assets/34d571c3-b12c-44a1-98e3-d7549160be12\">\r\n\r\n\r\n**Refactoring**\r\n
\r\nThe refactoring makes the \"→JSON\" conversion process more uniform
across\r\ndifferent chains and centralizes processor definitions
in\r\n`.../server/util/processors.ts`.\r\n\r\nLog format chain now
expects the LLM to follow the `SamplesFormat` when\r\nproviding the
information rather than an ad-hoc format.\r\n \r\nWhen testing, the
`fail` method is [not supported
in\r\n`jest`](https://stackoverflow.com/a/54244479/23968144), so it
is\r\nremoved.\r\n\r\nSee the PR for examples and
follow-up.\r\n\r\n---------\r\n\r\nCo-authored-by: Elastic Machine
<elasticmachine@users.noreply.github.com>","sha":"6a72037007d8f71504f444911c9fa25adfb1bb89"}},"sourceBranch":"main","suggestedTargetBranches":[],"targetPullRequestStates":[{"branch":"main","label":"v9.0.0","branchLabelMappingKey":"^v9.0.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/194386","number":194386,"mergeCommit":{"message":"[Auto
Import] CSV format support (#194386)\n\n## Release
Notes\r\n\r\nAutomatic Import can now create integrations for logs in
the CSV format.\r\nOwing to the maturity of log format support, we thus
remove the verbiage\r\nabout requiring the JSON/NDJSON format.\r\n\r\n##
Summary\r\n\r\n**Added: the CSV feature**\r\n\r\nThe issue is
https://github.com/elastic/kibana/issues/194342 \r\n\r\nWhen the user
adds a log sample whose format is recognized as CSV by the\r\nLLM, we
now parse the samples and insert
the\r\n[csv](https://www.elastic.co/guide/en/elasticsearch/reference/current/csv-processor.html)\r\nprocessor
into the generated pipeline.\r\n\r\nIf the header is present, we use it
for the field names and add
a\r\n[drop](https://www.elastic.co/guide/en/elasticsearch/reference/current/drop-processor.html)\r\nprocessor
that removes a header from the document stream by comparing\r\nthe
values to the header values.\r\n\r\nIf the header is missing, we ask the
LLM to generate a list of column\r\nnames, providing some context like
package and data stream title.\r\n\r\nShould the header or LLM
suggestion provide unsuitable for a specific\r\ncolumn, we use
`column1`, `column2` and so on as a fallback. To avoid\r\nduplicate
column names, we can add postfixes like `_2` as necessary.\r\n\r\nIf the
format appears to be CSV, but the `csv` processor returns fails,\r\nwe
bubble up an error using the recently
introduced\r\n`ErrorThatHandlesItsOwnResponse` class. We also provide
the first\r\nexample of passing the additional attributes of an error
(in this case,\r\nthe original CSV error) back to the client. The error
message is\r\ncomposed on the client side.\r\n\r\n**Removed: supported
formats message**\r\n \r\nThe message that asks the user to upload the
logs in `JSON/NDJSON\r\nformat` is removed in this PR:\r\n\r\n<img
width=\"741\"
alt=\"image\"\r\nsrc=\"https://github.com/user-attachments/assets/34d571c3-b12c-44a1-98e3-d7549160be12\">\r\n\r\n\r\n**Refactoring**\r\n
\r\nThe refactoring makes the \"→JSON\" conversion process more uniform
across\r\ndifferent chains and centralizes processor definitions
in\r\n`.../server/util/processors.ts`.\r\n\r\nLog format chain now
expects the LLM to follow the `SamplesFormat` when\r\nproviding the
information rather than an ad-hoc format.\r\n \r\nWhen testing, the
`fail` method is [not supported
in\r\n`jest`](https://stackoverflow.com/a/54244479/23968144), so it
is\r\nremoved.\r\n\r\nSee the PR for examples and
follow-up.\r\n\r\n---------\r\n\r\nCo-authored-by: Elastic Machine
<elasticmachine@users.noreply.github.com>","sha":"6a72037007d8f71504f444911c9fa25adfb1bb89"}}]}]
BACKPORT-->

Co-authored-by: Ilya Nikokoshev <ilya.nikokoshev@elastic.co>
Kibana Machine 2024-10-14 23:16:35 +11:00 committed by GitHub
parent 7a80e6f1a7
commit 6378ff3ac9
47 changed files with 853 additions and 132 deletions

View file

@ -14,6 +14,8 @@ export const logFormatDetectionTestState = {
exAnswer: 'testanswer',
packageName: 'testPackage',
dataStreamName: 'testDatastream',
packageTitle: 'Test Title',
dataStreamTitle: 'Test Datastream Title',
finalized: false,
samplesFormat: { name: SamplesFormatName.Values.structured },
header: true,

View file

@ -19,6 +19,8 @@ import { z } from '@kbn/zod';
import {
PackageName,
DataStreamName,
PackageTitle,
DataStreamTitle,
LogSamples,
Connector,
LangSmithOptions,
@ -29,6 +31,8 @@ export type AnalyzeLogsRequestBody = z.infer<typeof AnalyzeLogsRequestBody>;
export const AnalyzeLogsRequestBody = z.object({
packageName: PackageName,
dataStreamName: DataStreamName,
packageTitle: PackageTitle,
dataStreamTitle: DataStreamTitle,
logSamples: LogSamples,
connectorId: Connector,
langSmithOptions: LangSmithOptions.optional(),

View file

@ -22,11 +22,17 @@ paths:
- connectorId
- packageName
- dataStreamName
- packageTitle
- dataStreamTitle
properties:
packageName:
$ref: "../model/common_attributes.schema.yaml#/components/schemas/PackageName"
dataStreamName:
$ref: "../model/common_attributes.schema.yaml#/components/schemas/DataStreamName"
packageTitle:
$ref: "../model/common_attributes.schema.yaml#/components/schemas/PackageTitle"
dataStreamTitle:
$ref: "../model/common_attributes.schema.yaml#/components/schemas/DataStreamTitle"
logSamples:
$ref: "../model/common_attributes.schema.yaml#/components/schemas/LogSamples"
connectorId:

View file

@ -0,0 +1,41 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { GenerationErrorCode } from '../constants';
// Errors raised by the generation process should provide information through this interface.
export interface GenerationErrorBody {
message: string;
attributes: GenerationErrorAttributes;
}
export function isGenerationErrorBody(obj: unknown | undefined): obj is GenerationErrorBody {
return (
typeof obj === 'object' &&
obj !== null &&
'message' in obj &&
typeof obj.message === 'string' &&
'attributes' in obj &&
obj.attributes !== undefined &&
isGenerationErrorAttributes(obj.attributes)
);
}
export interface GenerationErrorAttributes {
errorCode: GenerationErrorCode;
underlyingMessages: string[] | undefined;
}
export function isGenerationErrorAttributes(obj: unknown): obj is GenerationErrorAttributes {
return (
typeof obj === 'object' &&
obj !== null &&
'errorCode' in obj &&
typeof obj.errorCode === 'string' &&
(!('underlyingMessages' in obj) || Array.isArray(obj.underlyingMessages))
);
}

View file

@ -96,6 +96,8 @@ export const getRelatedRequestMock = (): RelatedRequestBody => ({
export const getAnalyzeLogsRequestBody = (): AnalyzeLogsRequestBody => ({
dataStreamName: 'test-data-stream-name',
packageName: 'test-package-name',
packageTitle: 'Test package title',
dataStreamTitle: 'Test data stream title',
connectorId: 'test-connector-id',
logSamples: rawSamples,
});

View file

@ -31,6 +31,18 @@ export const PackageName = z.string().min(1);
export type DataStreamName = z.infer<typeof DataStreamName>;
export const DataStreamName = z.string().min(1);
/**
* Package title for the integration to be built.
*/
export type PackageTitle = z.infer<typeof PackageTitle>;
export const PackageTitle = z.string().min(1);
/**
* DataStream title for the integration to be built.
*/
export type DataStreamTitle = z.infer<typeof DataStreamTitle>;
export const DataStreamTitle = z.string().min(1);
/**
* String form of the input logsamples.
*/
@ -86,6 +98,14 @@ export const SamplesFormat = z.object({
* For some formats, specifies whether the samples can be multiline.
*/
multiline: z.boolean().optional(),
/**
* For CSV format, specifies whether the samples have a header row. For other formats, specifies the presence of header in each row.
*/
header: z.boolean().optional(),
/**
* For CSV format, specifies the column names proposed by the LLM.
*/
columns: z.array(z.string()).optional(),
/**
* For a JSON format, describes how to get to the sample array from the root of the JSON.
*/

View file

@ -16,6 +16,16 @@ components:
minLength: 1
description: DataStream name for the integration to be built.
PackageTitle:
type: string
minLength: 1
description: Package title for the integration to be built.
DataStreamTitle:
type: string
minLength: 1
description: DataStream title for the integration to be built.
LogSamples:
type: array
items:
@ -66,6 +76,14 @@ components:
multiline:
type: boolean
description: For some formats, specifies whether the samples can be multiline.
header:
type: boolean
description: For CSV format, specifies whether the samples have a header row. For other formats, specifies the presence of header in each row.
columns:
type: array
description: For CSV format, specifies the column names proposed by the LLM.
items:
type: string
json_path:
type: array
description: For a JSON format, describes how to get to the sample array from the root of the JSON.

View file

@ -30,8 +30,9 @@ export const MINIMUM_LICENSE_TYPE: LicenseType = 'enterprise';
// ErrorCodes
export enum ErrorCode {
export enum GenerationErrorCode {
RECURSION_LIMIT = 'recursion-limit',
RECURSION_LIMIT_ANALYZE_LOGS = 'recursion-limit-analyze-logs',
UNSUPPORTED_LOG_SAMPLES_FORMAT = 'unsupported-log-samples-format',
UNPARSEABLE_CSV_DATA = 'unparseable-csv-data',
}

View file

@ -27,10 +27,9 @@ export type {
Integration,
Pipeline,
Docs,
SamplesFormat,
LangSmithOptions,
} from './api/model/common_attributes.gen';
export { SamplesFormatName } from './api/model/common_attributes.gen';
export { SamplesFormat, SamplesFormatName } from './api/model/common_attributes.gen';
export type { ESProcessorItem } from './api/model/processor_attributes.gen';
export type { CelInput } from './api/model/cel_input_attributes.gen';

View file

@ -105,6 +105,8 @@ describe('GenerationModal', () => {
it('should call runAnalyzeLogsGraph with correct parameters', () => {
expect(mockRunAnalyzeLogsGraph).toHaveBeenCalledWith({
...defaultRequest,
packageTitle: 'Mocked Integration title',
dataStreamTitle: 'Mocked Data Stream Title',
logSamples: integrationSettingsNonJSON.logSamples ?? [],
});
});

View file

@ -82,7 +82,7 @@ export const GenerationModal = React.memo<GenerationModalProps>(
{error ? (
<EuiFlexItem>
<EuiCallOut
title={i18n.GENERATION_ERROR(progressText[progress])}
title={i18n.GENERATION_ERROR_TITLE(progressText[progress])}
color="danger"
iconType="alert"
data-test-subj="generationErrorCallout"

View file

@ -318,9 +318,6 @@ export const SampleLogsInput = React.memo<SampleLogsInputProps>(({ integrationSe
<EuiText size="s" textAlign="center">
{i18n.LOGS_SAMPLE_DESCRIPTION}
</EuiText>
<EuiText size="xs" color="subdued" textAlign="center">
{i18n.LOGS_SAMPLE_DESCRIPTION_2}
</EuiText>
</>
}
onChange={onChangeLogsSample}

View file

@ -6,7 +6,8 @@
*/
import { i18n } from '@kbn/i18n';
import { ErrorCode } from '../../../../../../common/constants';
import { GenerationErrorCode } from '../../../../../../common/constants';
import type { GenerationErrorAttributes } from '../../../../../../common/api/generation_error';
export const INTEGRATION_NAME_TITLE = i18n.translate(
'xpack.integrationAssistant.step.dataStream.integrationNameTitle',
@ -109,12 +110,6 @@ export const LOGS_SAMPLE_DESCRIPTION = i18n.translate(
defaultMessage: 'Drag and drop a file or Browse files.',
}
);
export const LOGS_SAMPLE_DESCRIPTION_2 = i18n.translate(
'xpack.integrationAssistant.step.dataStream.logsSample.description2',
{
defaultMessage: 'JSON/NDJSON format',
}
);
export const LOGS_SAMPLE_TRUNCATED = (maxRows: number) =>
i18n.translate('xpack.integrationAssistant.step.dataStream.logsSample.truncatedWarning', {
values: { maxRows },
@ -188,7 +183,7 @@ export const PROGRESS_RELATED_GRAPH = i18n.translate(
defaultMessage: 'Generating related fields',
}
);
export const GENERATION_ERROR = (progressStep: string) =>
export const GENERATION_ERROR_TITLE = (progressStep: string) =>
i18n.translate('xpack.integrationAssistant.step.dataStream.generationError', {
values: { progressStep },
defaultMessage: 'An error occurred during: {progressStep}',
@ -198,24 +193,44 @@ export const RETRY = i18n.translate('xpack.integrationAssistant.step.dataStream.
defaultMessage: 'Retry',
});
export const ERROR_TRANSLATION: Record<ErrorCode, string> = {
[ErrorCode.RECURSION_LIMIT_ANALYZE_LOGS]: i18n.translate(
export const GENERATION_ERROR_TRANSLATION: Record<
GenerationErrorCode,
string | ((attributes: GenerationErrorAttributes) => string)
> = {
[GenerationErrorCode.RECURSION_LIMIT_ANALYZE_LOGS]: i18n.translate(
'xpack.integrationAssistant.errors.recursionLimitAnalyzeLogsErrorMessage',
{
defaultMessage:
'Please verify the format of log samples is correct and try again. Try with a fewer samples if error persists.',
}
),
[ErrorCode.RECURSION_LIMIT]: i18n.translate(
[GenerationErrorCode.RECURSION_LIMIT]: i18n.translate(
'xpack.integrationAssistant.errors.recursionLimitReached',
{
defaultMessage: 'Max attempts exceeded. Please try again.',
}
),
[ErrorCode.UNSUPPORTED_LOG_SAMPLES_FORMAT]: i18n.translate(
[GenerationErrorCode.UNSUPPORTED_LOG_SAMPLES_FORMAT]: i18n.translate(
'xpack.integrationAssistant.errors.unsupportedLogSamples',
{
defaultMessage: 'Unsupported log format in the samples.',
}
),
[GenerationErrorCode.UNPARSEABLE_CSV_DATA]: (attributes) => {
if (
attributes.underlyingMessages !== undefined &&
attributes.underlyingMessages?.length !== 0
) {
return i18n.translate('xpack.integrationAssistant.errors.uparseableCSV.withReason', {
values: {
reason: attributes.underlyingMessages[0],
},
defaultMessage: `Cannot parse the samples as the CSV data (reason: {reason}). Please check the provided samples.`,
});
} else {
return i18n.translate('xpack.integrationAssistant.errors.uparseableCSV.withoutReason', {
defaultMessage: `Cannot parse the samples as the CSV data. Please check the provided samples.`,
});
}
},
};

View file

@ -16,6 +16,7 @@ import {
type EcsMappingRequestBody,
type RelatedRequestBody,
} from '../../../../../../common';
import { isGenerationErrorBody } from '../../../../../../common/api/generation_error';
import {
runCategorizationGraph,
runEcsGraph,
@ -26,7 +27,6 @@ import { useKibana } from '../../../../../common/hooks/use_kibana';
import type { State } from '../../state';
import * as i18n from './translations';
import { useTelemetry } from '../../../telemetry';
import type { ErrorCode } from '../../../../../../common/constants';
import type { AIConnector, IntegrationSettings } from '../../types';
export type OnComplete = (result: State['result']) => void;
@ -46,6 +46,18 @@ interface RunGenerationProps {
setProgress: (progress: ProgressItem) => void;
}
// If the result is classified as a generation error, produce an error message
// as defined in the i18n file. Otherwise, return undefined.
function generationErrorMessage(body: unknown | undefined): string | undefined {
if (!isGenerationErrorBody(body)) {
return;
}
const errorCode = body.attributes.errorCode;
const translation = i18n.GENERATION_ERROR_TRANSLATION[errorCode];
return typeof translation === 'function' ? translation(body.attributes) : translation;
}
interface GenerationResults {
pipeline: Pipeline;
docs: Docs;
@ -96,12 +108,7 @@ export const useGeneration = ({
error: originalErrorMessage,
});
let errorMessage = originalErrorMessage;
const errorCode = e.body?.attributes?.errorCode as ErrorCode | undefined;
if (errorCode != null) {
errorMessage = i18n.ERROR_TRANSLATION[errorCode];
}
setError(errorMessage);
setError(generationErrorMessage(e.body) ?? originalErrorMessage);
} finally {
setIsRequesting(false);
}
@ -145,6 +152,9 @@ async function runGeneration({
const analyzeLogsRequest: AnalyzeLogsRequestBody = {
packageName: integrationSettings.name ?? '',
dataStreamName: integrationSettings.dataStreamName ?? '',
packageTitle: integrationSettings.title ?? integrationSettings.name ?? '',
dataStreamTitle:
integrationSettings.dataStreamTitle ?? integrationSettings.dataStreamName ?? '',
logSamples: integrationSettings.logSamples ?? [],
connectorId: connector.id,
langSmithOptions: getLangSmithOptions(),

View file

@ -0,0 +1,243 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import {
upperBoundForColumnCount,
generateColumnNames,
columnsFromHeader,
totalColumnCount,
toSafeColumnName,
yieldUniqueColumnNames,
} from './columns';
describe('upperBoundForColumnCount', () => {
it('should return the correct number of columns for a simple CSV', () => {
const samples = ['name,age,location', 'john,30,new york', 'jane,25,los angeles'];
expect(upperBoundForColumnCount(samples)).toBe(3);
});
it('should handle samples with varying column counts', () => {
const samples = ['name,age,location', 'john,30', 'jane,25,los angeles,usa'];
expect(upperBoundForColumnCount(samples)).toBe(4);
});
it('should return 0 for empty samples', () => {
const samples: string[] = [];
expect(upperBoundForColumnCount(samples)).toBe(0);
});
it('should handle samples with empty strings', () => {
const samples = ['', 'john,30,new york', 'jane,25,los angeles'];
expect(upperBoundForColumnCount(samples)).toBe(3);
});
it('should handle samples with only one column', () => {
const samples = ['name', 'john', 'jane'];
expect(upperBoundForColumnCount(samples)).toBe(1);
});
it('should handle samples with extra commas', () => {
const samples = ['name,age,location', 'john,30', 'jane,25,"los angeles,usa"'];
expect(upperBoundForColumnCount(samples)).toBeGreaterThanOrEqual(3);
});
});
describe('generateColumnNames', () => {
it('should generate the correct number of column names', () => {
const count = 5;
const expected = ['column1', 'column2', 'column3', 'column4', 'column5'];
expect(generateColumnNames(count)).toEqual(expected);
});
it('should return an empty array when count is 0', () => {
const count = 0;
const expected: string[] = [];
expect(generateColumnNames(count)).toEqual(expected);
});
it('should handle large counts correctly', () => {
const count = 100;
const result = generateColumnNames(count);
expect(result.length).toBe(count);
expect(result[0]).toBe('column1');
expect(result[count - 1]).toBe('column100');
});
});
describe('columnsFromHeader', () => {
it('should return the correct columns from the header object', () => {
const tempColumnNames = ['column1', 'column2', 'column3'];
const headerObject = { column1: 'name', column2: 'age', column3: 'location' };
expect(columnsFromHeader(tempColumnNames, headerObject)).toEqual(['name', 'age', 'location']);
});
it('should return an empty array if no columns match', () => {
const tempColumnNames = ['column1', 'column2', 'column3'];
const headerObject = { column4: 'name', column5: 'age', column6: 'location' };
expect(columnsFromHeader(tempColumnNames, headerObject)).toEqual([]);
});
it('should handle missing columns in the header object', () => {
const tempColumnNames = ['column1', 'column2', 'column3', 'column4'];
const headerObject = { column1: 'name', column3: 'location' };
expect(columnsFromHeader(tempColumnNames, headerObject)).toEqual([
'name',
undefined,
'location',
]);
});
it('should handle an empty header object', () => {
const tempColumnNames = ['column1', 'column2', 'column3'];
const headerObject = {};
expect(columnsFromHeader(tempColumnNames, headerObject)).toEqual([]);
});
it('should handle an empty tempColumnNames array', () => {
const tempColumnNames: string[] = [];
const headerObject = { column1: 'name', column2: 'age', column3: 'location' };
expect(columnsFromHeader(tempColumnNames, headerObject)).toEqual([]);
});
});
describe('totalColumnCount', () => {
it('should return the correct total column count for a simple CSV', () => {
const tempColumnNames = ['column1', 'column2', 'column3'];
const csvRows = [
{ column1: 'john', column2: '30', column3: 'new york' },
{ column1: 'jane', column3: '25', column4: 'los angeles' },
];
expect(totalColumnCount(tempColumnNames, csvRows)).toBe(3);
});
it('should handle rows with varying column counts', () => {
const tempColumnNames = ['column1', 'column2', 'column3', 'column4'];
const csvRows = [
{ column1: 'john', column2: '30' },
{ column1: 'jane', column3: 'los angeles', column4: 'usa' },
];
expect(totalColumnCount(tempColumnNames, csvRows)).toBe(4);
});
it('should return 0 for empty rows', () => {
const tempColumnNames = ['column1', 'column2', 'column3'];
expect(totalColumnCount(tempColumnNames, [])).toBe(0);
});
it('should handle rows with empty objects', () => {
const tempColumnNames = ['column1', 'column2', 'column3'];
const csvRows = [
{},
{ column1: 'john', column2: '30', column3: 'new york' },
{ column1: 'jane', column2: '25', column3: 'los angeles' },
];
expect(totalColumnCount(tempColumnNames, csvRows)).toBe(3);
});
it('should handle rows with only one column', () => {
const tempColumnNames = ['column1'];
const csvRows = [{ column1: 'john' }, { column1: 'jane' }];
expect(totalColumnCount(tempColumnNames, csvRows)).toBe(1);
});
it('should handle rows with extra columns', () => {
const tempColumnNames = ['column1', 'column2', 'column3'];
const csvRows = [
{ column1: 'john', column2: '30' },
{ column1: 'jane', column2: '25', column3: 'los angeles', column4: 'usa' },
];
expect(totalColumnCount(tempColumnNames, csvRows)).toBe(3);
});
describe('toSafeColumnName', () => {
it('should return undefined for non-string and non-number inputs', () => {
expect(toSafeColumnName(null)).toBeUndefined();
expect(toSafeColumnName(undefined)).toBeUndefined();
expect(toSafeColumnName({})).toBeUndefined();
expect(toSafeColumnName([1, 2])).toBeUndefined();
});
it('should replace non-alphanumeric characters with underscores', () => {
expect(toSafeColumnName('name@age!location')).toBe('name_age_location');
expect(toSafeColumnName('column#1')).toBe('column_1');
});
it('should return the same string if it contains only alphanumeric characters and underscores', () => {
expect(toSafeColumnName('Column1')).toBe('Column1');
expect(toSafeColumnName('Location')).toBe('Location');
});
it('should handle empty strings', () => {
expect(toSafeColumnName('')).toBeUndefined();
});
it('should handle strings starting from a digit or numbers', () => {
expect(toSafeColumnName('1ABC')).toBe('Column1ABC');
expect(toSafeColumnName(123)).toBe('Column123');
});
});
});
describe('yieldUniqueColumnNames', () => {
it('should yield unique column names based on preferred and fallback names', () => {
const count = 5;
const preferredNames = [
['name1', 'name2', undefined, 'name4', undefined],
[undefined, 'altName2', 'altName3', undefined, 'altName5'],
];
const fallbackNames = ['fallback1', 'fallback2', 'fallback3', 'fallback4', 'fallback5'];
const result = Array.from(yieldUniqueColumnNames(count, preferredNames, fallbackNames));
expect(result).toEqual(['name1', 'name2', 'altName3', 'name4', 'altName5']);
});
it('should use fallback names when preferred names are not provided', () => {
const count = 3;
const preferredNames = [['name1', undefined, 'name3']];
const fallbackNames = ['fallback1', 'fallback2', 'fallback3'];
const result = Array.from(yieldUniqueColumnNames(count, preferredNames, fallbackNames));
expect(result).toEqual(['name1', 'fallback2', 'name3']);
});
it('should append postfix to duplicate names to ensure uniqueness', () => {
const count = 4;
const preferredNames = [['name', 'name', 'name', 'name']];
const fallbackNames = ['fallback1', 'fallback2', 'fallback3', 'fallback4'];
const result = Array.from(yieldUniqueColumnNames(count, preferredNames, fallbackNames));
expect(result).toEqual(['name', 'name_2', 'name_3', 'name_4']);
});
it('should handle mixed preferred and fallback names with duplicates', () => {
const count = 6;
const preferredNames = [
['name', undefined, 'name', undefined, undefined, undefined],
[undefined, 'altName', undefined, 'altName', undefined, 'altName'],
];
const fallbackNames = [
'fallback1',
'fallback2',
'fallback3',
'fallback4',
'fallback5',
'fallback6',
];
const result = Array.from(yieldUniqueColumnNames(count, preferredNames, fallbackNames));
expect(result).toEqual(['name', 'altName', 'name_2', 'altName_2', 'fallback5', 'altName_3']);
});
it('should handle empty preferred names', () => {
const count = 3;
const preferredNames: Array<Array<string | undefined>> = [];
const fallbackNames: string[] = ['fallback1', 'fallback2', 'fallback3'];
const result = Array.from(yieldUniqueColumnNames(count, preferredNames, fallbackNames));
expect(result).toEqual(['fallback1', 'fallback2', 'fallback3']);
});
});

View file

@ -0,0 +1,115 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
// Estimates from above the number of columns in the CSV samples.
export function upperBoundForColumnCount(csvSamples: string[]): number {
return Math.max(0, ...csvSamples.map((sample) => sample.split(',').length));
}
// Generates a list of temporary column names.
export function generateColumnNames(count: number): string[] {
return Array.from({ length: count }).map((_, i) => `column${i + 1}`);
}
// Converts a column name into a safe one to use in the `if ctx...` clause.
// Result must pass rules at https://www.elastic.co/guide/en/elasticsearch/painless/8.15/painless-identifiers.html
export function toSafeColumnName(columnName: unknown): string | undefined {
if (typeof columnName === 'number') {
return `Column${columnName}`;
}
if (typeof columnName !== 'string') {
return undefined;
}
if (columnName.length === 0) {
return undefined;
}
const safeName = columnName.replace(/[^a-zA-Z0-9_]/g, '_');
return /^[0-9]/.test(safeName) ? `Column${safeName}` : safeName;
}
// Returns the column list from a header row. We skip values that are not strings.
export function columnsFromHeader(
tempColumnNames: string[],
headerObject: { [key: string]: unknown }
): Array<string | undefined> {
const maxIndex = tempColumnNames.findLastIndex(
(columnName) => headerObject[columnName] !== undefined
);
return tempColumnNames
.slice(0, maxIndex + 1)
.map((columnName) => headerObject[columnName])
.map(toSafeColumnName);
}
// Count the number of columns actually present in the rows.
export function totalColumnCount(
tempColumnNames: string[],
csvRows: Array<{ [key: string]: unknown }>
): number {
return (
Math.max(
-1,
...csvRows.map((row) =>
tempColumnNames.findLastIndex((columnName) => row[columnName] !== undefined)
)
) + 1
);
}
// Prefixes each column with the provided prefixes, separated by a period.
export function prefixColumns(columns: string[], prefixes: string[]): string[] {
return columns.map((column) => [...prefixes, column].join('.'));
}
/**
* Generates a list of unique column names based on preferred and fallback names.
*
* The preferred names are used first, followed by the fallback names. It is required that
* there are enough fallback names to cover the number of unique column names needed.
*
* The resulting column names are guaranteed to be unique. If a column name is already in use,
* a postfix like _2, _3 and so on is added to the name to make it unique.
*
* @generator
* @param {number} count - The number of unique column names to generate.
* @param {Array<Array<string | undefined>>} preferredNames - A 2D array where each sub-array contains a list of names.
* @param {string[]} fallbackNames - An array of fallback names to use if no preferred name is defined.
* @yields {string} - A sequence of column names, such that no two are the same.
*/
export function* yieldUniqueColumnNames(
count: number,
preferredNames: Array<Array<string | undefined>>,
fallbackNames: string[]
): Generator<string, void> {
const knownNames = new Set<string>();
for (let i = 0; i < count; i++) {
let selectedName: string = fallbackNames[i];
for (const nameList of preferredNames) {
const name = nameList[i];
if (name) {
selectedName = name;
break;
}
}
let postfixString = '';
if (knownNames.has(selectedName)) {
for (let postfix = 2; ; postfix++) {
postfixString = `_${postfix}`;
if (!knownNames.has(selectedName + postfixString)) {
break;
}
}
}
selectedName += postfixString;
knownNames.add(selectedName);
yield selectedName;
}
}

View file

@ -0,0 +1,100 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { LogFormatDetectionState } from '../../types';
import type { LogDetectionNodeParams } from '../log_type_detection/types';
import { createJSONInput } from '../../util';
import { createCSVProcessor, createDropProcessor } from '../../util/processors';
import { CSVParseError, UnparseableCSVFormatError } from '../../lib/errors/unparseable_csv_error';
import {
generateColumnNames,
upperBoundForColumnCount,
columnsFromHeader,
toSafeColumnName,
totalColumnCount,
yieldUniqueColumnNames,
prefixColumns,
} from './columns';
// We will only create the processor for the first MAX_CSV_COLUMNS columns.
const MAX_CSV_COLUMNS = 100;
// Converts CSV samples into JSON samples.
export async function handleCSV({
state,
client,
}: LogDetectionNodeParams): Promise<Partial<LogFormatDetectionState>> {
const packageName = state.packageName;
const dataStreamName = state.dataStreamName;
const samples = state.logSamples;
const temporaryColumns = generateColumnNames(
Math.min(upperBoundForColumnCount(samples), MAX_CSV_COLUMNS)
);
const temporaryProcessor = createCSVProcessor('message', temporaryColumns);
const { pipelineResults: tempResults, errors: tempErrors } = await createJSONInput(
[temporaryProcessor],
samples,
client
);
if (tempErrors.length > 0) {
throw new UnparseableCSVFormatError(tempErrors as CSVParseError[]);
}
const headerColumns = state.samplesFormat.header
? columnsFromHeader(temporaryColumns, tempResults[0])
: [];
const llmProvidedColumns = (state.samplesFormat.columns || []).map(toSafeColumnName);
const needColumns = totalColumnCount(temporaryColumns, tempResults);
const columns: string[] = Array.from(
yieldUniqueColumnNames(needColumns, [llmProvidedColumns, headerColumns], temporaryColumns)
);
const prefix = [packageName, dataStreamName];
const prefixedColumns = prefixColumns(columns, prefix);
const csvProcessor = createCSVProcessor('message', prefixedColumns);
const csvHandlingProcessors = [csvProcessor];
if (headerColumns.length > 0) {
const dropValues = columns.reduce((acc, column, index) => {
if (headerColumns[index] !== undefined) {
acc[column] = String(headerColumns[index]);
}
return acc;
}, {} as Record<string, string>);
const dropProcessor = createDropProcessor(
dropValues,
prefix,
'remove_csv_header',
'Remove the CSV header line by comparing the values'
);
csvHandlingProcessors.push(dropProcessor);
}
const { pipelineResults: finalResults, errors: finalErrors } = await createJSONInput(
csvHandlingProcessors,
samples,
client
);
if (finalErrors.length > 0) {
throw new UnparseableCSVFormatError(finalErrors as CSVParseError[]);
}
// Converts each resulting JSON object into a string, producing an array of JSON sample strings
const jsonSamples = finalResults
.map((log) => log[packageName])
.map((log) => (log as Record<string, unknown>)[dataStreamName])
.map((log) => JSON.stringify(log));
return {
jsonSamples,
additionalProcessors: [...state.additionalProcessors, ...csvHandlingProcessors],
lastExecutedChain: 'handleCSV',
};
}

View file

@ -26,16 +26,6 @@ export const KV_HEADER_ERROR_EXAMPLE_ANSWER = {
'%{TIMESTAMP:cisco.audit.timestamp}:%{WORD:cisco.audit.value1};%{WORD:cisco.audit.key2}:%{WORD:cisco.audit.value2}:%{GREEDYDATA:message}',
};
export const onFailure = {
append: {
field: 'error.message',
value:
'{% raw %}Processor {{{_ingest.on_failure_processor_type}}} with tag {{{_ingest.on_failure_processor_tag}}} in pipeline {{{_ingest.on_failure_pipeline}}} failed with message: {{{_ingest.on_failure_message}}}{% endraw %}',
},
};
export const removeProcessor = { remove: { field: 'message', ignore_missing: true } };
export const COMMON_ERRORS = [
{
error: 'field [message] does not contain value_split [=]',

View file

@ -29,11 +29,7 @@ describe('KVGraph', () => {
it('Ensures that the graph compiles', async () => {
// When getKVGraph runs, langgraph compiles the graph it will error if the graph has any issues.
// Common issues for example detecting a node has no next step, or there is a infinite loop between them.
try {
await getKVGraph({ model, client });
} catch (error) {
fail(`getKVGraph threw an error: ${error}`);
}
await getKVGraph({ model, client });
});
});
});

View file

@ -10,8 +10,11 @@ import { ESProcessorItem } from '../../../common';
import type { KVState } from '../../types';
import type { HandleKVNodeParams } from './types';
import { testPipeline } from '../../util';
import { onFailure, removeProcessor } from './constants';
import { createGrokProcessor } from '../../util/processors';
import {
createGrokProcessor,
createPassthroughFailureProcessor,
createRemoveProcessor,
} from '../../util/processors';
interface StructuredLogResult {
[packageName: string]: { [dataStreamName: string]: unknown };
@ -65,7 +68,7 @@ export async function handleHeaderValidate({
}: HandleKVNodeParams): Promise<Partial<KVState>> {
const grokPattern = state.grokPattern;
const grokProcessor = createGrokProcessor([grokPattern]);
const pipeline = { processors: grokProcessor, on_failure: [onFailure] };
const pipeline = { processors: grokProcessor, on_failure: [createPassthroughFailureProcessor()] };
const { pipelineResults, errors } = (await testPipeline(state.logSamples, pipeline, client)) as {
pipelineResults: GrokResult[];
@ -94,7 +97,10 @@ async function verifyKVProcessor(
client: IScopedClusterClient
): Promise<{ errors: object[] }> {
// This processor removes the original message field in the output
const pipeline = { processors: [kvProcessor[0], removeProcessor], on_failure: [onFailure] };
const pipeline = {
processors: [kvProcessor[0], createRemoveProcessor()],
on_failure: [createPassthroughFailureProcessor()],
};
const { errors } = await testPipeline(formattedSamples, pipeline, client);
return { errors };
}
@ -104,7 +110,10 @@ async function buildJSONSamples(
processors: object[],
client: IScopedClusterClient
): Promise<StructuredLogResult[]> {
const pipeline = { processors: [...processors, removeProcessor], on_failure: [onFailure] };
const pipeline = {
processors: [...processors, createRemoveProcessor()],
on_failure: [createPassthroughFailureProcessor()],
};
const { pipelineResults } = (await testPipeline(samples, pipeline, client)) as {
pipelineResults: StructuredLogResult[];
};

View file

@ -5,7 +5,10 @@
* 2.0.
*/
export const EX_ANSWER_LOG_TYPE = {
log_type: 'structured',
header: true,
import { SamplesFormat } from '../../../common';
export const EX_ANSWER_LOG_TYPE: SamplesFormat = {
name: 'csv',
header: false,
columns: ['ip', 'timestamp', 'request', 'status', '', 'bytes'],
};

View file

@ -13,17 +13,26 @@ import { FakeLLM } from '@langchain/core/utils/testing';
import { logFormatDetectionTestState } from '../../../__jest__/fixtures/log_type_detection';
import type { LogFormatDetectionState } from '../../types';
import { handleLogFormatDetection } from './detection';
import { IScopedClusterClient } from '@kbn/core-elasticsearch-server';
const model = new FakeLLM({
response: '{ "log_type": "structured"}',
response: '{ "name": "structured"}',
}) as unknown as ActionsClientChatOpenAI | ActionsClientSimpleChatModel;
const state: LogFormatDetectionState = logFormatDetectionTestState;
describe('Testing log type detection handler', () => {
it('handleLogFormatDetection()', async () => {
const response = await handleLogFormatDetection({ state, model });
expect(response.samplesFormat).toStrictEqual({ name: 'structured' });
const client = {
asCurrentUser: {
ingest: {
simulate: jest.fn(),
},
},
} as unknown as IScopedClusterClient;
const response = await handleLogFormatDetection({ state, model, client });
expect(response.samplesFormat).toStrictEqual({ name: 'structured', header: false });
expect(response.lastExecutedChain).toBe('logFormatDetection');
});
});

View file

@ -8,6 +8,7 @@ import { JsonOutputParser } from '@langchain/core/output_parsers';
import type { LogFormatDetectionState } from '../../types';
import { LOG_FORMAT_DETECTION_PROMPT } from './prompts';
import type { LogDetectionNodeParams } from './types';
import { SamplesFormat } from '../../../common';
const MaxLogSamplesInPrompt = 5;
@ -23,13 +24,23 @@ export async function handleLogFormatDetection({
? state.logSamples.slice(0, MaxLogSamplesInPrompt)
: state.logSamples;
const detectedLogFormatAnswer = await logFormatDetectionNode.invoke({
const logFormatDetectionResult = await logFormatDetectionNode.invoke({
ex_answer: state.exAnswer,
log_samples: samples,
package_title: state.packageTitle,
datastream_title: state.dataStreamTitle,
});
const logFormat = detectedLogFormatAnswer.log_type;
const header = detectedLogFormatAnswer.header;
let samplesFormat: SamplesFormat = { name: 'unsupported' };
return { samplesFormat: { name: logFormat }, header, lastExecutedChain: 'logFormatDetection' };
try {
samplesFormat = SamplesFormat.parse(logFormatDetectionResult);
if (samplesFormat.header === undefined) {
samplesFormat.header = false;
}
} catch (error) {
// If the LLM fails to produce the output of specified format, we will default to unsupported.
}
return { samplesFormat, header: samplesFormat.header, lastExecutedChain: 'logFormatDetection' };
}

View file

@ -29,11 +29,7 @@ describe('LogFormatDetectionGraph', () => {
it('Ensures that the graph compiles', async () => {
// When getLogFormatDetectionGraph runs, langgraph compiles the graph it will error if the graph has any issues.
// Common issues for example detecting a node has no next step, or there is a infinite loop between them.
try {
await getLogFormatDetectionGraph({ model, client });
} catch (error) {
fail(`getLogFormatDetectionGraph threw an error: ${error}`);
}
await getLogFormatDetectionGraph({ model, client });
});
});
});

View file

@ -10,6 +10,7 @@ import { END, START, StateGraph } from '@langchain/langgraph';
import type { LogFormatDetectionState } from '../../types';
import { EX_ANSWER_LOG_TYPE } from './constants';
import { handleLogFormatDetection } from './detection';
import { handleCSV } from '../csv/csv';
import { ESProcessorItem, SamplesFormat } from '../../../common';
import { getKVGraph } from '../kv/graph';
import { LogDetectionGraphParams, LogDetectionBaseNodeParams } from './types';
@ -29,6 +30,14 @@ const graphState: StateGraphArgs<LogFormatDetectionState>['channels'] = {
value: (x: string, y?: string) => y ?? x,
default: () => '',
},
packageTitle: {
value: (x: string, y?: string) => y ?? x,
default: () => '',
},
dataStreamTitle: {
value: (x: string, y?: string) => y ?? x,
default: () => '',
},
logSamples: {
value: (x: string[], y?: string[]) => y ?? x,
default: () => [],
@ -94,9 +103,9 @@ function logFormatRouter({ state }: LogDetectionBaseNodeParams): string {
if (state.samplesFormat.name === LogFormat.UNSTRUCTURED) {
return 'unstructured';
}
// if (state.samplesFormat === LogFormat.CSV) {
// return 'csv';
// }
if (state.samplesFormat.name === LogFormat.CSV) {
return 'csv';
}
return 'unsupported';
}
@ -107,15 +116,16 @@ export async function getLogFormatDetectionGraph({ model, client }: LogDetection
.addNode('modelInput', (state: LogFormatDetectionState) => modelInput({ state }))
.addNode('modelOutput', (state: LogFormatDetectionState) => modelOutput({ state }))
.addNode('handleLogFormatDetection', (state: LogFormatDetectionState) =>
handleLogFormatDetection({ state, model })
handleLogFormatDetection({ state, model, client })
)
.addNode('handleKVGraph', await getKVGraph({ model, client }))
.addNode('handleUnstructuredGraph', await getUnstructuredGraph({ model, client }))
// .addNode('handleCsvGraph', (state: LogFormatDetectionState) => getCompiledCsvGraph({state, model}))
.addNode('handleCSV', (state: LogFormatDetectionState) => handleCSV({ state, model, client }))
.addEdge(START, 'modelInput')
.addEdge('modelInput', 'handleLogFormatDetection')
.addEdge('handleKVGraph', 'modelOutput')
.addEdge('handleUnstructuredGraph', 'modelOutput')
.addEdge('handleCSV', 'modelOutput')
.addEdge('modelOutput', END)
.addConditionalEdges(
'handleLogFormatDetection',
@ -123,7 +133,7 @@ export async function getLogFormatDetectionGraph({ model, client }: LogDetection
{
structured: 'handleKVGraph',
unstructured: 'handleUnstructuredGraph',
// csv: 'handleCsvGraph',
csv: 'handleCSV',
unsupported: 'modelOutput',
}
);

View file

@ -8,30 +8,27 @@ import { ChatPromptTemplate } from '@langchain/core/prompts';
export const LOG_FORMAT_DETECTION_PROMPT = ChatPromptTemplate.fromMessages([
[
'system',
`You are a helpful, expert assistant in identifying different log types based on the format.
Here is some context for you to reference for your task, read it carefully as you will get questions about it later:
<context>
<log_samples>
{log_samples}
</log_samples>
</context>`,
`You are a helpful, expert assistant specializing in all things logs. You're great at analyzing log samples.`,
],
[
'human',
`Looking at the log samples , our goal is to identify the syslog type based on the guidelines below.
Follow these steps to identify the log format type:
1. Go through each log sample and identify the log format type.
`The current task is to identify the log format from the provided samples based on the guidelines below.
The samples apply to the data stream {datastream_title} inside the integration package {package_title}.
Follow these steps to do this:
1. Go through each log sample and identify the log format. Output this as "name: <log_format>".
2. If the samples have any or all of priority, timestamp, loglevel, hostname, ipAddress, messageId in the beginning information then set "header: true".
3. If the samples have a syslog header then set "header: true" , else set "header: false". If you are unable to determine the syslog header presence then set "header: false".
4. If the log samples have structured message body with key-value pairs then classify it as "log_type: structured". Look for a flat list of key-value pairs, often separated by spaces, commas, or other delimiters.
4. If the log samples have structured message body with key-value pairs then classify it as "name: structured". Look for a flat list of key-value pairs, often separated by spaces, commas, or other delimiters.
5. Consider variations in formatting, such as quotes around values ("key=value", key="value"), special characters in keys or values, or escape sequences.
6. If the log samples have unstructured body like a free-form text then classify it as "log_type: unstructured".
7. If the log samples follow a csv format then classify it as "log_type: csv".
8. If the samples are identified as "csv" and there is a csv header then set "header: true" , else set "header: false".
9. If you do not find the log format in any of the above categories then classify it as "log_type: unsupported".
6. If the log samples have unstructured body like a free-form text then classify it as "name: unstructured".
7. If the log samples follow a csv format then classify it with "name: csv". There are two sub-cases for csv:
a. If there is a csv header then set "header: true".
b. If there is no csv header then set "header: false" and try to find good names for the columns in the "columns" array by looking into the values of data in those columns. For each column, if you are unable to find good name candidate for it then output an empty string, like in the example.
8. If you cannot put the format into any of the above categories then classify it with "name: unsupported".
You ALWAYS follow these guidelines when writing your response:
You ALWAYS follow these guidelines when writing your response:
<guidelines>
- Do not respond with anything except the updated current mapping JSON object enclosed with 3 backticks (\`). See example response below.
</guidelines>
@ -42,7 +39,13 @@ A: Please find the JSON object below:
\`\`\`json
{ex_answer}
\`\`\`
</example>`,
</example>
Please process these log samples:
<log_samples>
{log_samples}
</log_samples>
`,
],
['ai', 'Please find the JSON object below:'],
]);

View file

@ -14,6 +14,7 @@ export interface LogDetectionBaseNodeParams {
export interface LogDetectionNodeParams extends LogDetectionBaseNodeParams {
model: ChatModels;
client: IScopedClusterClient;
}
export interface LogDetectionGraphParams {

View file

@ -17,11 +17,3 @@ export const GROK_ERROR_EXAMPLE_ANSWER = {
'%{TIMESTAMP:timestamp}:%{WORD:value1};%{WORD:key2}:%{WORD:value2}:%{GREEDYDATA:message}',
],
};
export const onFailure = {
append: {
field: 'error.message',
value:
'{% raw %}Processor {{{_ingest.on_failure_processor_type}}} with tag {{{_ingest.on_failure_processor_tag}}} in pipeline {{{_ingest.on_failure_pipeline}}} failed with message: {{{_ingest.on_failure_message}}}{% endraw %}',
},
};

View file

@ -29,11 +29,7 @@ describe('UnstructuredGraph', () => {
it('Ensures that the graph compiles', async () => {
// When getUnstructuredGraph runs, langgraph compiles the graph it will error if the graph has any issues.
// Common issues for example detecting a node has no next step, or there is a infinite loop between them.
try {
await getUnstructuredGraph({ model, client });
} catch (error) {
fail(`getUnstructuredGraph threw an error: ${error}`);
}
await getUnstructuredGraph({ model, client });
});
});
});

View file

@ -8,8 +8,7 @@
import type { UnstructuredLogState } from '../../types';
import type { HandleUnstructuredNodeParams, LogResult } from './types';
import { testPipeline } from '../../util';
import { onFailure } from './constants';
import { createGrokProcessor } from '../../util/processors';
import { createGrokProcessor, createPassthroughFailureProcessor } from '../../util/processors';
export async function handleUnstructuredValidate({
state,
@ -17,10 +16,10 @@ export async function handleUnstructuredValidate({
}: HandleUnstructuredNodeParams): Promise<Partial<UnstructuredLogState>> {
const grokPatterns = state.grokPatterns;
const grokProcessor = createGrokProcessor(grokPatterns);
const pipeline = { processors: grokProcessor, on_failure: [onFailure] };
const pipeline = { processors: grokProcessor, on_failure: [createPassthroughFailureProcessor()] };
const packageName = state.packageName;
const dataStreamName = state.dataStreamName;
const { pipelineResults, errors } = (await testPipeline(state.logSamples, pipeline, client)) as {
pipelineResults: LogResult[];
errors: object[];

View file

@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { KibanaResponseFactory } from '@kbn/core/server';
import { ErrorThatHandlesItsOwnResponse } from './types';
import { GenerationErrorCode } from '../../../common/constants';
import {
GenerationErrorAttributes,
GenerationErrorBody,
} from '../../../common/api/generation_error';
const errorCode = GenerationErrorCode.UNPARSEABLE_CSV_DATA;
export interface CSVParseError {
message: string[];
}
export class UnparseableCSVFormatError extends Error implements ErrorThatHandlesItsOwnResponse {
attributes: GenerationErrorAttributes;
constructor(csvParseErrors: CSVParseError[]) {
super(errorCode);
this.attributes = {
errorCode,
underlyingMessages: csvParseErrors.flatMap((error) => error.message),
};
}
public sendResponse(res: KibanaResponseFactory) {
const body: GenerationErrorBody = {
message: errorCode,
attributes: this.attributes,
};
return res.customError({
statusCode: 422,
body,
});
}
}

View file

@ -7,10 +7,10 @@
import { KibanaResponseFactory } from '@kbn/core/server';
import { ErrorThatHandlesItsOwnResponse } from './types';
import { ErrorCode } from '../../../common/constants';
import { GenerationErrorCode } from '../../../common/constants';
export class UnsupportedLogFormatError extends Error implements ErrorThatHandlesItsOwnResponse {
private readonly errorCode: string = ErrorCode.UNSUPPORTED_LOG_SAMPLES_FORMAT;
private readonly errorCode: string = GenerationErrorCode.UNSUPPORTED_LOG_SAMPLES_FORMAT;
// eslint-disable-next-line @typescript-eslint/no-useless-constructor
constructor(message: string) {

View file

@ -18,7 +18,7 @@ import { buildRouteValidationWithZod } from '../util/route_validation';
import { withAvailability } from './with_availability';
import { isErrorThatHandlesItsOwnResponse, UnsupportedLogFormatError } from '../lib/errors';
import { handleCustomErrors } from './routes_util';
import { ErrorCode } from '../../common/constants';
import { GenerationErrorCode } from '../../common/constants';
export function registerAnalyzeLogsRoutes(
router: IRouter<IntegrationAssistantRouteHandlerContext>
@ -43,7 +43,14 @@ export function registerAnalyzeLogsRoutes(
},
},
withAvailability(async (context, req, res): Promise<IKibanaResponse<AnalyzeLogsResponse>> => {
const { packageName, dataStreamName, logSamples, langSmithOptions } = req.body;
const {
packageName,
dataStreamName,
packageTitle,
dataStreamTitle,
logSamples,
langSmithOptions,
} = req.body;
const services = await context.resolve(['core']);
const { client } = services.core.elasticsearch;
const { getStartServices, logger } = await context.integrationAssistant;
@ -79,18 +86,20 @@ export function registerAnalyzeLogsRoutes(
const logFormatParameters = {
packageName,
dataStreamName,
packageTitle,
dataStreamTitle,
logSamples,
};
const graph = await getLogFormatDetectionGraph({ model, client });
const graphResults = await graph.invoke(logFormatParameters, options);
const graphLogFormat = graphResults.results.samplesFormat.name;
if (graphLogFormat === 'unsupported' || graphLogFormat === 'csv') {
throw new UnsupportedLogFormatError(ErrorCode.UNSUPPORTED_LOG_SAMPLES_FORMAT);
if (graphLogFormat === 'unsupported') {
throw new UnsupportedLogFormatError(GenerationErrorCode.UNSUPPORTED_LOG_SAMPLES_FORMAT);
}
return res.ok({ body: AnalyzeLogsResponse.parse(graphResults) });
} catch (err) {
try {
handleCustomErrors(err, ErrorCode.RECURSION_LIMIT_ANALYZE_LOGS);
handleCustomErrors(err, GenerationErrorCode.RECURSION_LIMIT_ANALYZE_LOGS);
} catch (e) {
if (isErrorThatHandlesItsOwnResponse(e)) {
return e.sendResponse(res);

View file

@ -13,7 +13,7 @@ import { buildRouteValidationWithZod } from '../util/route_validation';
import { withAvailability } from './with_availability';
import { isErrorThatHandlesItsOwnResponse } from '../lib/errors';
import { handleCustomErrors } from './routes_util';
import { ErrorCode } from '../../common/constants';
import { GenerationErrorCode } from '../../common/constants';
export function registerIntegrationBuilderRoutes(
router: IRouter<IntegrationAssistantRouteHandlerContext>
) {
@ -42,7 +42,7 @@ export function registerIntegrationBuilderRoutes(
});
} catch (err) {
try {
handleCustomErrors(err, ErrorCode.RECURSION_LIMIT);
handleCustomErrors(err, GenerationErrorCode.RECURSION_LIMIT);
} catch (e) {
if (isErrorThatHandlesItsOwnResponse(e)) {
return e.sendResponse(response);

View file

@@ -22,7 +22,7 @@ import { buildRouteValidationWithZod } from '../util/route_validation';
import { withAvailability } from './with_availability';
import { isErrorThatHandlesItsOwnResponse } from '../lib/errors';
import { handleCustomErrors } from './routes_util';
-import { ErrorCode } from '../../common/constants';
+import { GenerationErrorCode } from '../../common/constants';
export function registerCategorizationRoutes(
router: IRouter<IntegrationAssistantRouteHandlerContext>
@@ -103,7 +103,7 @@ export function registerCategorizationRoutes(
return res.ok({ body: CategorizationResponse.parse(results) });
} catch (err) {
try {
-handleCustomErrors(err, ErrorCode.RECURSION_LIMIT);
+handleCustomErrors(err, GenerationErrorCode.RECURSION_LIMIT);
} catch (e) {
if (isErrorThatHandlesItsOwnResponse(e)) {
return e.sendResponse(res);


@@ -18,7 +18,7 @@ import { buildRouteValidationWithZod } from '../util/route_validation';
import { withAvailability } from './with_availability';
import { isErrorThatHandlesItsOwnResponse } from '../lib/errors';
import { handleCustomErrors } from './routes_util';
-import { ErrorCode } from '../../common/constants';
+import { GenerationErrorCode } from '../../common/constants';
export function registerEcsRoutes(router: IRouter<IntegrationAssistantRouteHandlerContext>) {
router.versioned
@@ -97,7 +97,7 @@ export function registerEcsRoutes(router: IRouter<IntegrationAssistantRouteHandl
return res.ok({ body: EcsMappingResponse.parse(results) });
} catch (err) {
try {
-handleCustomErrors(err, ErrorCode.RECURSION_LIMIT);
+handleCustomErrors(err, GenerationErrorCode.RECURSION_LIMIT);
} catch (e) {
if (isErrorThatHandlesItsOwnResponse(e)) {
return e.sendResponse(res);


@@ -14,7 +14,7 @@ import { buildRouteValidationWithZod } from '../util/route_validation';
import { withAvailability } from './with_availability';
import { isErrorThatHandlesItsOwnResponse } from '../lib/errors';
import { handleCustomErrors } from './routes_util';
-import { ErrorCode } from '../../common/constants';
+import { GenerationErrorCode } from '../../common/constants';
export function registerPipelineRoutes(router: IRouter<IntegrationAssistantRouteHandlerContext>) {
router.versioned
@@ -51,7 +51,7 @@ export function registerPipelineRoutes(router: IRouter<IntegrationAssistantRoute
});
} catch (err) {
try {
-handleCustomErrors(err, ErrorCode.RECURSION_LIMIT);
+handleCustomErrors(err, GenerationErrorCode.RECURSION_LIMIT);
} catch (e) {
if (isErrorThatHandlesItsOwnResponse(e)) {
return e.sendResponse(res);


@@ -18,7 +18,7 @@ import { buildRouteValidationWithZod } from '../util/route_validation';
import { withAvailability } from './with_availability';
import { isErrorThatHandlesItsOwnResponse } from '../lib/errors';
import { handleCustomErrors } from './routes_util';
-import { ErrorCode } from '../../common/constants';
+import { GenerationErrorCode } from '../../common/constants';
export function registerRelatedRoutes(router: IRouter<IntegrationAssistantRouteHandlerContext>) {
router.versioned
@@ -94,7 +94,7 @@ export function registerRelatedRoutes(router: IRouter<IntegrationAssistantRouteH
return res.ok({ body: RelatedResponse.parse(results) });
} catch (err) {
try {
-handleCustomErrors(err, ErrorCode.RECURSION_LIMIT);
+handleCustomErrors(err, GenerationErrorCode.RECURSION_LIMIT);
} catch (e) {
if (isErrorThatHandlesItsOwnResponse(e)) {
return e.sendResponse(res);


@@ -8,12 +8,12 @@
import { handleCustomErrors } from './routes_util';
import { GraphRecursionError } from '@langchain/langgraph';
import { RecursionLimitError } from '../lib/errors';
-import { ErrorCode } from '../../common/constants';
+import { GenerationErrorCode } from '../../common/constants';
describe('handleError', () => {
it('should throw a RecursionLimitError when given a GraphRecursionError', () => {
const errorMessage = 'Recursion limit exceeded';
-const errorCode = ErrorCode.RECURSION_LIMIT;
+const errorCode = GenerationErrorCode.RECURSION_LIMIT;
const recursionError = new GraphRecursionError(errorMessage);
expect(() => {
@@ -26,7 +26,7 @@ describe('handleError', () => {
it('should rethrow the error when given an error that is not a GraphRecursionError', () => {
const errorMessage = 'Some other error';
-const errorCode = ErrorCode.RECURSION_LIMIT;
+const errorCode = GenerationErrorCode.RECURSION_LIMIT;
const otherError = new Error(errorMessage);
expect(() => {


@@ -6,7 +6,7 @@
*/
import { GraphRecursionError } from '@langchain/langgraph';
-import { ErrorCode } from '../../common/constants';
+import { GenerationErrorCode } from '../../common/constants';
import { RecursionLimitError } from '../lib/errors';
/**
@@ -21,7 +21,9 @@ import { RecursionLimitError } from '../lib/errors';
*/
export function handleCustomErrors(
err: Error,
-recursionErrorCode: ErrorCode.RECURSION_LIMIT | ErrorCode.RECURSION_LIMIT_ANALYZE_LOGS
+recursionErrorCode:
+  | GenerationErrorCode.RECURSION_LIMIT
+  | GenerationErrorCode.RECURSION_LIMIT_ANALYZE_LOGS
) {
if (err instanceof GraphRecursionError) {
throw new RecursionLimitError(err.message, recursionErrorCode);


@@ -109,6 +109,8 @@ export interface LogFormatDetectionState {
lastExecutedChain: string;
packageName: string;
dataStreamName: string;
packageTitle: string;
dataStreamTitle: string;
logSamples: string[];
jsonSamples: string[];
exAnswer: string;
@@ -117,7 +119,7 @@ export interface LogFormatDetectionState {
header: boolean;
ecsVersion: string;
results: object;
-additionalProcessors: ESProcessorItem[]; // # This will be generated in the sub-graphs
+additionalProcessors: ESProcessorItem[]; // Generated in handleXXX nodes or subgraphs.
}
export interface KVState {


@@ -17,5 +17,5 @@ export {
export { generateFields, mergeSamples } from './samples';
export { deepCopy, generateUniqueId } from './util';
-export { testPipeline } from './pipeline';
+export { testPipeline, createJSONInput } from './pipeline';
export { combineProcessors } from './processors';


@@ -5,6 +5,8 @@
* 2.0.
*/
import type { IScopedClusterClient } from '@kbn/core-elasticsearch-server';
import { ESProcessorItem } from '../../common';
import { createPassthroughFailureProcessor, createRemoveProcessor } from './processors';
interface DocTemplate {
_index: string;
@@ -29,15 +31,17 @@ export async function testPipeline(
samples: string[],
pipeline: object,
client: IScopedClusterClient
-): Promise<{ pipelineResults: object[]; errors: object[] }> {
+): Promise<{ pipelineResults: Array<{ [key: string]: unknown }>; errors: object[] }> {
const docs = samples.map((sample) => formatSample(sample));
-const pipelineResults: object[] = [];
+const pipelineResults: Array<{ [key: string]: unknown }> = [];
const errors: object[] = [];
try {
const output = await client.asCurrentUser.ingest.simulate({ docs, pipeline });
for (const doc of output.docs) {
-if (doc.doc?._source?.error) {
+if (!doc) {
+  // Nothing to do; the document was dropped.
+} else if (doc.doc?._source?.error) {
errors.push(doc.doc._source.error);
} else if (doc.doc?._source) {
pipelineResults.push(doc.doc._source);
@@ -49,3 +53,16 @@ export async function testPipeline(
return { pipelineResults, errors };
}
export async function createJSONInput(
processors: ESProcessorItem[],
formattedSamples: string[],
client: IScopedClusterClient
): Promise<{ pipelineResults: Array<{ [key: string]: unknown }>; errors: object[] }> {
const pipeline = {
processors: [...processors, createRemoveProcessor()],
on_failure: [createPassthroughFailureProcessor()],
};
const { pipelineResults, errors } = await testPipeline(formattedSamples, pipeline, client);
return { pipelineResults, errors };
}
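
As a rough sketch of how the new helper might be used (the import paths, column names, and error handling below are assumptions, not taken from this diff):

```ts
import type { IScopedClusterClient } from '@kbn/core-elasticsearch-server';
import { createJSONInput } from './pipeline'; // added above
import { createCSVProcessor } from './processors'; // added below

// Parse CSV lines held in the `message` field into illustrative target fields,
// then simulate the temporary pipeline against the raw samples.
async function csvSamplesToJSON(samples: string[], client: IScopedClusterClient) {
  const csv = createCSVProcessor('message', [
    'mypackage.mystream.column1',
    'mypackage.mystream.column2',
  ]);
  const { pipelineResults, errors } = await createJSONInput([csv], samples, client);
  if (errors.length > 0) {
    // A real caller would bubble the original CSV error up to the client instead.
    throw new Error(JSON.stringify(errors[0]));
  }
  return pipelineResults;
}
```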


@@ -77,3 +77,64 @@ export function createKVProcessor(kvInput: KVProcessor, state: KVState): ESProce
const kvProcessor = safeLoad(renderedTemplate) as ESProcessorItem;
return kvProcessor;
}
// Processor that converts the CSV input to JSON.
export function createCSVProcessor(source: string, targets: string[]): ESProcessorItem {
return {
csv: {
field: source,
target_fields: targets,
description: 'Parse CSV input',
tag: 'parse_csv',
},
};
}
// Trivial processor for the on_failure part of the pipeline.
// Use only if the source of the error is not needed.
export function createPassthroughFailureProcessor(): ESProcessorItem {
return {
append: {
field: 'error.message',
description: 'Append the error message as-is',
tag: 'append_error_message',
value: '{{{_ingest.on_failure_message}}}',
},
};
}
// Processor to remove the message field.
export function createRemoveProcessor(): ESProcessorItem {
return {
remove: {
field: 'message',
ignore_missing: true,
description: 'Remove the message field',
tag: 'remove_message_field',
},
};
}
// Processor to drop documents whose fields match the specified values.
// values is a record of key-value pairs to match against the fields.
// prefix is the path prefix of the fields to match against.
export function createDropProcessor(
values: Record<string, unknown>,
prefix: string[],
tag: string,
description: string
): ESProcessorItem {
const prefixExpression = prefix.join('?.');
const conditions = Object.entries(values)
.map(([key, value]) => `ctx.${prefixExpression}?.${key} == '${String(value)}'`)
.join(' && ');
return {
drop: {
if: conditions,
ignore_failure: true,
description,
tag,
},
};
}
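
To make the generated pipeline concrete: for a hypothetical headed CSV stream with columns `ts,src,msg`, the factories above would produce processors along these lines (the field prefix, tag, and description are illustrative):

```ts
const csv = createCSVProcessor('message', ['demo.ds.ts', 'demo.ds.src', 'demo.ds.msg']);
// => { csv: { field: 'message', target_fields: ['demo.ds.ts', 'demo.ds.src', 'demo.ds.msg'],
//             description: 'Parse CSV input', tag: 'parse_csv' } }

const dropHeader = createDropProcessor(
  { ts: 'ts', src: 'src', msg: 'msg' }, // header values to compare against
  ['demo', 'ds'],                       // prefix under which the csv processor wrote the fields
  'remove_csv_header',
  'Drop the CSV header line from the document stream'
);
// => drop.if evaluates to:
//    "ctx.demo?.ds?.ts == 'ts' && ctx.demo?.ds?.src == 'src' && ctx.demo?.ds?.msg == 'msg'"
```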


@@ -24777,7 +24777,6 @@
"xpack.integrationAssistant.step.dataStream.integrationNameDescription": "Le nom du package est utilisé pour faire référence à l'intégration dans le pipeline d'ingestion d'Elastic",
"xpack.integrationAssistant.step.dataStream.integrationNameTitle": "Définir le nom du package",
"xpack.integrationAssistant.step.dataStream.logsSample.description": "Glissez et déposez un fichier ou parcourez les fichiers.",
"xpack.integrationAssistant.step.dataStream.logsSample.description2": "Format JSON/NDJSON",
"xpack.integrationAssistant.step.dataStream.logsSample.errorCanNotRead": "Impossible de lire le fichier de logs exemple",
"xpack.integrationAssistant.step.dataStream.logsSample.errorEmpty": "Le fichier de logs exemple est vide",
"xpack.integrationAssistant.step.dataStream.logsSample.errorNotArray": "Le fichier de logs exemple n'est pas un tableau",


@@ -24524,7 +24524,6 @@
"xpack.integrationAssistant.step.dataStream.integrationNameDescription": "このパッケージ名は、Elasticのインジェストパイプラインの統合を参照するために使用されます",
"xpack.integrationAssistant.step.dataStream.integrationNameTitle": "パッケージ名を定義",
"xpack.integrationAssistant.step.dataStream.logsSample.description": "ファイルをドラッグアンドドロップするか、ファイルを参照します",
"xpack.integrationAssistant.step.dataStream.logsSample.description2": "JSON/NDJSON形式",
"xpack.integrationAssistant.step.dataStream.logsSample.errorCanNotRead": "ログサンプルファイルを読み取れませんでした",
"xpack.integrationAssistant.step.dataStream.logsSample.errorEmpty": "ログサンプルファイルが空です",
"xpack.integrationAssistant.step.dataStream.logsSample.errorNotArray": "ログサンプルファイルは配列ではありません",


@@ -24558,7 +24558,6 @@
"xpack.integrationAssistant.step.dataStream.integrationNameDescription": "软件包名称用于在 Elastic 采集管道中引用集成",
"xpack.integrationAssistant.step.dataStream.integrationNameTitle": "定义软件包名称",
"xpack.integrationAssistant.step.dataStream.logsSample.description": "拖放文件或浏览文件。",
"xpack.integrationAssistant.step.dataStream.logsSample.description2": "JSON/NDJSON 格式",
"xpack.integrationAssistant.step.dataStream.logsSample.errorCanNotRead": "无法读取日志样例文件",
"xpack.integrationAssistant.step.dataStream.logsSample.errorEmpty": "日志样例文件为空",
"xpack.integrationAssistant.step.dataStream.logsSample.errorNotArray": "日志样例文件不是数组",