mirror of
https://github.com/elastic/kibana.git
synced 2025-04-23 17:28:26 -04:00
[Automatic Import] Fix unstructured syslog flow (#213042)
## Summary This PR fixes the Unstructured syslog flow. It picks up 5 samples send them to LLM to create a pattern and tests all the samples against the pattern , collects the unparsed samples [ if any ] , send them in for next round of pattern check and so on. This creates a list of patterns that matches all the samples and creates a grok processor with those patterns and it breaks the syslogs down into a JSON for ECS mapping , categorization and related graphs. ### Checklist - [x] [Unit or functional tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html) were updated or added to match the most common scenarios - [x] The PR description includes the appropriate Release Notes section, and the correct `release_note:*` label is applied per the [guidelines](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)
This commit is contained in:
parent
367ff8dbec
commit
715a72fa18
15 changed files with 306 additions and 189 deletions
|
@ -14,12 +14,14 @@ export const unstructuredLogState = {
|
|||
jsonSamples: ['{"message":"dummy data"}'],
|
||||
finalized: false,
|
||||
ecsVersion: 'testVersion',
|
||||
errors: { test: 'testerror' },
|
||||
errors: [{ test: 'testerror' }],
|
||||
additionalProcessors: [],
|
||||
isFirst: false,
|
||||
unParsedSamples: ['dummy data'],
|
||||
currentPattern: '%{GREEDYDATA:message}',
|
||||
};
|
||||
|
||||
export const unstructuredLogResponse = {
|
||||
grok_patterns: [
|
||||
grok_pattern:
|
||||
'####<%{MONTH} %{MONTHDAY}, %{YEAR} %{TIME} (?:AM|PM) %{WORD:timezone}> <%{WORD:log_level}> <%{WORD:component}> <%{DATA:hostname}> <%{DATA:server_name}> <%{DATA:thread_info}> <%{DATA:user}> <%{DATA:empty_field}> <%{DATA:empty_field2}> <%{NUMBER:timestamp}> <%{DATA:message_id}> <%{GREEDYDATA:message}>',
|
||||
],
|
||||
};
|
||||
|
|
|
@ -12,3 +12,35 @@ export const EX_ANSWER_LOG_TYPE: SamplesFormat = {
|
|||
header: false,
|
||||
columns: ['ip', 'timestamp', 'request', 'status', '', 'bytes'],
|
||||
};
|
||||
export const LOG_FORMAT_EXAMPLE_LOGS = [
|
||||
{
|
||||
example:
|
||||
'[18/Feb/2025:22:39:16 +0000] CONNECT conn=20597223 from=10.1.1.1:1234 to=10.2.3.4:4389 protocol=LDAP',
|
||||
format: 'Structured',
|
||||
},
|
||||
{
|
||||
example:
|
||||
'2021-10-22 22:12:09,871 DEBUG [org.keycloak.events] (default task-3) operationType=CREATE, realmId=test, clientId=abcdefgh userId=sdfsf-b89c-4fca-9088-sdfsfsf, ipAddress=10.1.1.1, resourceType=USER, resourcePath=users/07972d16-b173-4c99-803d-90f211080f40',
|
||||
format: 'Structured',
|
||||
},
|
||||
{
|
||||
example:
|
||||
'<166>Aug 21 22:08:13 myfirewall.my-domain.tld (squid-1)[6802]: [1598040493.253 325](tel:1598040493.253 325) 175.16.199.1 TCP_MISS/304 2912 GET https://github.com/3ilson/pfelk/file-list/master - HIER_DIRECT/81.2.69.145 -',
|
||||
format: 'Unstructured',
|
||||
},
|
||||
{
|
||||
example:
|
||||
'<30>1 2021-07-03T23:01:56.547105-05:00 pfSense.example.com charon 18610 - - 08[CFG] ppk_id = (null)',
|
||||
format: 'Unstructured',
|
||||
},
|
||||
{
|
||||
example:
|
||||
'2016/10/25 14:49:34 [error] 54053#0: *1 open() "/usr/local/Cellar/nginx/1.10.2_1/html/favicon.ico" failed (2: No such file or directory)',
|
||||
format: 'Unstructured',
|
||||
},
|
||||
{
|
||||
example:
|
||||
'2025/02/12|14:42:42:871|FAKePolicyNumber-ws-sharedendorsement-autocore-54--fhfh-rghrg-0|INFO |http-nio-8080-exec-58 |RatingHelper.sendToPolicyPro:1521 |-call to PolicyPro for /rest/v2/actions/ISSUEEXT successful',
|
||||
format: 'Unstructured',
|
||||
},
|
||||
];
|
||||
|
|
|
@ -10,6 +10,7 @@ import { LOG_FORMAT_DETECTION_PROMPT } from './prompts';
|
|||
import type { LogDetectionNodeParams } from './types';
|
||||
import { SamplesFormat } from '../../../common';
|
||||
import { LOG_FORMAT_DETECTION_SAMPLE_ROWS } from '../../../common/constants';
|
||||
import { LOG_FORMAT_EXAMPLE_LOGS } from './constants';
|
||||
|
||||
export async function handleLogFormatDetection({
|
||||
state,
|
||||
|
@ -26,6 +27,7 @@ export async function handleLogFormatDetection({
|
|||
const logFormatDetectionResult = await logFormatDetectionNode.invoke({
|
||||
ex_answer: state.exAnswer,
|
||||
log_samples: samples.join('\n'),
|
||||
example_logs: LOG_FORMAT_EXAMPLE_LOGS,
|
||||
package_title: state.packageTitle,
|
||||
datastream_title: state.dataStreamTitle,
|
||||
});
|
||||
|
|
|
@ -27,11 +27,18 @@ Follow these steps to do this:
|
|||
* 'leef': If the log samples have Log Event Extended Format (LEEF) then classify it as "name: leef".
|
||||
* 'fix': If the log samples have Financial Information eXchange (FIX) then classify it as "name: fix".
|
||||
* 'unsupported': If you cannot put the format into any of the above categories then classify it with "name: unsupported".
|
||||
2. Header: for structured and unstructured format:
|
||||
2. You can look at the example_logs in the context to understand different log formats.
|
||||
3. Header: for structured and unstructured format:
|
||||
- if the samples have any or all of priority, timestamp, loglevel, hostname, ipAddress, messageId in the beginning information then set "header: true".
|
||||
- if the samples have a syslog header then set "header: true"
|
||||
- else set "header: false". If you are unable to determine the syslog header presence then set "header: false".
|
||||
3. Note that a comma-separated list should be classified as 'csv' if its rows only contain values separated by commas. But if it looks like a list of comma separated key-values pairs like 'key1=value1, key2=value2' it should be classified as 'structured'.
|
||||
4. Note that a comma-separated list should be classified as 'csv' if its rows only contain values separated by commas. But if it looks like a list of comma separated key-values pairs like 'key1=value1, key2=value2' it should be classified as 'structured'.
|
||||
|
||||
<example_logs>
|
||||
\`\`\`json
|
||||
{example_logs}
|
||||
\`\`\`
|
||||
</example_logs>
|
||||
|
||||
You ALWAYS follow these guidelines when writing your response:
|
||||
<guidelines>
|
||||
|
|
|
@ -7,13 +7,10 @@
|
|||
|
||||
export const GROK_EXAMPLE_ANSWER = {
|
||||
rfc: 'RFC2454',
|
||||
regex:
|
||||
'/(?:(d{4}[-]d{2}[-]d{2}[T]d{2}[:]d{2}[:]d{2}(?:.d{1,6})?(?:[+-]d{2}[:]d{2}|Z)?)|-)s(?:([w][wd.@-]*)|-)s(.*)$/',
|
||||
grok_patterns: ['%{WORD:key1}:%{WORD:value1};%{WORD:key2}:%{WORD:value2}:%{GREEDYDATA:message}'],
|
||||
grok_pattern: '%{WORD:key1}:%{WORD:value1};%{WORD:key2}:%{WORD:value2}:%{GREEDYDATA:message}',
|
||||
};
|
||||
|
||||
export const GROK_ERROR_EXAMPLE_ANSWER = {
|
||||
grok_patterns: [
|
||||
grok_pattern:
|
||||
'%{TIMESTAMP:timestamp}:%{WORD:value1};%{WORD:key2}:%{WORD:value2}:%{GREEDYDATA:message}',
|
||||
],
|
||||
};
|
||||
|
|
|
@ -1,34 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import { JsonOutputParser } from '@langchain/core/output_parsers';
|
||||
import type { UnstructuredLogState } from '../../types';
|
||||
import type { HandleUnstructuredNodeParams } from './types';
|
||||
import { GROK_ERROR_PROMPT } from './prompts';
|
||||
import { GROK_ERROR_EXAMPLE_ANSWER } from './constants';
|
||||
|
||||
export async function handleUnstructuredError({
|
||||
state,
|
||||
model,
|
||||
}: HandleUnstructuredNodeParams): Promise<Partial<UnstructuredLogState>> {
|
||||
const outputParser = new JsonOutputParser();
|
||||
const grokErrorGraph = GROK_ERROR_PROMPT.pipe(model).pipe(outputParser);
|
||||
const currentPatterns = state.grokPatterns;
|
||||
|
||||
const pattern = await grokErrorGraph.invoke({
|
||||
packageName: state.packageName,
|
||||
dataStreamName: state.dataStreamName,
|
||||
current_pattern: JSON.stringify(currentPatterns, null, 2),
|
||||
errors: JSON.stringify(state.errors, null, 2),
|
||||
ex_answer: JSON.stringify(GROK_ERROR_EXAMPLE_ANSWER, null, 2),
|
||||
});
|
||||
|
||||
return {
|
||||
grokPatterns: pattern.grok_patterns,
|
||||
lastExecutedChain: 'unstructuredError',
|
||||
};
|
||||
}
|
|
@ -1,40 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import { FakeLLM } from '@langchain/core/utils/testing';
|
||||
import { handleUnstructuredError } from './error';
|
||||
import type { UnstructuredLogState } from '../../types';
|
||||
import {
|
||||
unstructuredLogState,
|
||||
unstructuredLogResponse,
|
||||
} from '../../../__jest__/fixtures/unstructured';
|
||||
import {
|
||||
ActionsClientChatOpenAI,
|
||||
ActionsClientSimpleChatModel,
|
||||
} from '@kbn/langchain/server/language_models';
|
||||
import { IScopedClusterClient } from '@kbn/core-elasticsearch-server';
|
||||
|
||||
const model = new FakeLLM({
|
||||
response: JSON.stringify(unstructuredLogResponse, null, 2),
|
||||
}) as unknown as ActionsClientChatOpenAI | ActionsClientSimpleChatModel;
|
||||
|
||||
const state: UnstructuredLogState = unstructuredLogState;
|
||||
|
||||
describe('Testing unstructured error handling node', () => {
|
||||
const client = {
|
||||
asCurrentUser: {
|
||||
ingest: {
|
||||
simulate: jest.fn(),
|
||||
},
|
||||
},
|
||||
} as unknown as IScopedClusterClient;
|
||||
it('handleUnstructuredError()', async () => {
|
||||
const response = await handleUnstructuredError({ state, model, client });
|
||||
expect(response.grokPatterns).toStrictEqual(unstructuredLogResponse.grok_patterns);
|
||||
expect(response.lastExecutedChain).toBe('unstructuredError');
|
||||
});
|
||||
});
|
|
@ -10,7 +10,6 @@ import { StateGraph, END, START } from '@langchain/langgraph';
|
|||
import type { UnstructuredLogState } from '../../types';
|
||||
import { handleUnstructured } from './unstructured';
|
||||
import type { UnstructuredGraphParams, UnstructuredBaseNodeParams } from './types';
|
||||
import { handleUnstructuredError } from './error';
|
||||
import { handleUnstructuredValidate } from './validate';
|
||||
|
||||
const graphState: StateGraphArgs<UnstructuredLogState>['channels'] = {
|
||||
|
@ -30,6 +29,10 @@ const graphState: StateGraphArgs<UnstructuredLogState>['channels'] = {
|
|||
value: (x: string[], y?: string[]) => y ?? x,
|
||||
default: () => [],
|
||||
},
|
||||
currentPattern: {
|
||||
value: (x: string, y?: string) => y ?? x,
|
||||
default: () => '',
|
||||
},
|
||||
grokPatterns: {
|
||||
value: (x: string[], y?: string[]) => y ?? x,
|
||||
default: () => [],
|
||||
|
@ -42,8 +45,12 @@ const graphState: StateGraphArgs<UnstructuredLogState>['channels'] = {
|
|||
value: (x: boolean, y?: boolean) => y ?? x,
|
||||
default: () => false,
|
||||
},
|
||||
unParsedSamples: {
|
||||
value: (x: string[], y?: string[]) => y ?? x,
|
||||
default: () => [],
|
||||
},
|
||||
errors: {
|
||||
value: (x: object, y?: object) => y ?? x,
|
||||
value: (x: object[], y?: object[]) => y ?? x,
|
||||
default: () => [],
|
||||
},
|
||||
additionalProcessors: {
|
||||
|
@ -54,11 +61,16 @@ const graphState: StateGraphArgs<UnstructuredLogState>['channels'] = {
|
|||
value: (x: string, y?: string) => y ?? x,
|
||||
default: () => '',
|
||||
},
|
||||
isFirst: {
|
||||
value: (x: boolean, y?: boolean) => y ?? x,
|
||||
default: () => false,
|
||||
},
|
||||
};
|
||||
|
||||
function modelInput({ state }: UnstructuredBaseNodeParams): Partial<UnstructuredLogState> {
|
||||
return {
|
||||
finalized: false,
|
||||
isFirst: true,
|
||||
lastExecutedChain: 'modelInput',
|
||||
};
|
||||
}
|
||||
|
@ -72,10 +84,10 @@ function modelOutput({ state }: UnstructuredBaseNodeParams): Partial<Unstructure
|
|||
}
|
||||
|
||||
function validationRouter({ state }: UnstructuredBaseNodeParams): string {
|
||||
if (Object.keys(state.errors).length === 0) {
|
||||
if (Object.keys(state.unParsedSamples).length === 0) {
|
||||
return 'modelOutput';
|
||||
}
|
||||
return 'handleUnstructuredError';
|
||||
return 'handleUnparsed';
|
||||
}
|
||||
|
||||
export async function getUnstructuredGraph({ model, client }: UnstructuredGraphParams) {
|
||||
|
@ -84,9 +96,6 @@ export async function getUnstructuredGraph({ model, client }: UnstructuredGraphP
|
|||
})
|
||||
.addNode('modelInput', (state: UnstructuredLogState) => modelInput({ state }))
|
||||
.addNode('modelOutput', (state: UnstructuredLogState) => modelOutput({ state }))
|
||||
.addNode('handleUnstructuredError', (state: UnstructuredLogState) =>
|
||||
handleUnstructuredError({ state, model, client })
|
||||
)
|
||||
.addNode('handleUnstructured', (state: UnstructuredLogState) =>
|
||||
handleUnstructured({ state, model, client })
|
||||
)
|
||||
|
@ -100,11 +109,10 @@ export async function getUnstructuredGraph({ model, client }: UnstructuredGraphP
|
|||
'handleUnstructuredValidate',
|
||||
(state: UnstructuredLogState) => validationRouter({ state }),
|
||||
{
|
||||
handleUnstructuredError: 'handleUnstructuredError',
|
||||
handleUnparsed: 'handleUnstructured',
|
||||
modelOutput: 'modelOutput',
|
||||
}
|
||||
)
|
||||
.addEdge('handleUnstructuredError', 'handleUnstructuredValidate')
|
||||
.addEdge('modelOutput', END);
|
||||
|
||||
const compiledUnstructuredGraph = workflow.compile();
|
||||
|
|
|
@ -14,6 +14,9 @@ export const GROK_MAIN_PROMPT = ChatPromptTemplate.fromMessages([
|
|||
<samples>
|
||||
{samples}
|
||||
</samples>
|
||||
<errors>
|
||||
{errors}
|
||||
</errors>
|
||||
</context>`,
|
||||
],
|
||||
[
|
||||
|
@ -22,18 +25,19 @@ export const GROK_MAIN_PROMPT = ChatPromptTemplate.fromMessages([
|
|||
Your goal is to accurately extract key components such as timestamps, hostnames, priority levels, process names, events, VLAN information, MAC addresses, IP addresses, STP roles, port statuses, messages and more.
|
||||
|
||||
Follow these steps to help improve the grok patterns and apply it step by step:
|
||||
1. Familiarize yourself with various syslog message formats.
|
||||
2. PRI (Priority Level): Encoded in angle brackets, e.g., <134>, indicating the facility and severity.
|
||||
3. Timestamp: Use \`SYSLOGTIMESTAMP\` for RFC 3164 timestamps (e.g., Aug 10 16:34:02). Use \`TIMESTAMP_ISO8601\` for ISO 8601 (RFC 5424) timestamps. For epoch time, use \`NUMBER\`.
|
||||
4. If the timestamp could not be categorized into a predefined format, extract the date time fields separately and combine them with the format identified in the grok pattern.
|
||||
5. Make sure to identify the timezone component in the timestamp.
|
||||
6. Hostname/IP Address: The system or device that generated the message, which could be an IP address or fully qualified domain name
|
||||
7. Process Name and PID: Often included with brackets, such as sshd[1234].
|
||||
8. VLAN information: Usually in the format of VLAN: 1234.
|
||||
9. MAC Address: The network interface MAC address.
|
||||
10. Port number: The port number on the device.
|
||||
11. Look for status codes ,interface ,log type, source ,User action, destination, protocol, etc.
|
||||
12. message: This is the free-form message text that varies widely across log entries.
|
||||
1. If there are errors try to identify the root cause and provide a solution.
|
||||
2. Familiarize yourself with various syslog message formats.
|
||||
3. PRI (Priority Level): Encoded in angle brackets, e.g., <134>, indicating the facility and severity.
|
||||
4. Timestamp: Use \`SYSLOGTIMESTAMP\` for RFC 3164 timestamps (e.g., Aug 10 16:34:02). Use \`TIMESTAMP_ISO8601\` for ISO 8601 (RFC 5424) timestamps. For epoch time, use \`NUMBER\`.
|
||||
5. If the timestamp could not be categorized into a predefined format, extract the date time fields separately and combine them with the format identified in the grok pattern.
|
||||
6. Make sure to identify the timezone component in the timestamp.
|
||||
7. Hostname/IP Address: The system or device that generated the message, which could be an IP address or fully qualified domain name
|
||||
8. Process Name and PID: Often included with brackets, such as sshd[1234].
|
||||
9. VLAN information: Usually in the format of VLAN: 1234.
|
||||
10. MAC Address: The network interface MAC address.
|
||||
11. Port number: The port number on the device.
|
||||
12. Look for status codes ,interface ,log type, source ,User action, destination, protocol, etc.
|
||||
13. message: This is the free-form message text that varies widely across log entries.
|
||||
|
||||
|
||||
You ALWAYS follow these guidelines when writing your response:
|
||||
|
@ -54,54 +58,3 @@ export const GROK_MAIN_PROMPT = ChatPromptTemplate.fromMessages([
|
|||
],
|
||||
['ai', 'Please find the JSON object below:'],
|
||||
]);
|
||||
|
||||
export const GROK_ERROR_PROMPT = ChatPromptTemplate.fromMessages([
|
||||
[
|
||||
'system',
|
||||
`You are an expert in Syslogs and identifying the headers and structured body in syslog messages. Here is some context for you to reference for your task, read it carefully as you will get questions about it later:
|
||||
<context>
|
||||
<current_pattern>
|
||||
{current_pattern}
|
||||
</current_pattern>
|
||||
</context>`,
|
||||
],
|
||||
[
|
||||
'human',
|
||||
`Please go through each error below, carefully review the provided current grok pattern, and resolve the most likely cause to the supplied error by returning an updated version of the current_pattern.
|
||||
|
||||
<errors>
|
||||
{errors}
|
||||
</errors>
|
||||
|
||||
Follow these steps to help improve the grok patterns and apply it step by step:
|
||||
1. Familiarize yourself with various syslog message formats.
|
||||
2. PRI (Priority Level): Encoded in angle brackets, e.g., <134>, indicating the facility and severity.
|
||||
3. Timestamp: Use \`SYSLOGTIMESTAMP\` for RFC 3164 timestamps (e.g., Aug 10 16:34:02). Use \`TIMESTAMP_ISO8601\` for ISO 8601 (RFC 5424) timestamps. For epoch time, use \`NUMBER\`.
|
||||
4. If the timestamp could not be categorized into a predefined format, extract the date time fields separately and combine them with the format identified in the grok pattern.
|
||||
5. Make sure to identify the timezone component in the timestamp.
|
||||
6. Hostname/IP Address: The system or device that generated the message, which could be an IP address or fully qualified domain name
|
||||
7. Process Name and PID: Often included with brackets, such as sshd[1234].
|
||||
8. VLAN information: Usually in the format of VLAN: 1234.
|
||||
9. MAC Address: The network interface MAC address.
|
||||
10. Port number: The port number on the device.
|
||||
11. Look for status codes ,interface ,log type, source ,User action, destination, protocol, etc.
|
||||
12. message: This is the free-form message text that varies widely across log entries.
|
||||
|
||||
You ALWAYS follow these guidelines when writing your response:
|
||||
<guidelines>
|
||||
- Make sure to map the remaining message part to \'message\' in grok pattern.
|
||||
- Make sure to add \`{packageName}.{dataStreamName}\` as a prefix to each field in the pattern. Refer to example response.
|
||||
- Do not respond with anything except the processor as a JSON object enclosed with 3 backticks (\`), see example response above. Use strict JSON response format.
|
||||
</guidelines>
|
||||
|
||||
You are required to provide the output in the following example response format:
|
||||
|
||||
<example_response>
|
||||
A: Please find the JSON object below:
|
||||
\`\`\`json
|
||||
{ex_answer}
|
||||
\`\`\`
|
||||
</example_response>`,
|
||||
],
|
||||
['ai', 'Please find the JSON object below:'],
|
||||
]);
|
||||
|
|
|
@ -25,7 +25,7 @@ export interface HandleUnstructuredNodeParams extends UnstructuredNodeParams {
|
|||
}
|
||||
|
||||
export interface GrokResult {
|
||||
grok_patterns: string[];
|
||||
grok_pattern: string;
|
||||
message: string;
|
||||
}
|
||||
|
||||
|
|
|
@ -34,7 +34,31 @@ describe('Testing unstructured log handling node', () => {
|
|||
} as unknown as IScopedClusterClient;
|
||||
it('handleUnstructured()', async () => {
|
||||
const response = await handleUnstructured({ state, model, client });
|
||||
expect(response.grokPatterns).toStrictEqual(unstructuredLogResponse.grok_patterns);
|
||||
expect(response.currentPattern).toStrictEqual(unstructuredLogResponse.grok_pattern);
|
||||
expect(response.isFirst).toBe(false);
|
||||
expect(response.lastExecutedChain).toBe('handleUnstructured');
|
||||
});
|
||||
|
||||
it('should use unParsedSamples when isFirst is false', async () => {
|
||||
const modifiedState = {
|
||||
...state,
|
||||
isFirst: false,
|
||||
unParsedSamples: ['unparsed log 1', 'unparsed log 2'],
|
||||
};
|
||||
const response = await handleUnstructured({ state: modifiedState, model, client });
|
||||
expect(response.currentPattern).toBeDefined();
|
||||
expect(response.isFirst).toBe(false);
|
||||
expect(response.lastExecutedChain).toBe('handleUnstructured');
|
||||
});
|
||||
|
||||
it('should limit samples to maximum of 5', async () => {
|
||||
const modifiedState = {
|
||||
...state,
|
||||
logSamples: ['log1', 'log2', 'log3', 'log4', 'log5', 'log6', 'log7'],
|
||||
};
|
||||
const response = await handleUnstructured({ state: modifiedState, model, client });
|
||||
expect(response.currentPattern).toBeDefined();
|
||||
expect(response.isFirst).toBe(false);
|
||||
expect(response.lastExecutedChain).toBe('handleUnstructured');
|
||||
});
|
||||
});
|
||||
|
|
|
@ -17,18 +17,20 @@ export async function handleUnstructured({
|
|||
}: HandleUnstructuredNodeParams): Promise<Partial<UnstructuredLogState>> {
|
||||
const grokMainGraph = GROK_MAIN_PROMPT.pipe(model).pipe(new JsonOutputParser());
|
||||
|
||||
// Pick logSamples if there was no header detected.
|
||||
const samples = state.logSamples;
|
||||
const samples = state.isFirst ? state.logSamples : state.unParsedSamples;
|
||||
|
||||
const limitedSamples = samples.slice(0, 5);
|
||||
const pattern = (await grokMainGraph.invoke({
|
||||
packageName: state.packageName,
|
||||
dataStreamName: state.dataStreamName,
|
||||
samples: samples[0],
|
||||
samples: limitedSamples,
|
||||
errors: state.errors,
|
||||
ex_answer: JSON.stringify(GROK_EXAMPLE_ANSWER, null, 2),
|
||||
})) as GrokResult;
|
||||
|
||||
return {
|
||||
grokPatterns: pattern.grok_patterns,
|
||||
isFirst: false,
|
||||
currentPattern: pattern.grok_pattern,
|
||||
lastExecutedChain: 'handleUnstructured',
|
||||
};
|
||||
}
|
||||
|
|
|
@ -25,23 +25,22 @@ const model = new FakeLLM({
|
|||
const state: UnstructuredLogState = unstructuredLogState;
|
||||
|
||||
describe('Testing unstructured validation without errors', () => {
|
||||
const client = {
|
||||
asCurrentUser: {
|
||||
ingest: {
|
||||
simulate: jest.fn().mockReturnValue({
|
||||
docs: [
|
||||
{
|
||||
doc: {
|
||||
_source: { testPackage: { testDatastream: { message: 'dummy data' } } },
|
||||
},
|
||||
},
|
||||
],
|
||||
}),
|
||||
},
|
||||
},
|
||||
} as unknown as IScopedClusterClient;
|
||||
|
||||
it('handleUnstructuredValidate() without errors', async () => {
|
||||
const client = {
|
||||
asCurrentUser: {
|
||||
ingest: {
|
||||
simulate: jest.fn().mockReturnValue({
|
||||
docs: [
|
||||
{
|
||||
doc: {
|
||||
_source: { testPackage: { testDatastream: { message: 'dummy data' } } },
|
||||
},
|
||||
},
|
||||
],
|
||||
}),
|
||||
},
|
||||
},
|
||||
} as unknown as IScopedClusterClient;
|
||||
const response = await handleUnstructuredValidate({ state, model, client });
|
||||
expect(response.jsonSamples).toStrictEqual(unstructuredLogState.jsonSamples);
|
||||
expect(response.additionalProcessors).toStrictEqual([
|
||||
|
@ -53,7 +52,7 @@ describe('Testing unstructured validation without errors', () => {
|
|||
},
|
||||
},
|
||||
]);
|
||||
expect(response.errors).toStrictEqual([]);
|
||||
expect(response.errors).toStrictEqual(undefined);
|
||||
expect(response.lastExecutedChain).toBe('unstructuredValidate');
|
||||
});
|
||||
});
|
||||
|
@ -75,3 +74,138 @@ describe('Testing unstructured validation errors', () => {
|
|||
expect(response.lastExecutedChain).toBe('unstructuredValidate');
|
||||
});
|
||||
});
|
||||
describe('Testing unstructured validation with mixed results', () => {
|
||||
it('handleUnstructuredValidate() with some valid and invalid samples', async () => {
|
||||
const client = {
|
||||
asCurrentUser: {
|
||||
ingest: {
|
||||
simulate: jest
|
||||
.fn()
|
||||
.mockReturnValueOnce({
|
||||
docs: [
|
||||
{
|
||||
doc: {
|
||||
_source: { testPackage: { testDatastream: { message: 'valid sample' } } },
|
||||
},
|
||||
},
|
||||
],
|
||||
})
|
||||
.mockReturnValueOnce({
|
||||
docs: [
|
||||
{
|
||||
doc: {
|
||||
_source: { error: 'parsing error' },
|
||||
},
|
||||
},
|
||||
],
|
||||
}),
|
||||
},
|
||||
},
|
||||
} as unknown as IScopedClusterClient;
|
||||
const testState = {
|
||||
...state,
|
||||
logSamples: ['valid log', 'invalid log'],
|
||||
currentPattern: '%{GREEDYDATA:message}',
|
||||
packageName: 'testPackage',
|
||||
dataStreamName: 'testDatastream',
|
||||
};
|
||||
|
||||
const response = await handleUnstructuredValidate({ state: testState, model, client });
|
||||
|
||||
expect(response.unParsedSamples).toEqual(['invalid log']);
|
||||
expect(response.errors).toEqual(['parsing error']);
|
||||
expect(response.lastExecutedChain).toBe('unstructuredValidate');
|
||||
expect(client.asCurrentUser.ingest.simulate).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
describe('Testing unstructured validation with empty samples', () => {
|
||||
const client = {
|
||||
asCurrentUser: {
|
||||
ingest: {
|
||||
simulate: jest.fn(),
|
||||
},
|
||||
},
|
||||
} as unknown as IScopedClusterClient;
|
||||
|
||||
it('handleUnstructuredValidate() with empty log samples', async () => {
|
||||
const testState = {
|
||||
...state,
|
||||
logSamples: [],
|
||||
currentPattern: '',
|
||||
grokPatterns: [],
|
||||
};
|
||||
|
||||
const response = await handleUnstructuredValidate({ state: testState, model, client });
|
||||
expect(response.jsonSamples).toStrictEqual([]);
|
||||
expect(response.unParsedSamples).toStrictEqual([]);
|
||||
expect(response.lastExecutedChain).toBe('unstructuredValidate');
|
||||
expect(client.asCurrentUser.ingest.simulate).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe('Testing unstructured validation with multiple patterns', () => {
|
||||
const client = {
|
||||
asCurrentUser: {
|
||||
ingest: {
|
||||
simulate: jest.fn().mockReturnValue({
|
||||
docs: [
|
||||
{
|
||||
doc: {
|
||||
_source: {
|
||||
testPackage: { testDatastream: { field1: 'value1', field2: 'value2' } },
|
||||
},
|
||||
},
|
||||
},
|
||||
],
|
||||
}),
|
||||
},
|
||||
},
|
||||
} as unknown as IScopedClusterClient;
|
||||
|
||||
it('handleUnstructuredValidate() with multiple grok patterns', async () => {
|
||||
const testState = {
|
||||
...state,
|
||||
grokPatterns: ['pattern1', 'pattern2'],
|
||||
currentPattern: 'pattern3',
|
||||
logSamples: ['test log'],
|
||||
packageName: 'testPackage',
|
||||
dataStreamName: 'testDatastream',
|
||||
};
|
||||
|
||||
const response = await handleUnstructuredValidate({ state: testState, model, client });
|
||||
expect(response.jsonSamples).toHaveLength(1);
|
||||
expect(response.lastExecutedChain).toBe('unstructuredValidate');
|
||||
});
|
||||
});
|
||||
|
||||
describe('Testing unstructured validation with invalid JSON samples', () => {
|
||||
const client = {
|
||||
asCurrentUser: {
|
||||
ingest: {
|
||||
simulate: jest.fn().mockReturnValue({
|
||||
docs: [
|
||||
{
|
||||
doc: {
|
||||
_source: { testPackage: { testDatastream: undefined } },
|
||||
},
|
||||
},
|
||||
],
|
||||
}),
|
||||
},
|
||||
},
|
||||
} as unknown as IScopedClusterClient;
|
||||
|
||||
it('handleUnstructuredValidate() should handle invalid JSON samples', async () => {
|
||||
const testState = {
|
||||
...state,
|
||||
logSamples: ['invalid json log'],
|
||||
currentPattern: '%{GREEDYDATA:message}',
|
||||
packageName: 'testPackage',
|
||||
dataStreamName: 'testDatastream',
|
||||
};
|
||||
|
||||
const response = await handleUnstructuredValidate({ state: testState, model, client });
|
||||
expect(response.jsonSamples).toEqual([undefined]);
|
||||
expect(response.lastExecutedChain).toBe('unstructuredValidate');
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
@ -10,26 +10,53 @@ import type { HandleUnstructuredNodeParams, LogResult } from './types';
|
|||
import { testPipeline } from '../../util';
|
||||
import { createGrokProcessor, createPassthroughFailureProcessor } from '../../util/processors';
|
||||
|
||||
export interface UnstructuredLogParse {
|
||||
grokPatterns: string[];
|
||||
unParsedSamples: string[];
|
||||
}
|
||||
|
||||
export async function handleUnstructuredValidate({
|
||||
state,
|
||||
client,
|
||||
}: HandleUnstructuredNodeParams): Promise<Partial<UnstructuredLogState>> {
|
||||
const currentPattern = state.currentPattern;
|
||||
const grokPatterns = state.grokPatterns;
|
||||
const grokProcessor = createGrokProcessor(grokPatterns);
|
||||
const grokProcessor = createGrokProcessor([...grokPatterns, currentPattern]);
|
||||
const pipeline = { processors: grokProcessor, on_failure: [createPassthroughFailureProcessor()] };
|
||||
|
||||
const packageName = state.packageName;
|
||||
const dataStreamName = state.dataStreamName;
|
||||
const { pipelineResults, errors } = (await testPipeline(state.logSamples, pipeline, client)) as {
|
||||
pipelineResults: LogResult[];
|
||||
errors: object[];
|
||||
};
|
||||
const validSamples: LogResult[] = [];
|
||||
const unParsedSamples: string[] = [];
|
||||
const errors: object[] = [];
|
||||
|
||||
if (errors.length > 0) {
|
||||
return { errors, lastExecutedChain: 'unstructuredValidate' };
|
||||
for (const sample of state.logSamples) {
|
||||
const result = (await testPipeline([sample], pipeline, client)) as {
|
||||
pipelineResults: LogResult[];
|
||||
errors: object[];
|
||||
};
|
||||
|
||||
if (result.errors.length > 0) {
|
||||
unParsedSamples.push(sample);
|
||||
errors.push(result.errors[0]);
|
||||
} else {
|
||||
validSamples.push(result.pipelineResults[0]);
|
||||
}
|
||||
}
|
||||
|
||||
const jsonSamples = pipelineResults
|
||||
if (validSamples.length > 0) {
|
||||
grokPatterns.push(currentPattern);
|
||||
}
|
||||
|
||||
if (unParsedSamples.length > 0) {
|
||||
return {
|
||||
unParsedSamples,
|
||||
errors,
|
||||
lastExecutedChain: 'unstructuredValidate',
|
||||
};
|
||||
}
|
||||
|
||||
const jsonSamples = validSamples
|
||||
.map((log) => log[packageName])
|
||||
.map((log) => log[dataStreamName])
|
||||
.map((log) => JSON.stringify(log));
|
||||
|
@ -39,7 +66,7 @@ export async function handleUnstructuredValidate({
|
|||
return {
|
||||
jsonSamples,
|
||||
additionalProcessors,
|
||||
errors: [],
|
||||
unParsedSamples: [],
|
||||
lastExecutedChain: 'unstructuredValidate',
|
||||
};
|
||||
}
|
||||
|
|
|
@ -163,12 +163,15 @@ export interface UnstructuredLogState {
|
|||
packageName: string;
|
||||
dataStreamName: string;
|
||||
grokPatterns: string[];
|
||||
currentPattern: string;
|
||||
logSamples: string[];
|
||||
jsonSamples: string[];
|
||||
finalized: boolean;
|
||||
errors: object;
|
||||
errors: object[];
|
||||
unParsedSamples: string[];
|
||||
additionalProcessors: object[];
|
||||
ecsVersion: string;
|
||||
isFirst: boolean;
|
||||
}
|
||||
|
||||
export interface RelatedState {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue