Mirror of https://github.com/elastic/kibana.git (synced 2025-04-24 09:48:58 -04:00)
# Backport

This will backport the following commits from `main` to `8.x`:

- [[Auto Import] Use larger number of samples on the backend (#196233)](https://github.com/elastic/kibana/pull/196233)

### Questions?

Please refer to the [Backport tool documentation](https://github.com/sqren/backport)

## Release Notes

Automatic Import now analyses a larger number of samples to generate an integration.

## Summary

Closes https://github.com/elastic/security-team/issues/9844

**Added: Backend Sampling**

We now pass 100 rows (these numeric values are adjustable) to the backend.[^1]

[^1]: As before, the rows are deterministically selected on the frontend; see https://github.com/elastic/kibana/pull/191598

The Categorization chain now processes the samples in batches, performing a number of review cycles after the initial categorization (but not more than 5, tuned so that we stay under the 2-minute limit for a single API call).

To decide when to stop processing, we keep a list of _stable_ samples as follows:

1. The list is initially empty.
2. For each review we select a random subset of 40 samples, preferring to pick the not-yet-stable samples.
3. After each review – when the LLM potentially adds new processors or changes the old ones – we compare the new pipeline results with the old pipeline results.
4. Reviewed samples whose categorization did not change are added to the stable list.
5. Any samples whose categorization changed are removed from the stable list.
6. If all samples are stable, we finish processing.

(A self-contained sketch of this loop is given right below.)

**Removed: User Notification**

Using 100 samples provides a balance between the expected complexity and the time budget we work with. We might want to change it in the future, possibly dynamically, so the specific number is of no importance to the user. Thus we remove the truncation notification.

**Unchanged:**

- No batching is done in the related chain: it seems to work as-is.

**Refactored:**

- We centralize the sizing constants in the `x-pack/plugins/integration_assistant/common/constants.ts` file.
- We remove the unused state key `formattedSamples` and combine `modelJSONInput` back into `modelInput`.

> [!NOTE]
> I had difficulty generating new graph diagrams, so they remain unchanged.

Co-authored-by: Ilya Nikokoshev <ilya.nikokoshev@elastic.co>
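The following TypeScript sketch restates steps 1–6 above in self-contained form. All names are illustrative, not the actual Kibana code (the real logic is split across the LangGraph nodes changed in the diffs below), and the LLM review is abstracted as an injected function:

```ts
// Illustrative restatement of the stable-sample loop; not the actual implementation.
type Categorization = string[]; // a sample's event.category + event.type

// `reviewStep` stands in for one LLM review plus a pipeline re-run over all samples.
type ReviewStep = (reviewedIndices: number[]) => Categorization[];

function reviewUntilStable(
  initialResults: Categorization[],
  reviewStep: ReviewStep,
  batchSize = 40, // CATEROGIZATION_REVIEW_BATCH_SIZE
  maxCycles = 5 // CATEGORIZATION_REVIEW_MAX_CYCLES
): Set<number> {
  const equal = (a: Categorization, b: Categorization) =>
    a.length === b.length && a.every((v, i) => v === b[i]);

  let stable = new Set<number>(); // step 1: initially empty
  let previous = initialResults;

  for (let cycle = 0; cycle < maxCycles && stable.size < previous.length; cycle++) {
    // step 2: pick a batch, preferring not-yet-stable samples (simplified here to a
    // deterministic prefix; the real selection is a seeded random subset).
    const reviewed = previous
      .map((_, i) => i)
      .filter((i) => !stable.has(i))
      .slice(0, batchSize);

    // step 3: re-run the (possibly changed) pipeline and compare with the old results
    const current = reviewStep(reviewed);
    const changed = new Set(
      current.map((r, i) => (equal(r, previous[i]) ? -1 : i)).filter((i) => i >= 0)
    );

    // steps 4-5: unchanged reviewed samples become stable; changed ones drop out
    stable = new Set([...stable, ...reviewed].filter((i) => !changed.has(i)));
    previous = current;
  }
  return stable; // step 6: the loop exits early once every sample is stable
}
```

With an identity `reviewStep` (one that returns the previous results unchanged), every reviewed sample stabilizes on its first cycle, which is the intended fixed point.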
This commit is contained in: parent 51b835992e → commit a4938bc3ca

31 changed files with 534 additions and 190 deletions
```diff
@@ -162,7 +162,6 @@ export const testPipelineInvalidEcs: { pipelineResults: object[]; errors: object[] }
 export const categorizationTestState = {
   rawSamples: ['{"test1": "test1"}'],
   samples: ['{ "test1": "test1" }'],
-  formattedSamples: '{"test1": "test1"}',
   ecsTypes: 'testtypes',
   ecsCategories: 'testcategories',
   exAnswer: 'testanswer',
@@ -173,9 +172,8 @@ export const categorizationTestState = {
   previousError: 'testprevious',
   previousInvalidCategorization: 'testinvalid',
   pipelineResults: [{ test: 'testresult' }],
-  finalized: false,
-  hasTriedOnce: false,
-  reviewed: false,
+  previousPipelineResults: [{ test: 'testresult' }],
+  lastReviewedSamples: [],
   currentPipeline: { test: 'testpipeline' },
   currentProcessors: [
     {
@@ -193,6 +191,9 @@ export const categorizationTestState = {
   initialPipeline: categorizationInitialPipeline,
   results: { test: 'testresults' },
   samplesFormat: { name: SamplesFormatName.Values.json },
+  stableSamples: [],
+  reviewCount: 0,
+  finalized: false,
 };
 
 export const categorizationMockProcessors = [
```
```diff
@@ -140,7 +140,6 @@ export const testPipelineValidResult: { pipelineResults: object[]; errors: object[] }
 export const relatedTestState = {
   rawSamples: ['{"test1": "test1"}'],
   samples: ['{ "test1": "test1" }'],
-  formattedSamples: '{"test1": "test1"}',
   ecs: 'testtypes',
   exAnswer: 'testanswer',
   packageName: 'testpackage',
```
```diff
@@ -36,3 +36,11 @@ export enum GenerationErrorCode {
   UNSUPPORTED_LOG_SAMPLES_FORMAT = 'unsupported-log-samples-format',
   UNPARSEABLE_CSV_DATA = 'unparseable-csv-data',
 }
+
+// Size limits
+export const FRONTEND_SAMPLE_ROWS = 100;
+export const LOG_FORMAT_DETECTION_SAMPLE_ROWS = 5;
+export const CATEGORIZATION_INITIAL_BATCH_SIZE = 60;
+export const CATEROGIZATION_REVIEW_BATCH_SIZE = 40;
+export const CATEGORIZATION_REVIEW_MAX_CYCLES = 5;
+export const CATEGORIZATION_RECURSION_LIMIT = 50;
```
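A back-of-the-envelope reading of these limits (my arithmetic, not stated in the PR): even if no sample ever stabilizes, one categorization run sends at most the initial batch plus five review batches to the LLM.

```ts
// Rough worst-case count of sample-passes per run, using the constants above.
const CATEGORIZATION_INITIAL_BATCH_SIZE = 60;
const CATEROGIZATION_REVIEW_BATCH_SIZE = 40; // (sic - the constant is spelled this way in the source)
const CATEGORIZATION_REVIEW_MAX_CYCLES = 5;

const worstCase =
  CATEGORIZATION_INITIAL_BATCH_SIZE +
  CATEGORIZATION_REVIEW_MAX_CYCLES * CATEROGIZATION_REVIEW_BATCH_SIZE;
console.log(worstCase); // 260 sample-passes over the 100 frontend rows
```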
```diff
@@ -21,6 +21,8 @@ export {
 } from './api/analyze_logs/analyze_logs_route.gen';
 export { CelInputRequestBody, CelInputResponse } from './api/cel/cel_input_route.gen';
 
+export { partialShuffleArray } from './utils';
+
 export type {
   DataStream,
   InputType,
```
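`partialShuffleArray` itself was added in PR 191598 and its body is not part of this diff. Judging from the call sites below — `partialShuffleArray(array, 1, numElements)` and `partialShuffleArray(indices, 0, numSamples)` — a plausible shape is a seeded partial Fisher–Yates shuffle; the sketch below is an assumption, with an unseeded stand-in for the deterministic RNG:

```ts
// Hypothetical sketch, NOT the real implementation: after the call, positions
// [start, end) hold a uniform random sample of array[start..], and positions
// before `start` are untouched. The real version uses a seeded RNG, so the
// same input always yields the same selection.
function partialShuffleArray<T>(array: T[], start: number, end: number): void {
  for (let i = start; i < end; i++) {
    const j = i + Math.floor(Math.random() * (array.length - i)); // stand-in RNG
    [array[i], array[j]] = [array[j], array[i]];
  }
}

// Call-site pattern from the frontend truncation code: keep row 0 in place,
// randomize the next slots, then truncate to the sample size.
const rows = ['r0', 'r1', 'r2', 'r3', 'r4'];
partialShuffleArray(rows, 1, 3);
rows.length = 3; // assumption: truncation happens right after the shuffle
```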
```diff
@@ -11,7 +11,6 @@ import { TestProvider } from '../../../../../mocks/test_provider';
 import { parseNDJSON, parseJSONArray, SampleLogsInput } from './sample_logs_input';
 import { ActionsProvider } from '../../state';
 import { mockActions } from '../../mocks/state';
-import { mockServices } from '../../../../../services/mocks/services';
 
 const wrapper: React.FC<React.PropsWithChildren<{}>> = ({ children }) => (
   <TestProvider>
@@ -165,25 +164,6 @@ describe('SampleLogsInput', () => {
         samplesFormat: { name: 'json', json_path: [] },
       });
     });
-
-    describe('when the file has too many rows', () => {
-      const tooLargeLogsSample = Array(6).fill(logsSampleRaw).join(','); // 12 entries
-      beforeEach(async () => {
-        await changeFile(input, new File([`[${tooLargeLogsSample}]`], 'test.json', { type }));
-      });
-
-      it('should truncate the logs sample', () => {
-        expect(mockActions.setIntegrationSettings).toBeCalledWith({
-          logSamples: tooLargeLogsSample.split(',').slice(0, 2),
-          samplesFormat: { name: 'json', json_path: [] },
-        });
-      });
-      it('should add a notification toast', () => {
-        expect(mockServices.notifications.toasts.addInfo).toBeCalledWith(
-          `The logs sample has been truncated to 10 rows.`
-        );
-      });
-    });
   });
 
   describe('when the file is a json array under a key', () => {
@@ -236,25 +216,6 @@ describe('SampleLogsInput', () => {
         samplesFormat: { name: 'ndjson', multiline: false },
       });
     });
-
-    describe('when the file has too many rows', () => {
-      const tooLargeLogsSample = Array(6).fill(simpleNDJSON).join('\n'); // 12 entries
-      beforeEach(async () => {
-        await changeFile(input, new File([tooLargeLogsSample], 'test.json', { type }));
-      });
-
-      it('should truncate the logs sample', () => {
-        expect(mockActions.setIntegrationSettings).toBeCalledWith({
-          logSamples: tooLargeLogsSample.split('\n').slice(0, 2),
-          samplesFormat: { name: 'ndjson', multiline: false },
-        });
-      });
-      it('should add a notification toast', () => {
-        expect(mockServices.notifications.toasts.addInfo).toBeCalledWith(
-          `The logs sample has been truncated to 10 rows.`
-        );
-      });
-    });
   });
 
   describe('when the file is a an ndjson with a single record', () => {
```
```diff
@@ -8,14 +8,12 @@
 import React, { useCallback, useState } from 'react';
 import { EuiCallOut, EuiFilePicker, EuiFormRow, EuiSpacer, EuiText } from '@elastic/eui';
 import { isPlainObject } from 'lodash/fp';
-import { useKibana } from '@kbn/kibana-react-plugin/public';
 import type { IntegrationSettings } from '../../types';
 import * as i18n from './translations';
 import { useActions } from '../../state';
 import type { SamplesFormat } from '../../../../../../common';
-import { partialShuffleArray } from './utils';
-
-const MaxLogsSampleRows = 10;
+import { partialShuffleArray } from '../../../../../../common';
+import { FRONTEND_SAMPLE_ROWS } from '../../../../../../common/constants';
 
 /**
  * Parse the logs sample file content as newiline-delimited JSON (NDJSON).
@@ -83,8 +81,8 @@ export const parseJSONArray = (
  * @returns Whether the array was truncated.
  */
 function trimShuffleLogsSample<T>(array: T[]): boolean {
-  const willTruncate = array.length > MaxLogsSampleRows;
-  const numElements = willTruncate ? MaxLogsSampleRows : array.length;
+  const willTruncate = array.length > FRONTEND_SAMPLE_ROWS;
+  const numElements = willTruncate ? FRONTEND_SAMPLE_ROWS : array.length;
 
   partialShuffleArray(array, 1, numElements);
 
@@ -215,7 +213,6 @@ interface SampleLogsInputProps {
 }
 
 export const SampleLogsInput = React.memo<SampleLogsInputProps>(({ integrationSettings }) => {
-  const { notifications } = useKibana().services;
   const { setIntegrationSettings } = useActions();
   const [isParsing, setIsParsing] = useState(false);
   const [sampleFileError, setSampleFileError] = useState<string>();
@@ -266,11 +263,7 @@ export const SampleLogsInput = React.memo<SampleLogsInputProps>(({ integrationSettings }) => {
         return;
       }
 
-      const { samplesFormat, logSamples, isTruncated } = prepareResult;
-
-      if (isTruncated) {
-        notifications?.toasts.addInfo(i18n.LOGS_SAMPLE_TRUNCATED(MaxLogsSampleRows));
-      }
+      const { samplesFormat, logSamples } = prepareResult;
 
       setIntegrationSettings({
         ...integrationSettings,
@@ -293,7 +286,7 @@ export const SampleLogsInput = React.memo<SampleLogsInputProps>(({ integrationSettings }) => {
 
       reader.readAsText(logsSampleFile);
     },
-    [integrationSettings, setIntegrationSettings, notifications?.toasts, setIsParsing]
+    [integrationSettings, setIntegrationSettings, setIsParsing]
   );
   return (
     <EuiFormRow
```
```diff
@@ -110,11 +110,6 @@ export const LOGS_SAMPLE_DESCRIPTION = i18n.translate(
     defaultMessage: 'Drag and drop a file or Browse files.',
   }
 );
-export const LOGS_SAMPLE_TRUNCATED = (maxRows: number) =>
-  i18n.translate('xpack.integrationAssistant.step.dataStream.logsSample.truncatedWarning', {
-    values: { maxRows },
-    defaultMessage: `The logs sample has been truncated to {maxRows} rows.`,
-  });
 export const LOGS_SAMPLE_ERROR = {
   CAN_NOT_READ: i18n.translate(
     'xpack.integrationAssistant.step.dataStream.logsSample.errorCanNotRead',
```
```diff
@@ -11,6 +11,8 @@ import { combineProcessors } from '../../util/processors';
 import { CATEGORIZATION_EXAMPLE_PROCESSORS } from './constants';
 import { CATEGORIZATION_MAIN_PROMPT } from './prompts';
 import type { CategorizationNodeParams } from './types';
+import { selectResults } from './util';
+import { CATEGORIZATION_INITIAL_BATCH_SIZE } from '../../../common/constants';
 
 export async function handleCategorization({
   state,
@@ -19,8 +21,15 @@ export async function handleCategorization({
   const categorizationMainPrompt = CATEGORIZATION_MAIN_PROMPT;
   const outputParser = new JsonOutputParser();
   const categorizationMainGraph = categorizationMainPrompt.pipe(model).pipe(outputParser);
+
+  const [pipelineResults, _] = selectResults(
+    state.pipelineResults,
+    CATEGORIZATION_INITIAL_BATCH_SIZE,
+    new Set(state.stableSamples)
+  );
+
   const currentProcessors = (await categorizationMainGraph.invoke({
-    pipeline_results: JSON.stringify(state.pipelineResults, null, 2),
+    pipeline_results: JSON.stringify(pipelineResults, null, 2),
     example_processors: CATEGORIZATION_EXAMPLE_PROCESSORS,
     ex_answer: state?.exAnswer,
     ecs_categories: state?.ecsCategories,
@@ -36,7 +45,7 @@ export async function handleCategorization({
   return {
     currentPipeline,
     currentProcessors,
-    hasTriedOnce: true,
+    lastReviewedSamples: [],
     lastExecutedChain: 'categorization',
   };
 }
```
```diff
@@ -4,6 +4,7 @@
  * 2.0; you may not use this file except in compliance with the Elastic License
  * 2.0.
  */
 
 export const ECS_CATEGORIES = {
+  api: 'Covers events from API calls, including those from OS and network protocols. Allowed event.type combinations: access, admin, allowed, change, creation, deletion, denied, end, info, start, user',
   authentication:
```
```diff
@@ -39,7 +39,6 @@ export async function handleErrors({
   return {
     currentPipeline,
     currentProcessors,
-    reviewed: false,
     lastExecutedChain: 'error',
   };
 }
```
```diff
@@ -25,6 +25,7 @@ import { handleReview } from './review';
 import { handleCategorization } from './categorization';
 import { handleErrors } from './errors';
 import { handleInvalidCategorization } from './invalid';
+import { handleUpdateStableSamples } from './stable';
 import { testPipeline, combineProcessors } from '../../util';
 import {
   ActionsClientChatOpenAI,
@@ -39,6 +40,7 @@ jest.mock('./errors');
 jest.mock('./review');
 jest.mock('./categorization');
 jest.mock('./invalid');
+jest.mock('./stable');
 
 jest.mock('../../util/pipeline', () => ({
   testPipeline: jest.fn(),
@@ -74,7 +76,8 @@ describe('runCategorizationGraph', () => {
     return {
       currentPipeline,
       currentProcessors,
-      reviewed: false,
+      stableSamples: [],
+      reviewCount: 0,
       finalized: false,
       lastExecutedChain: 'categorization',
     };
@@ -90,7 +93,8 @@ describe('runCategorizationGraph', () => {
     return {
       currentPipeline,
      currentProcessors,
-      reviewed: false,
+      stableSamples: [],
+      reviewCount: 0,
       finalized: false,
       lastExecutedChain: 'error',
     };
@@ -106,7 +110,8 @@ describe('runCategorizationGraph', () => {
     return {
       currentPipeline,
       currentProcessors,
-      reviewed: false,
+      stableSamples: [],
+      reviewCount: 0,
       finalized: false,
       lastExecutedChain: 'invalidCategorization',
     };
@@ -122,11 +127,29 @@ describe('runCategorizationGraph', () => {
     return {
       currentProcessors,
       currentPipeline,
-      reviewed: true,
+      stableSamples: [],
+      reviewCount: 0,
       finalized: false,
       lastExecutedChain: 'review',
     };
   });
+  // After the review it should route to modelOutput and finish.
+  (handleUpdateStableSamples as jest.Mock)
+    .mockResolvedValueOnce({
+      stableSamples: [],
+      finalized: false,
+      lastExecutedChain: 'handleUpdateStableSamples',
+    })
+    .mockResolvedValueOnce({
+      stableSamples: [],
+      finalized: false,
+      lastExecutedChain: 'handleUpdateStableSamples',
+    })
+    .mockResolvedValueOnce({
+      stableSamples: [0],
+      finalized: false,
+      lastExecutedChain: 'handleUpdateStableSamples',
+    });
 });
 
 it('Ensures that the graph compiles', async () => {
```
```diff
@@ -10,7 +10,7 @@ import { StateGraph, END, START } from '@langchain/langgraph';
 import { SamplesFormat } from '../../../common';
 import type { CategorizationState } from '../../types';
 import { handleValidatePipeline } from '../../util/graph';
-import { formatSamples, prefixSamples } from '../../util/samples';
+import { prefixSamples } from '../../util/samples';
 import { handleCategorization } from './categorization';
 import { CATEGORIZATION_EXAMPLE_ANSWER, ECS_CATEGORIES, ECS_TYPES } from './constants';
 import { handleErrors } from './errors';
@@ -18,6 +18,8 @@ import { handleInvalidCategorization } from './invalid';
 import { handleReview } from './review';
 import type { CategorizationBaseNodeParams, CategorizationGraphParams } from './types';
 import { handleCategorizationValidation } from './validate';
+import { handleUpdateStableSamples } from './stable';
+import { CATEGORIZATION_REVIEW_MAX_CYCLES } from '../../../common/constants';
 
 const graphState: StateGraphArgs<CategorizationState>['channels'] = {
   lastExecutedChain: {
@@ -32,10 +34,6 @@ const graphState: StateGraphArgs<CategorizationState>['channels'] = {
     value: (x: string[], y?: string[]) => y ?? x,
     default: () => [],
   },
-  formattedSamples: {
-    value: (x: string, y?: string) => y ?? x,
-    default: () => '',
-  },
   ecsTypes: {
     value: (x: string, y?: string) => y ?? x,
     default: () => '',
@@ -60,13 +58,13 @@ const graphState: StateGraphArgs<CategorizationState>['channels'] = {
     value: (x: boolean, y?: boolean) => y ?? x,
     default: () => false,
   },
-  reviewed: {
-    value: (x: boolean, y?: boolean) => y ?? x,
-    default: () => false,
+  stableSamples: {
+    value: (x: number[], y: number[]) => y ?? x,
+    default: () => [],
   },
-  hasTriedOnce: {
-    value: (x: boolean, y?: boolean) => y ?? x,
-    default: () => false,
+  reviewCount: {
+    value: (x: number, y: number) => y ?? x,
+    default: () => 0,
   },
   errors: {
     value: (x: object, y?: object) => y ?? x,
@@ -80,6 +78,14 @@ const graphState: StateGraphArgs<CategorizationState>['channels'] = {
     value: (x: object[], y?: object[]) => y ?? x,
     default: () => [{}],
   },
+  previousPipelineResults: {
+    value: (x: object[], y?: object[]) => y ?? x,
+    default: () => [{}],
+  },
+  lastReviewedSamples: {
+    value: (x: number[], y: number[]) => y ?? x,
+    default: () => [],
+  },
   currentPipeline: {
     value: (x: object, y?: object) => y ?? x,
     default: () => ({}),
@@ -110,33 +116,22 @@ const graphState: StateGraphArgs<CategorizationState>['channels'] = {
   },
 };
 
-function modelJSONInput({ state }: CategorizationBaseNodeParams): Partial<CategorizationState> {
-  const samples = prefixSamples(state);
-  const formattedSamples = formatSamples(samples);
-  const initialPipeline = JSON.parse(JSON.stringify(state.currentPipeline));
-  return {
-    exAnswer: JSON.stringify(CATEGORIZATION_EXAMPLE_ANSWER, null, 2),
-    ecsCategories: JSON.stringify(ECS_CATEGORIES, null, 2),
-    ecsTypes: JSON.stringify(ECS_TYPES, null, 2),
-    samples,
-    formattedSamples,
-    initialPipeline,
-    finalized: false,
-    reviewed: false,
-    lastExecutedChain: 'modelJSONInput',
-  };
-}
-
 function modelInput({ state }: CategorizationBaseNodeParams): Partial<CategorizationState> {
+  let samples: string[];
+  if (state.samplesFormat.name === 'json' || state.samplesFormat.name === 'ndjson') {
+    samples = prefixSamples(state);
+  } else {
+    samples = state.rawSamples;
+  }
   const initialPipeline = JSON.parse(JSON.stringify(state.currentPipeline));
   return {
     exAnswer: JSON.stringify(CATEGORIZATION_EXAMPLE_ANSWER, null, 2),
     ecsCategories: JSON.stringify(ECS_CATEGORIES, null, 2),
     ecsTypes: JSON.stringify(ECS_TYPES, null, 2),
-    samples: state.rawSamples,
+    samples,
     initialPipeline,
     finalized: false,
-    reviewed: false,
+    stableSamples: [],
     lastExecutedChain: 'modelInput',
   };
 }
@@ -152,16 +147,9 @@ function modelOutput({ state }: CategorizationBaseNodeParams): Partial<CategorizationState> {
   };
 }
 
-function modelRouter({ state }: CategorizationBaseNodeParams): string {
-  if (state.samplesFormat.name === 'json' || state.samplesFormat.name === 'ndjson') {
-    return 'modelJSONInput';
-  }
-  return 'modelInput';
-}
-
 function validationRouter({ state }: CategorizationBaseNodeParams): string {
   if (Object.keys(state.currentProcessors).length === 0) {
-    if (state.hasTriedOnce || state.reviewed) {
+    if (state.stableSamples.length === state.pipelineResults.length) {
       return 'modelOutput';
     }
     return 'categorization';
@@ -171,24 +159,27 @@ function validationRouter({ state }: CategorizationBaseNodeParams): string {
 
 function chainRouter({ state }: CategorizationBaseNodeParams): string {
   if (Object.keys(state.currentProcessors).length === 0) {
-    if (state.hasTriedOnce || state.reviewed) {
+    if (state.stableSamples.length === state.pipelineResults.length) {
       return 'modelOutput';
     }
   }
   if (Object.keys(state.errors).length > 0) {
     return 'errors';
   }
   if (Object.keys(state.invalidCategorization).length > 0) {
     return 'invalidCategorization';
   }
-  if (!state.reviewed) {
+
+  if (
+    state.stableSamples.length < state.pipelineResults.length &&
+    state.reviewCount < CATEGORIZATION_REVIEW_MAX_CYCLES
+  ) {
     return 'review';
   }
   if (!state.finalized) {
     return 'modelOutput';
   }
 
-  return END;
+  return 'modelOutput';
 }
 
 export async function getCategorizationGraph({ client, model }: CategorizationGraphParams) {
@@ -196,7 +187,6 @@ export async function getCategorizationGraph({ client, model }: CategorizationGraphParams) {
     channels: graphState,
   })
     .addNode('modelInput', (state: CategorizationState) => modelInput({ state }))
-    .addNode('modelJSONInput', (state: CategorizationState) => modelJSONInput({ state }))
     .addNode('modelOutput', (state: CategorizationState) => modelOutput({ state }))
     .addNode('handleCategorization', (state: CategorizationState) =>
       handleCategorization({ state, model })
@@ -204,6 +194,9 @@ export async function getCategorizationGraph({ client, model }: CategorizationGraphParams) {
     .addNode('handleValidatePipeline', (state: CategorizationState) =>
       handleValidatePipeline({ state, client })
     )
+    .addNode('handleUpdateStableSamples', (state: CategorizationState) =>
+      handleUpdateStableSamples({ state })
+    )
     .addNode('handleCategorizationValidation', (state: CategorizationState) =>
       handleCategorizationValidation({ state })
     )
@@ -212,19 +205,16 @@ export async function getCategorizationGraph({ client, model }: CategorizationGraphParams) {
     )
     .addNode('handleErrors', (state: CategorizationState) => handleErrors({ state, model }))
     .addNode('handleReview', (state: CategorizationState) => handleReview({ state, model }))
-    .addConditionalEdges(START, (state: CategorizationState) => modelRouter({ state }), {
-      modelJSONInput: 'modelJSONInput',
-      modelInput: 'modelInput', // For Non JSON input samples
-    })
+    .addEdge(START, 'modelInput')
     .addEdge('modelOutput', END)
-    .addEdge('modelJSONInput', 'handleValidatePipeline')
     .addEdge('modelInput', 'handleValidatePipeline')
     .addEdge('handleCategorization', 'handleValidatePipeline')
     .addEdge('handleInvalidCategorization', 'handleValidatePipeline')
     .addEdge('handleErrors', 'handleValidatePipeline')
     .addEdge('handleReview', 'handleValidatePipeline')
+    .addEdge('handleValidatePipeline', 'handleUpdateStableSamples')
     .addConditionalEdges(
-      'handleValidatePipeline',
+      'handleUpdateStableSamples',
      (state: CategorizationState) => validationRouter({ state }),
      {
        modelOutput: 'modelOutput',
```
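The rewiring above boils down to a simple convergence rule. Here is a self-contained restatement (the real `chainRouter` also dispatches to error and invalid-categorization handling first):

```ts
// Restatement of the stopping rule from chainRouter above: keep reviewing
// until every sample is stable or the cycle budget is spent.
const CATEGORIZATION_REVIEW_MAX_CYCLES = 5;

function nextStep(
  stableCount: number,
  totalSamples: number,
  reviewCount: number
): 'review' | 'modelOutput' {
  if (stableCount < totalSamples && reviewCount < CATEGORIZATION_REVIEW_MAX_CYCLES) {
    return 'review';
  }
  return 'modelOutput'; // finish: emit results even if some samples never stabilized
}

console.log(nextStep(60, 100, 2)); // 'review'
console.log(nextStep(100, 100, 3)); // 'modelOutput' -- all samples stable
console.log(nextStep(80, 100, 5)); // 'modelOutput' -- cycle budget exhausted
```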
```diff
@@ -39,7 +39,6 @@ export async function handleInvalidCategorization({
   return {
     currentPipeline,
     currentProcessors,
-    reviewed: false,
     lastExecutedChain: 'invalidCategorization',
   };
 }
```
```diff
@@ -12,6 +12,8 @@ import type { CategorizationNodeParams } from './types';
 import type { SimplifiedProcessors, SimplifiedProcessor, CategorizationState } from '../../types';
 import { combineProcessors } from '../../util/processors';
 import { ECS_EVENT_TYPES_PER_CATEGORY } from './constants';
+import { selectResults } from './util';
+import { CATEROGIZATION_REVIEW_BATCH_SIZE } from '../../../common/constants';
 
 export async function handleReview({
   state,
@@ -21,9 +23,15 @@ export async function handleReview({
   const outputParser = new JsonOutputParser();
   const categorizationReview = categorizationReviewPrompt.pipe(model).pipe(outputParser);
 
+  const [pipelineResults, selectedIndices] = selectResults(
+    state.pipelineResults,
+    CATEROGIZATION_REVIEW_BATCH_SIZE,
+    new Set(state.stableSamples)
+  );
+
   const currentProcessors = (await categorizationReview.invoke({
     current_processors: JSON.stringify(state.currentProcessors, null, 2),
-    pipeline_results: JSON.stringify(state.pipelineResults, null, 2),
+    pipeline_results: JSON.stringify(pipelineResults, null, 2),
     previous_invalid_categorization: state.previousInvalidCategorization,
     previous_error: state.previousError,
     ex_answer: state?.exAnswer,
@@ -41,7 +49,8 @@ export async function handleReview({
   return {
     currentPipeline,
     currentProcessors,
-    reviewed: true,
+    reviewCount: state.reviewCount + 1,
+    lastReviewedSamples: selectedIndices,
     lastExecutedChain: 'review',
   };
 }
```
```diff
@@ -0,0 +1,43 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+import type { CategorizationState } from '../../types';
+import type { CategorizationBaseNodeParams } from './types';
+import { diffCategorization } from './util';
+
+/**
+ * Updates the stable samples in the categorization state.
+ *
+ * Example: If the pipeline results are [A, B, C, D], the previous pipeline results are [A, X, C, D],
+ * the previously stable samples are {0, 1} and the last reviewed samples are {1, 2}, then 1 will be removed from
+ * the list of stable samples and 2 will be added to the list of stable samples. The new set will be {0, 2}.
+ *
+ * @param {CategorizationBaseNodeParams} params - The parameters containing the current state.
+ * @returns {Partial<CategorizationState>} - The updated categorization state with new stable samples,
+ * cleared last reviewed samples, and the last executed chain set to 'handleUpdateStableSamples'.
+ */
+export function handleUpdateStableSamples({
+  state,
+}: CategorizationBaseNodeParams): Partial<CategorizationState> {
+  if (state.previousPipelineResults.length === 0) {
+    return {};
+  }
+
+  const diff = diffCategorization(state.pipelineResults, state.previousPipelineResults);
+
+  const newStableSamples = Array.from(
+    new Set<number>(
+      [...state.stableSamples, ...state.lastReviewedSamples].filter((x) => !diff.has(x))
+    )
+  );
+
+  return {
+    stableSamples: newStableSamples,
+    lastReviewedSamples: [],
+    lastExecutedChain: 'handleUpdateStableSamples',
+  };
+}
```
```diff
@@ -0,0 +1,270 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+import { selectResults, diffCategorization, stringArraysEqual } from './util';
+import { partialShuffleArray } from '../../../common';
+import type { PipelineResult } from './validate';
+
+// Mock the partialShuffleArray function
+jest.mock('../../../common', () => ({
+  partialShuffleArray: jest.fn(),
+}));
+
+describe('selectResults', () => {
+  const mockPartialShuffleArray = partialShuffleArray as jest.MockedFunction<
+    typeof partialShuffleArray
+  >;
+
+  beforeEach(() => {
+    mockPartialShuffleArray.mockClear();
+  });
+
+  it('should return the correct number of samples and their indices', () => {
+    const pipelineResults = [
+      { event: { category: ['1'] } },
+      { event: { category: ['2'] } },
+      { event: { category: ['3'] } },
+    ] satisfies PipelineResult[];
+    const maxSamples = 2;
+
+    mockPartialShuffleArray.mockImplementation((array, numSamples) => {
+      // Mock implementation that does not actually shuffle
+      return array;
+    });
+
+    const [selectedResults, indices] = selectResults(pipelineResults, maxSamples, new Set());
+    expect(selectedResults).toHaveLength(maxSamples);
+    expect(indices).toHaveLength(maxSamples);
+    expect(indices).toEqual([0, 1]);
+    expect(selectedResults).toEqual([pipelineResults[0], pipelineResults[1]]);
+  });
+
+  it('should return all results if maxSamples is greater than the number of pipelineResults', () => {
+    const pipelineResults: PipelineResult[] = [
+      { event: { category: ['1'] } },
+      { event: { category: ['2'] } },
+    ];
+    const maxSamples = 5;
+
+    mockPartialShuffleArray.mockImplementation((array, numSamples) => {
+      // Mock implementation that does not actually shuffle
+      return array;
+    });
+
+    const [selectedResults, indices] = selectResults(pipelineResults, maxSamples, new Set());
+
+    expect(selectedResults).toHaveLength(pipelineResults.length);
+    expect(indices).toHaveLength(pipelineResults.length);
+    expect(indices).toEqual([0, 1]);
+    expect(selectedResults).toEqual(pipelineResults);
+  });
+
+  it('should call partialShuffleArray with correct arguments', () => {
+    const pipelineResults: PipelineResult[] = [
+      { event: { category: ['1'] } },
+      { event: { category: ['2'] } },
+      { event: { category: ['3'] } },
+    ];
+
+    selectResults(pipelineResults, 2, new Set());
+
+    expect(mockPartialShuffleArray).toHaveBeenCalledWith([0, 1], 0, 2);
+  });
+
+  it('should handle avoiding indices', () => {
+    const pipelineResults = [
+      { event: { category: ['1'] } },
+      { event: { category: ['2'] } },
+      { event: { category: ['3'] } },
+    ] satisfies PipelineResult[];
+    const maxSamples = 2;
+
+    mockPartialShuffleArray.mockImplementation((array, numSamples) => {
+      // Mock implementation that does not actually shuffle
+      return array;
+    });
+
+    const [selectedResults, indices] = selectResults(pipelineResults, maxSamples, new Set());
+    expect(selectedResults).toHaveLength(maxSamples);
+    expect(indices).toHaveLength(maxSamples);
+    expect(indices).toEqual([0, 1]);
+    expect(selectedResults).toEqual([pipelineResults[0], pipelineResults[1]]);
+  });
+
+  // Mock the partialShuffleArray function
+  jest.mock('../../../common', () => ({
+    partialShuffleArray: jest.fn(),
+  }));
+
+  describe('selectResults', () => {
+    beforeEach(() => {
+      mockPartialShuffleArray.mockClear();
+    });
+
+    it('should return the correct number of samples and their indices', () => {
+      const pipelineResults = [
+        { event: { category: ['1'] } },
+        { event: { category: ['2'] } },
+        { event: { category: ['3'] } },
+      ] satisfies PipelineResult[];
+      const maxSamples = 2;
+
+      mockPartialShuffleArray.mockImplementation((array, numSamples) => {
+        // Mock implementation that does not actually shuffle
+        return array;
+      });
+
+      const [selectedResults, indices] = selectResults(pipelineResults, maxSamples, new Set());
+      expect(selectedResults).toHaveLength(maxSamples);
+      expect(indices).toHaveLength(maxSamples);
+      expect(indices).toEqual([0, 1]);
+      expect(selectedResults).toEqual([pipelineResults[0], pipelineResults[1]]);
+    });
+
+    it('should return all results if maxSamples is greater than the number of pipelineResults', () => {
+      const pipelineResults: PipelineResult[] = [
+        { event: { category: ['1'] } },
+        { event: { category: ['2'] } },
+      ];
+      const maxSamples = 5;
+
+      mockPartialShuffleArray.mockImplementation((array, numSamples) => {
+        // Mock implementation that does not actually shuffle
+        return array;
+      });
+
+      const [selectedResults, indices] = selectResults(pipelineResults, maxSamples, new Set());
+
+      expect(selectedResults).toHaveLength(pipelineResults.length);
+      expect(indices).toHaveLength(pipelineResults.length);
+      expect(indices).toEqual([0, 1]);
+      expect(selectedResults).toEqual(pipelineResults);
+    });
+
+    it('should call partialShuffleArray with correct arguments', () => {
+      const pipelineResults: PipelineResult[] = [
+        { event: { category: ['1'] } },
+        { event: { category: ['2'] } },
+        { event: { category: ['3'] } },
+      ];
+
+      selectResults(pipelineResults, 2, new Set());
+
+      expect(mockPartialShuffleArray).toHaveBeenCalledWith([0, 1], 0, 2);
+    });
+
+    it('should handle avoiding indices', () => {
+      const pipelineResults = [
+        { event: { category: ['1'] } },
+        { event: { category: ['2'] } },
+        { event: { category: ['3'] } },
+      ] satisfies PipelineResult[];
+      const maxSamples = 2;
+
+      mockPartialShuffleArray.mockImplementation((array, numSamples) => {
+        // Mock implementation that does not actually shuffle
+        return array;
+      });
+
+      const [selectedResults, indices] = selectResults(pipelineResults, maxSamples, new Set([1]));
+      expect(selectedResults).toHaveLength(maxSamples);
+      expect(indices).toHaveLength(maxSamples);
+      expect(indices).toEqual([0, 2]);
+      expect(selectedResults).toEqual([pipelineResults[0], pipelineResults[2]]);
+    });
+  });
+
+  describe('diffPipelineResults', () => {
+    it('should return an empty set if there are no differences', () => {
+      const pipelineResults: PipelineResult[] = [
+        { event: { category: ['1'], type: ['type1'] } },
+        { event: { category: ['2'], type: ['type2'] } },
+      ];
+      const previousPipelineResults: PipelineResult[] = [
+        { event: { category: ['1'], type: ['type1'] } },
+        { event: { category: ['2'], type: ['type2'] } },
+      ];
+
+      const result = diffCategorization(pipelineResults, previousPipelineResults);
+      expect(result).toEqual(new Set());
+    });
+
+    it('should return a set of indices where the categories differ', () => {
+      const pipelineResults: PipelineResult[] = [
+        { event: { category: ['1'], type: ['type1'] } },
+        { event: { category: ['2'], type: ['type2'] } },
+      ];
+      const previousPipelineResults: PipelineResult[] = [
+        { event: { category: ['1'], type: ['type1'] } },
+        { event: { category: ['3'], type: ['type2'] } },
+      ];
+
+      const result = diffCategorization(pipelineResults, previousPipelineResults);
+      expect(result).toEqual(new Set([1]));
+    });
+
+    it('should return a set of indices where the types differ', () => {
+      const pipelineResults: PipelineResult[] = [
+        { event: { category: ['1'], type: ['type1'] } },
+        { event: { category: ['2'], type: ['type2'] } },
+      ];
+      const previousPipelineResults: PipelineResult[] = [
+        { event: { category: ['1'], type: ['type1'] } },
+        { event: { category: ['2'], type: ['type3'] } },
+      ];
+
+      const result = diffCategorization(pipelineResults, previousPipelineResults);
+      expect(result).toEqual(new Set([1]));
+    });
+
+    it('should return a set of indices where both categories and types differ', () => {
+      const pipelineResults: PipelineResult[] = [
+        { event: { category: ['1'], type: ['type1'] } },
+        { event: { category: ['2'], type: ['type2'] } },
+      ];
+      const previousPipelineResults: PipelineResult[] = [
+        { event: { category: ['3'], type: ['type3'] } },
+        { event: { category: ['4'], type: ['type4'] } },
+      ];
+
+      const result = diffCategorization(pipelineResults, previousPipelineResults);
+      expect(result).toEqual(new Set([0, 1]));
+    });
+
+    describe('stringArraysEqual', () => {
+      it('should return true for equal arrays', () => {
+        const arr1 = ['a', 'b', 'c'];
+        const arr2 = ['a', 'b', 'c'];
+        expect(stringArraysEqual(arr1, arr2)).toBe(true);
+      });
+
+      it('should return false for arrays of different lengths', () => {
+        const arr1 = ['a', 'b', 'c'];
+        const arr2 = ['a', 'b'];
+        expect(stringArraysEqual(arr1, arr2)).toBe(false);
+      });
+
+      it('should return false for arrays with different elements', () => {
+        const arr1 = ['a', 'b', 'c'];
+        const arr2 = ['a', 'b', 'd'];
+        expect(stringArraysEqual(arr1, arr2)).toBe(false);
+      });
+
+      it('should return false for arrays with same elements in different order', () => {
+        const arr1 = ['a', 'b', 'c'];
+        const arr2 = ['c', 'b', 'a'];
+        expect(stringArraysEqual(arr1, arr2)).toBe(false);
+      });
+
+      it('should return true for empty arrays', () => {
+        const arr1: string[] = [];
+        const arr2: string[] = [];
+        expect(stringArraysEqual(arr1, arr2)).toBe(true);
+      });
+    });
+  });
+});
```
```diff
@@ -0,0 +1,82 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+import type { PipelineResult } from './validate';
+import { partialShuffleArray } from '../../../common';
+
+/**
+ * Selects a subset of results for further processing from the given list.
+ *
+ * The shuffle is deterministic and reproducible, based on the default seed.
+ *
+ * @param pipelineResults - An array of PipelineResult objects to select from.
+ * @param maxSamples - The maximum number of samples to select.
+ * @returns An array of PipelineResult objects, containing up to `maxSamples` elements and their indices.
+ */
+export function selectResults(
+  pipelineResults: PipelineResult[],
+  maxSamples: number,
+  avoidIndices: Set<number>
+): [PipelineResult[], number[]] {
+  const numSamples = Math.min(pipelineResults.length, maxSamples);
+  const indices = Array.from({ length: pipelineResults.length }, (_, i) => i).filter(
+    (i) => !avoidIndices.has(i)
+  );
+  if (indices.length < numSamples) {
+    const avoidIndicesList = Array.from(avoidIndices).sort();
+    partialShuffleArray(avoidIndicesList, 0, numSamples - indices.length);
+    avoidIndicesList.length = numSamples - indices.length;
+    indices.push(...avoidIndicesList);
+  }
+  partialShuffleArray(indices, 0, numSamples);
+  indices.length = numSamples;
+  return [indices.map((i) => pipelineResults[i]), indices];
+}
+
+/**
+ * Converts a PipelineResult object into its categorization.
+ *
+ * @param {PipelineResult} result - The result object from the pipeline containing event details.
+ * @returns {string[]} An array of strings combining event categories and types. Returns an empty array if event, event.category, or event.type is missing.
+ */
+function toCategorization(result: PipelineResult): string[] {
+  const event = result?.event;
+  if (!event || !event.category || !event.type) {
+    return [];
+  }
+  return [...event.category.sort(), ...event.type.sort()];
+}
+
+/**
+ * Compares two arrays of strings for equality.
+ *
+ * @param arr1 - The first array of strings to compare.
+ * @param arr2 - The second array of strings to compare.
+ * @returns the equality predicate
+ */
+export function stringArraysEqual(arr1: string[], arr2: string[]): boolean {
+  return arr1.length === arr2.length && arr1.every((value, index) => value === arr2[index]);
+}
+
+/**
+ * Compares two arrays of pipeline results and returns a set of indices where the categorization differs.
+ *
+ * @param pipelineResults - The current array of pipeline results.
+ * @param previousPipelineResults - The previous array of pipeline results to compare against.
+ * @returns A set of indices where the pipeline results differ in event category or type.
+ */
+export function diffCategorization(
+  pipelineResults: PipelineResult[],
+  previousPipelineResults: PipelineResult[]
+): Set<number> {
+  const diff = Array.from({ length: pipelineResults.length }, (_, i) => i).filter((i) => {
+    const category1 = toCategorization(pipelineResults[i]);
+    const category2 = toCategorization(previousPipelineResults[i]);
+    return !stringArraysEqual(category1, category2);
+  });
+  return new Set(diff);
+}
```
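To make the stability comparison concrete, here is a self-contained illustration of the logic in `toCategorization`/`diffCategorization` above (types inlined for the example; note that sorting makes the comparison order-insensitive):

```ts
// Self-contained illustration of the comparison that drives stability: a
// sample's "categorization" is its sorted event.category plus sorted
// event.type, so ordering differences alone do not destabilize a sample.
interface Event { category?: string[]; type?: string[] }
interface PipelineResult { event?: Event }

const sorted = (xs: string[]) => [...xs].sort();
const toCategorization = (r: PipelineResult): string[] => {
  const event = r?.event;
  if (!event || !event.category || !event.type) return [];
  return [...sorted(event.category), ...sorted(event.type)];
};
const equal = (a: string[], b: string[]) =>
  a.length === b.length && a.every((v, i) => v === b[i]);

const current: PipelineResult[] = [
  { event: { category: ['network'], type: ['connection', 'start'] } },
  { event: { category: ['authentication'], type: ['start'] } },
];
const previous: PipelineResult[] = [
  { event: { category: ['network'], type: ['start', 'connection'] } }, // same set, new order
  { event: { category: ['authentication'], type: ['end'] } }, // type changed
];

const changed = current
  .map((r, i) => (equal(toCategorization(r), toCategorization(previous[i])) ? -1 : i))
  .filter((i) => i >= 0);
console.log(changed); // [1] -- only the second sample counts as changed
```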
```diff
@@ -10,12 +10,12 @@ import { ECS_EVENT_TYPES_PER_CATEGORY, EVENT_CATEGORIES, EVENT_TYPES } from './constants';
 
 import type { EventCategories } from './constants';
 
-interface Event {
+export interface Event {
   type?: string[];
   category?: string[];
 }
 
-interface PipelineResult {
+export interface PipelineResult {
   event?: Event;
 }
 
```
```diff
@@ -93,7 +93,7 @@ export async function handleHeaderValidate({
 
 async function verifyKVProcessor(
   kvProcessor: ESProcessorItem,
-  formattedSamples: string[],
+  samples: string[],
   client: IScopedClusterClient
 ): Promise<{ errors: object[] }> {
   // This processor removes the original message field in the output
@@ -101,7 +101,7 @@ async function verifyKVProcessor(
     processors: [kvProcessor[0], createRemoveProcessor()],
     on_failure: [createPassthroughFailureProcessor()],
   };
-  const { errors } = await testPipeline(formattedSamples, pipeline, client);
+  const { errors } = await testPipeline(samples, pipeline, client);
   return { errors };
 }
 
```
```diff
@@ -9,8 +9,7 @@ import type { LogFormatDetectionState } from '../../types';
 import { LOG_FORMAT_DETECTION_PROMPT } from './prompts';
 import type { LogDetectionNodeParams } from './types';
 import { SamplesFormat } from '../../../common';
-
-const MaxLogSamplesInPrompt = 5;
+import { LOG_FORMAT_DETECTION_SAMPLE_ROWS } from '../../../common/constants';
 
 export async function handleLogFormatDetection({
   state,
@@ -20,8 +19,8 @@ export async function handleLogFormatDetection({
   const logFormatDetectionNode = LOG_FORMAT_DETECTION_PROMPT.pipe(model).pipe(outputParser);
 
   const samples =
-    state.logSamples.length > MaxLogSamplesInPrompt
-      ? state.logSamples.slice(0, MaxLogSamplesInPrompt)
+    state.logSamples.length > LOG_FORMAT_DETECTION_SAMPLE_ROWS
+      ? state.logSamples.slice(0, LOG_FORMAT_DETECTION_SAMPLE_ROWS)
       : state.logSamples;
 
   const logFormatDetectionResult = await logFormatDetectionNode.invoke({
```
```diff
@@ -10,7 +10,7 @@ import { StateGraph, END, START } from '@langchain/langgraph';
 import { SamplesFormat } from '../../../common';
 import type { RelatedState } from '../../types';
 import { handleValidatePipeline } from '../../util/graph';
-import { formatSamples, prefixSamples } from '../../util/samples';
+import { prefixSamples } from '../../util/samples';
 import { RELATED_ECS_FIELDS, RELATED_EXAMPLE_ANSWER } from './constants';
 import { handleErrors } from './errors';
 import { handleRelated } from './related';
@@ -30,10 +30,6 @@ const graphState: StateGraphArgs<RelatedState>['channels'] = {
     value: (x: string[], y?: string[]) => y ?? x,
     default: () => [],
   },
-  formattedSamples: {
-    value: (x: string, y?: string) => y ?? x,
-    default: () => '',
-  },
   hasTriedOnce: {
     value: (x: boolean, y?: boolean) => y ?? x,
     default: () => false,
@@ -97,31 +93,22 @@ const graphState: StateGraphArgs<RelatedState>['channels'] = {
 };
 
 function modelInput({ state }: RelatedBaseNodeParams): Partial<RelatedState> {
-  const initialPipeline = JSON.parse(JSON.stringify(state.currentPipeline));
-  return {
-    exAnswer: JSON.stringify(RELATED_EXAMPLE_ANSWER, null, 2),
-    ecs: JSON.stringify(RELATED_ECS_FIELDS, null, 2),
-    samples: state.rawSamples,
-    initialPipeline,
-    finalized: false,
-    reviewed: false,
-    lastExecutedChain: 'modelInput',
-  };
-}
+  let samples: string[];
+  if (state.samplesFormat.name === 'json' || state.samplesFormat.name === 'ndjson') {
+    samples = prefixSamples(state);
+  } else {
+    samples = state.rawSamples;
+  }
 
-function modelJSONInput({ state }: RelatedBaseNodeParams): Partial<RelatedState> {
-  const samples = prefixSamples(state);
-  const formattedSamples = formatSamples(samples);
   const initialPipeline = JSON.parse(JSON.stringify(state.currentPipeline));
   return {
     exAnswer: JSON.stringify(RELATED_EXAMPLE_ANSWER, null, 2),
     ecs: JSON.stringify(RELATED_ECS_FIELDS, null, 2),
     samples,
-    formattedSamples,
     initialPipeline,
     finalized: false,
     reviewed: false,
-    lastExecutedChain: 'modelJSONInput',
+    lastExecutedChain: 'modelInput',
   };
 }
 
@@ -143,13 +130,6 @@ function inputRouter({ state }: RelatedBaseNodeParams): string {
   return 'related';
 }
 
-function modelRouter({ state }: RelatedBaseNodeParams): string {
-  if (state.samplesFormat.name === 'json' || state.samplesFormat.name === 'ndjson') {
-    return 'modelJSONInput';
-  }
-  return 'modelInput';
-}
-
 function chainRouter({ state }: RelatedBaseNodeParams): string {
   if (Object.keys(state.currentProcessors).length === 0) {
     if (state.hasTriedOnce || state.reviewed) {
@@ -172,7 +152,6 @@ function chainRouter({ state }: RelatedBaseNodeParams): string {
 export async function getRelatedGraph({ client, model }: RelatedGraphParams) {
   const workflow = new StateGraph({ channels: graphState })
     .addNode('modelInput', (state: RelatedState) => modelInput({ state }))
-    .addNode('modelJSONInput', (state: RelatedState) => modelJSONInput({ state }))
     .addNode('modelOutput', (state: RelatedState) => modelOutput({ state }))
     .addNode('handleRelated', (state: RelatedState) => handleRelated({ state, model }))
     .addNode('handleValidatePipeline', (state: RelatedState) =>
@@ -180,10 +159,7 @@ export async function getRelatedGraph({ client, model }: RelatedGraphParams) {
     )
     .addNode('handleErrors', (state: RelatedState) => handleErrors({ state, model }))
     .addNode('handleReview', (state: RelatedState) => handleReview({ state, model }))
-    .addConditionalEdges(START, (state: RelatedState) => modelRouter({ state }), {
-      modelJSONInput: 'modelJSONInput',
-      modelInput: 'modelInput', // For Non JSON input samples
-    })
+    .addEdge(START, 'modelInput')
     .addEdge('modelOutput', END)
     .addEdge('handleRelated', 'handleValidatePipeline')
     .addEdge('handleErrors', 'handleValidatePipeline')
@@ -192,10 +168,6 @@ export async function getRelatedGraph({ client, model }: RelatedGraphParams) {
       related: 'handleRelated',
       validatePipeline: 'handleValidatePipeline',
     })
-    .addConditionalEdges('modelJSONInput', (state: RelatedState) => inputRouter({ state }), {
-      related: 'handleRelated',
-      validatePipeline: 'handleValidatePipeline',
-    })
     .addConditionalEdges(
       'handleValidatePipeline',
       (state: RelatedState) => chainRouter({ state }),
```
```diff
@@ -22,7 +22,7 @@ import { buildRouteValidationWithZod } from '../util/route_validation';
 import { withAvailability } from './with_availability';
 import { isErrorThatHandlesItsOwnResponse } from '../lib/errors';
 import { handleCustomErrors } from './routes_util';
-import { GenerationErrorCode } from '../../common/constants';
+import { CATEGORIZATION_RECURSION_LIMIT, GenerationErrorCode } from '../../common/constants';
 
 export function registerCategorizationRoutes(
   router: IRouter<IntegrationAssistantRouteHandlerContext>
@@ -91,6 +91,7 @@ export function registerCategorizationRoutes(
           samplesFormat,
         };
         const options = {
+          recursionLimit: CATEGORIZATION_RECURSION_LIMIT,
           callbacks: [
             new APMTracer({ projectName: langSmithOptions?.projectName ?? 'default' }, logger),
             ...getLangSmithTracer({ ...langSmithOptions, logger }),
```
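For context: LangGraph honours a `recursionLimit` in the invocation config, capping the total number of node transitions per run; it complements the 5-cycle review cap above. A hedged sketch of how an options object like the one built in this route is typically consumed (the `graph` and `parameters` names are placeholders, not the route's actual code):

```ts
// Hedged sketch: a runaway categorization loop fails fast instead of cycling
// indefinitely once the transition budget is exhausted.
const CATEGORIZATION_RECURSION_LIMIT = 50;

const options = {
  recursionLimit: CATEGORIZATION_RECURSION_LIMIT,
  // callbacks: [apmTracer, ...langSmithTracers], // tracing, as in the route
};
// const results = await graph.invoke(parameters, options);
```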
```diff
@@ -42,7 +42,6 @@ export interface SimplifiedProcessors {
 export interface CategorizationState {
   rawSamples: string[];
   samples: string[];
-  formattedSamples: string;
   ecsTypes: string;
   ecsCategories: string;
   exAnswer: string;
@@ -52,9 +51,11 @@ export interface CategorizationState {
   errors: object;
   previousError: string;
   pipelineResults: object[];
+  previousPipelineResults: object[];
+  lastReviewedSamples: number[]; // Filled when reviewing.
+  stableSamples: number[]; // Samples that did not change due to a review.
+  reviewCount: number;
   finalized: boolean;
-  reviewed: boolean;
-  hasTriedOnce: boolean;
   currentPipeline: object;
   currentProcessors: object[];
   invalidCategorization: object[];
@@ -154,7 +155,6 @@ export interface UnstructuredLogState {
 export interface RelatedState {
   rawSamples: string[];
   samples: string[];
-  formattedSamples: string;
   ecs: string;
   exAnswer: string;
   packageName: string;
```
|
|||
}: HandleValidateNodeParams): Promise<Partial<CategorizationState> | Partial<RelatedState>> {
|
||||
const previousError = JSON.stringify(state.errors, null, 2);
|
||||
const results = await testPipeline(state.rawSamples, state.currentPipeline, client);
|
||||
|
||||
return {
|
||||
errors: results.errors,
|
||||
previousError,
|
||||
previousPipelineResults: state.pipelineResults,
|
||||
pipelineResults: results.pipelineResults,
|
||||
lastExecutedChain: 'validate_pipeline',
|
||||
};
|
||||
|
|
|
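The added `previousPipelineResults` line is what lets `handleUpdateStableSamples` diff consecutive runs; schematically (an illustrative reducer, not the actual LangGraph channel code):

```ts
// Each validation run shifts the current results into `previous` before
// recording the new ones, so the next node can compare the two snapshots.
interface ResultsState {
  pipelineResults: object[];
  previousPipelineResults: object[];
}

function recordRun(state: ResultsState, newResults: object[]): ResultsState {
  return {
    previousPipelineResults: state.pipelineResults,
    pipelineResults: newResults,
  };
}
```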
```diff
@@ -56,13 +56,13 @@ export async function testPipeline(
 
 export async function createJSONInput(
   processors: ESProcessorItem[],
-  formattedSamples: string[],
+  samples: string[],
   client: IScopedClusterClient
 ): Promise<{ pipelineResults: Array<{ [key: string]: unknown }>; errors: object[] }> {
   const pipeline = {
     processors: [...processors, createRemoveProcessor()],
     on_failure: [createPassthroughFailureProcessor()],
   };
-  const { pipelineResults, errors } = await testPipeline(formattedSamples, pipeline, client);
+  const { pipelineResults, errors } = await testPipeline(samples, pipeline, client);
   return { pipelineResults, errors };
 }
 
```
```diff
@@ -48,17 +48,6 @@ export function prefixSamples(
   return modifiedSamples;
 }
 
-export function formatSamples(samples: string[]): string {
-  const formattedSamples: unknown[] = [];
-
-  for (const sample of samples) {
-    const sampleObj = JSON.parse(sample);
-    formattedSamples.push(sampleObj);
-  }
-
-  return JSON.stringify(formattedSamples, null, 2);
-}
-
 function determineType(value: unknown): string {
   if (typeof value === 'object' && value !== null) {
     if (Array.isArray(value)) {
```
```diff
@@ -24771,7 +24771,6 @@
     "xpack.integrationAssistant.step.dataStream.logsSample.errorNotArray": "Le fichier de logs exemple n'est pas un tableau",
     "xpack.integrationAssistant.step.dataStream.logsSample.errorNotObject": "Le fichier de logs exemple contient des entrées n’étant pas des objets",
     "xpack.integrationAssistant.step.dataStream.logsSample.label": "Logs",
-    "xpack.integrationAssistant.step.dataStream.logsSample.truncatedWarning": "L'échantillon de logs a été tronqué pour contenir {maxRows} lignes.",
     "xpack.integrationAssistant.step.dataStream.logsSample.warning": "Veuillez noter que ces données seront analysées par un outil d'IA tiers. Assurez-vous de respecter les directives de confidentialité et de sécurité lors de la sélection des données.",
     "xpack.integrationAssistant.step.dataStream.nameAlreadyExistsError": "Ce nom d'intégration est déjà utilisé. Veuillez choisir un autre nom.",
     "xpack.integrationAssistant.step.dataStream.noSpacesHelpText": "Les noms peuvent contenir uniquement des lettres minuscules, des chiffres et des traits de soulignement (_)",
```
```diff
@@ -24518,7 +24518,6 @@
     "xpack.integrationAssistant.step.dataStream.logsSample.errorNotArray": "ログサンプルファイルは配列ではありません",
     "xpack.integrationAssistant.step.dataStream.logsSample.errorNotObject": "ログサンプルファイルには、オブジェクト以外のエントリが含まれています",
     "xpack.integrationAssistant.step.dataStream.logsSample.label": "ログ",
-    "xpack.integrationAssistant.step.dataStream.logsSample.truncatedWarning": "ログサンプルは{maxRows}行に切り詰められました。",
     "xpack.integrationAssistant.step.dataStream.logsSample.warning": "このデータは、サードパーティAIツールによって分析されます。データを選択するときには、プライバシーおよびセキュリティガイドラインに準拠していることを確認してください。",
     "xpack.integrationAssistant.step.dataStream.nameAlreadyExistsError": "この統合名はすでに使用中です。別の名前を選択してください。",
     "xpack.integrationAssistant.step.dataStream.noSpacesHelpText": "名前には、小文字、数字、アンダースコア(_)のみを使用できます。",
```
```diff
@@ -24552,7 +24552,6 @@
     "xpack.integrationAssistant.step.dataStream.logsSample.errorNotArray": "日志样例文件不是数组",
    "xpack.integrationAssistant.step.dataStream.logsSample.errorNotObject": "日志样例文件包含非对象条目",
     "xpack.integrationAssistant.step.dataStream.logsSample.label": "日志",
-    "xpack.integrationAssistant.step.dataStream.logsSample.truncatedWarning": "日志样例已被截短为 {maxRows} 行。",
     "xpack.integrationAssistant.step.dataStream.logsSample.warning": "请注意,此数据将由第三方 AI 工具进行分析。选择数据时,请确保遵循隐私和安全指引。",
     "xpack.integrationAssistant.step.dataStream.nameAlreadyExistsError": "此集成名称已在使用中。请选择其他名称。",
     "xpack.integrationAssistant.step.dataStream.noSpacesHelpText": "名称只能包含小写字母、数字和下划线 (_)",
```