Mirror of https://github.com/elastic/kibana.git, synced 2025-04-23 17:28:26 -04:00
# [Streams 🌊] Update simulator to assert fields & integration testing (#206950)
## 📓 Summary

Closes https://github.com/elastic/streams-program/issues/68

This work updates the way a processing simulation is performed, working against the `_ingest/_simulate` API. This gives less specific feedback on a simulation failure (it no longer reports which processor failed), but allows for a much more realistic simulation against the index configuration. This work also adds integration testing for this API.

## 📔 Reviewer notes

The API is poorly typed due to missing typings in the elasticsearch-js library. #204175 updates the library with those typings; as soon as it's merged I'll update the API.

## 🎥 Recordings

https://github.com/user-attachments/assets/36ce0d3c-b7de-44d2-bdc2-84ff67fb4b25
This commit is contained in: parent 5766467359 · commit 39bf5e646f

7 changed files with 454 additions and 83 deletions
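Before the per-file diffs, here is a minimal sketch of the request the updated route now sends to Elasticsearch's `_ingest/_simulate` endpoint, as assembled by the `prepareSimulationBody` helper in the route diff below. The stream id `logs.test` and the sample values are illustrative only, borrowed from the integration test at the end of this commit:

```ts
// Sketch of an _ingest/_simulate request body; values are illustrative only.
const simulationBody = {
  // Sample documents are addressed to the stream's backing index so the
  // simulation runs against the real index configuration.
  docs: [
    {
      _index: 'logs.test',
      _id: '0',
      _source: { '@timestamp': '2025-01-01T00:00:10.000Z', message: '2025-01-01T00:00:10.000Z error test' },
    },
  ],
  // The processors under test replace the stream's processing pipeline.
  pipeline_substitutions: {
    'logs.test@stream.processing': {
      processors: [
        {
          grok: {
            field: 'message',
            patterns: [
              '%{TIMESTAMP_ISO8601:parsed_timestamp} %{LOGLEVEL:parsed_level} %{GREEDYDATA:parsed_message}',
            ],
          },
        },
      ],
    },
  },
  // Only set when detected_fields are provided: trial mappings for the new fields.
  component_template_substitutions: {
    'logs.test@stream.layer': {
      template: { mappings: { properties: { parsed_timestamp: { type: 'date' } } } },
    },
  },
};
```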
New error class, imported in the route below as `lib/streams/errors/detected_mapping_failure`:

```diff
@@ -0,0 +1,13 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+export class DetectedMappingFailure extends Error {
+  constructor(message: string) {
+    super(message);
+    this.name = 'DetectedMappingFailure';
+  }
+}
```
The simulation route (`POST /api/streams/{id}/processing/_simulate`):

```diff
@@ -5,15 +5,19 @@
  * 2.0.
  */
 
+/* eslint-disable @typescript-eslint/naming-convention */
+
 import { z } from '@kbn/zod';
 import { notFound, internal, badRequest } from '@hapi/boom';
-import { FieldDefinitionConfig, processingDefinitionSchema } from '@kbn/streams-schema';
-import { calculateObjectDiff, flattenObject } from '@kbn/object-utils';
 import {
-  IngestSimulateResponse,
-  IngestSimulateSimulateDocumentResult,
-} from '@elastic/elasticsearch/lib/api/types';
+  FieldDefinitionConfig,
+  fieldDefinitionConfigSchema,
+  processingDefinitionSchema,
+} from '@kbn/streams-schema';
+import { calculateObjectDiff, flattenObject } from '@kbn/object-utils';
+import { isEmpty } from 'lodash';
+import { IScopedClusterClient } from '@kbn/core/server';
+import { DetectedMappingFailure } from '../../../lib/streams/errors/detected_mapping_failure';
 import { NonAdditiveProcessor } from '../../../lib/streams/errors/non_additive_processor';
 import { SimulationFailed } from '../../../lib/streams/errors/simulation_failed';
 import { formatToIngestProcessors } from '../../../lib/streams/helpers/processing';
@@ -21,6 +25,17 @@ import { createServerRoute } from '../../create_server_route';
 import { DefinitionNotFound } from '../../../lib/streams/errors';
 import { checkAccess } from '../../../lib/streams/stream_crud';
 
+const paramsSchema = z.object({
+  path: z.object({ id: z.string() }),
+  body: z.object({
+    processing: z.array(processingDefinitionSchema),
+    documents: z.array(z.record(z.unknown())),
+    detected_fields: z.array(fieldDefinitionConfigSchema.extend({ name: z.string() })).optional(),
+  }),
+});
+
+type ProcessingSimulateParams = z.infer<typeof paramsSchema>;
+
 export const simulateProcessorRoute = createServerRoute({
   endpoint: 'POST /api/streams/{id}/processing/_simulate',
   options: {
@@ -33,82 +48,153 @@ export const simulateProcessorRoute = createServerRoute({
         'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
     },
   },
-  params: z.object({
-    path: z.object({ id: z.string() }),
-    body: z.object({
-      processing: z.array(processingDefinitionSchema),
-      documents: z.array(z.record(z.unknown())),
-    }),
-  }),
-  handler: async ({ params, request, response, getScopedClients }) => {
+  params: paramsSchema,
+  handler: async ({ params, request, getScopedClients }) => {
     try {
       const { scopedClusterClient } = await getScopedClients({ request });
 
-      const hasAccess = await checkAccess({ id: params.path.id, scopedClusterClient });
-      if (!hasAccess) {
+      const { read } = await checkAccess({ id: params.path.id, scopedClusterClient });
+      if (!read) {
         throw new DefinitionNotFound(`Stream definition for ${params.path.id} not found.`);
       }
-      // Normalize processing definition to pipeline processors
-      const processors = formatToIngestProcessors(params.body.processing);
-      // Convert input documents to ingest simulation format
-      const docs = params.body.documents.map((doc) => ({ _source: doc }));
-
-      let simulationResult: IngestSimulateResponse;
-      try {
-        simulationResult = await scopedClusterClient.asCurrentUser.ingest.simulate({
-          verbose: true,
-          pipeline: { processors },
-          docs,
-        });
-      } catch (error) {
-        throw new SimulationFailed(error);
-      }
+      const simulationBody = prepareSimulationBody(params);
 
-      const simulationDiffs = computeSimulationDiffs(simulationResult, docs);
+      const simulationResult = await executeSimulation(scopedClusterClient, simulationBody);
 
-      const updatedFields = computeUpdatedFields(simulationDiffs);
-      if (!isEmpty(updatedFields)) {
-        throw new NonAdditiveProcessor(
-          `The processor is not additive to the documents. It might update fields [${updatedFields.join()}]`
-        );
-      }
+      const simulationDiffs = prepareSimulationDiffs(simulationResult, simulationBody.docs);
 
-      const documents = computeSimulationDocuments(simulationResult, docs);
-      const detectedFields = computeDetectedFields(simulationDiffs);
-      const successRate = computeSuccessRate(simulationResult);
-      const failureRate = 1 - successRate;
+      assertSimulationResult(simulationResult, simulationDiffs);
 
-      return {
-        documents,
-        success_rate: parseFloat(successRate.toFixed(2)),
-        failure_rate: parseFloat(failureRate.toFixed(2)),
-        detected_fields: detectedFields,
-      };
+      return prepareSimulationResponse(
+        simulationResult,
+        simulationBody.docs,
+        simulationDiffs,
+        params.body.detected_fields
+      );
     } catch (error) {
       if (error instanceof DefinitionNotFound) {
         throw notFound(error);
       }
 
-      if (error instanceof SimulationFailed || error instanceof NonAdditiveProcessor) {
+      if (
+        error instanceof SimulationFailed ||
+        error instanceof NonAdditiveProcessor ||
+        error instanceof DetectedMappingFailure
+      ) {
         throw badRequest(error);
       }
 
       throw internal(error);
     }
   },
 });
 
-const computeSimulationDiffs = (
-  simulation: IngestSimulateResponse,
+const prepareSimulationBody = (params: ProcessingSimulateParams) => {
+  const { path, body } = params;
+  const { processing, documents, detected_fields } = body;
+
+  const processors = formatToIngestProcessors(processing);
+  const docs = documents.map((doc, id) => ({
+    _index: path.id,
+    _id: id.toString(),
+    _source: doc,
+  }));
+
+  const simulationBody: any = {
+    docs,
+    pipeline_substitutions: {
+      [`${path.id}@stream.processing`]: {
+        processors,
+      },
+    },
+  };
+
+  if (detected_fields) {
+    const properties = computeMappingProperties(detected_fields);
+    simulationBody.component_template_substitutions = {
+      [`${path.id}@stream.layer`]: {
+        template: {
+          mappings: {
+            properties,
+          },
+        },
+      },
+    };
+  }
+
+  return simulationBody;
+};
+
+// TODO: update type once Kibana updates to elasticsearch-js 8.17
+const executeSimulation = async (
+  scopedClusterClient: IScopedClusterClient,
+  simulationBody: ReturnType<typeof prepareSimulationBody>
+): Promise<any> => {
+  try {
+    // TODO: We should be using scopedClusterClient.asCurrentUser.simulate.ingest() once Kibana updates to elasticsearch-js 8.17
+    return await scopedClusterClient.asCurrentUser.transport.request({
+      method: 'POST',
+      path: `_ingest/_simulate`,
+      body: simulationBody,
+    });
+  } catch (error) {
+    throw new SimulationFailed(error);
+  }
+};
+
+const assertSimulationResult = (
+  simulationResult: Awaited<ReturnType<typeof executeSimulation>>,
+  simulationDiffs: ReturnType<typeof prepareSimulationDiffs>
+) => {
+  // Assert mappings are compatible with the documents
+  const entryWithError = simulationResult.docs.find(isMappingFailure);
+  if (entryWithError) {
+    throw new DetectedMappingFailure(
+      `The detected field types might not be compatible with these documents. ${entryWithError.doc.error.reason}`
+    );
+  }
+  // Assert that the processors are purely additive to the documents
+  const updatedFields = computeUpdatedFields(simulationDiffs);
+  if (!isEmpty(updatedFields)) {
+    throw new NonAdditiveProcessor(
+      `The processor is not additive to the documents. It might update fields [${updatedFields.join()}]`
+    );
+  }
+};
+
+const prepareSimulationResponse = (
+  simulationResult: any,
+  docs: Array<{ _source: Record<string, unknown> }>,
+  simulationDiffs: ReturnType<typeof prepareSimulationDiffs>,
+  detectedFields?: ProcessingSimulateParams['body']['detected_fields']
+) => {
+  const confirmedValidDetectedFields = computeMappingProperties(detectedFields ?? []);
+  const documents = computeSimulationDocuments(simulationResult, docs);
+  const detectedFieldsResult = computeDetectedFields(simulationDiffs, confirmedValidDetectedFields);
+  const successRate = computeSuccessRate(simulationResult);
+  const failureRate = 1 - successRate;
+
+  return {
+    documents,
+    success_rate: parseFloat(successRate.toFixed(2)),
+    failure_rate: parseFloat(failureRate.toFixed(2)),
+    detected_fields: detectedFieldsResult,
+  };
+};
+
+// TODO: update type once Kibana updates to elasticsearch-js 8.17
+const prepareSimulationDiffs = (
+  simulation: any,
   sampleDocs: Array<{ _source: Record<string, unknown> }>
 ) => {
   // Since we filter out failed documents, we need to map the simulation docs to the sample docs for later retrieval
-  const samplesToSimulationMap = new Map(simulation.docs.map((doc, id) => [doc, sampleDocs[id]]));
+  const samplesToSimulationMap = new Map<any, { _source: Record<string, unknown> }>(
+    simulation.docs.map((entry: any, id: number) => [entry.doc, sampleDocs[id]])
+  );
 
-  const diffs = simulation.docs.filter(isSuccessfulDocument).map((doc) => {
-    const sample = samplesToSimulationMap.get(doc);
+  const diffs = simulation.docs.filter(isSuccessfulDocument).map((entry: any) => {
+    const sample = samplesToSimulationMap.get(entry.doc);
     if (sample) {
-      return calculateObjectDiff(sample._source, doc.processor_results.at(-1)?.doc?._source);
+      return calculateObjectDiff(sample._source, entry.doc._source);
     }
 
     return calculateObjectDiff({});
@@ -117,9 +203,10 @@ const computeSimulationDiffs = (
   return diffs;
 };
 
-const computeUpdatedFields = (simulationDiff: ReturnType<typeof computeSimulationDiffs>) => {
+// TODO: update type once Kibana updates to elasticsearch-js 8.17
+const computeUpdatedFields = (simulationDiff: ReturnType<typeof prepareSimulationDiffs>) => {
   const diffs = simulationDiff
-    .map((simulatedDoc) => flattenObject(simulatedDoc.updated))
+    .map((simulatedDoc: any) => flattenObject(simulatedDoc.updated))
     .flatMap(Object.keys);
 
   const uniqueFields = [...new Set(diffs)];
@@ -127,15 +214,16 @@ const computeUpdatedFields = (simulationDiff: ReturnType<typeof computeSimulatio
   return uniqueFields;
 };
 
+// TODO: update type once Kibana updates to elasticsearch-js 8.17
 const computeSimulationDocuments = (
-  simulation: IngestSimulateResponse,
+  simulation: any,
   sampleDocs: Array<{ _source: Record<string, unknown> }>
-) => {
-  return simulation.docs.map((doc, id) => {
+): Array<{ isMatch: boolean; value: Record<string, unknown> }> => {
+  return simulation.docs.map((entry: any, id: number) => {
     // If every processor was successful, return and flatten the simulation doc from the last processor
-    if (isSuccessfulDocument(doc)) {
+    if (isSuccessfulDocument(entry)) {
       return {
-        value: flattenObject(doc.processor_results.at(-1)?.doc?._source ?? sampleDocs[id]._source),
+        value: flattenObject(entry.doc._source ?? sampleDocs[id]._source),
         isMatch: true,
       };
     }
@@ -148,32 +236,44 @@ const computeSimulationDocuments = (
 };
 
 const computeDetectedFields = (
-  simulationDiff: ReturnType<typeof computeSimulationDiffs>
+  simulationDiff: ReturnType<typeof prepareSimulationDiffs>,
+  confirmedValidDetectedFields: Record<string, { type: FieldDefinitionConfig['type'] | 'unmapped' }>
 ): Array<{
   name: string;
   type: FieldDefinitionConfig['type'] | 'unmapped';
 }> => {
-  const diffs = simulationDiff
-    .map((simulatedDoc) => flattenObject(simulatedDoc.added))
+  const diffs: string[] = simulationDiff
+    .map((simulatedDoc: any) => flattenObject(simulatedDoc.added))
     .flatMap(Object.keys);
 
   const uniqueFields = [...new Set(diffs)];
 
-  return uniqueFields.map((name) => ({ name, type: 'unmapped' }));
+  return uniqueFields.map((name: string) => ({
+    name,
+    type: confirmedValidDetectedFields[name]?.type || 'unmapped',
+  }));
 };
 
-const computeSuccessRate = (simulation: IngestSimulateResponse) => {
-  const successfulCount = simulation.docs.reduce((rate, doc) => {
-    return (rate += isSuccessfulDocument(doc) ? 1 : 0);
+// TODO: update type once Kibana updates to elasticsearch-js 8.17
+const computeSuccessRate = (simulation: any) => {
+  const successfulCount = simulation.docs.reduce((rate: number, entry: any) => {
+    return (rate += isSuccessfulDocument(entry) ? 1 : 0);
   }, 0);
 
   return successfulCount / simulation.docs.length;
 };
 
-const isSuccessfulDocument = (
-  doc: IngestSimulateSimulateDocumentResult
-): doc is Required<IngestSimulateSimulateDocumentResult> =>
-  doc.processor_results?.every((processorSimulation) => processorSimulation.status === 'success') ||
-  false;
+const computeMappingProperties = (
+  detectedFields: NonNullable<ProcessingSimulateParams['body']['detected_fields']>
+) => {
+  return Object.fromEntries(detectedFields.map(({ name, type }) => [name, { type }]));
+};
+
+// TODO: update type once Kibana updates to elasticsearch-js 8.17
+const isSuccessfulDocument = (entry: any) => entry.doc.error === undefined;
+// TODO: update type once Kibana updates to elasticsearch-js 8.17
+const isMappingFailure = (entry: any) =>
+  !isSuccessfulDocument(entry) && entry.doc.error.type === 'document_parsing_exception';
 
 export const processingRoutes = {
   ...simulateProcessorRoute,
```
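For reference, the simulation response assembled by `prepareSimulationResponse` above has the following shape; the concrete values are a sketch, mirroring the assertions in the integration test at the end of this commit:

```ts
// Sketch of a simulate response; values are illustrative only.
const simulationResponse = {
  // One flattened entry per input document; isMatch is false when processing failed.
  documents: [
    { isMatch: true, value: { parsed_timestamp: '2025-01-01T00:00:10.000Z', parsed_level: 'error' } },
  ],
  success_rate: 1, // rounded to two decimals
  failure_rate: 0, // 1 - success_rate, rounded to two decimals
  // Fields added by the processors; type falls back to 'unmapped' unless confirmed.
  detected_fields: [
    { name: 'parsed_timestamp', type: 'date' },
    { name: 'parsed_level', type: 'keyword' },
    { name: 'parsed_message', type: 'unmapped' },
  ],
};
```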
`AddProcessorFlyout` forwards the form's detected fields when simulating:

```diff
@@ -68,7 +68,7 @@ export function AddProcessorFlyout({
   const handleSubmit: SubmitHandler<ProcessorFormState> = async (data) => {
     const processingDefinition = convertFormStateToProcessing(data);
 
-    simulate(processingDefinition).then((responseBody) => {
+    simulate(processingDefinition, data.detected_fields).then((responseBody) => {
       if (responseBody instanceof Error) return;
 
       onAddProcessor(processingDefinition, data.detected_fields);
```
`ProcessorOutcomePreview` passes the form's detected fields into the simulation and tolerates an absent simulation result:

```diff
@@ -109,7 +109,8 @@ export const ProcessorOutcomePreview = ({
   }, [formFields.field, detectedFieldsColumns, selectedDocsFilter]);
 
   const detectedFieldsEnabled =
-    isWiredReadStream(definition) && simulation && !isEmpty(simulation.detected_fields);
+    isWiredReadStream(definition) &&
+    ((simulation && !isEmpty(simulation.detected_fields)) || !isEmpty(formFields.detected_fields));
 
   return (
     <EuiPanel hasShadow={false} paddingSize="none">
@@ -126,7 +127,9 @@ export const ProcessorOutcomePreview = ({
           iconType="play"
           color="accentSecondary"
           size="s"
-          onClick={() => onSimulate(convertFormStateToProcessing(formFields))}
+          onClick={() => {
+            onSimulate(convertFormStateToProcessing(formFields), formFields.detected_fields);
+          }}
           isLoading={isLoading}
         >
           {i18n.translate(
@@ -136,7 +139,7 @@ export const ProcessorOutcomePreview = ({
         </EuiButton>
       </EuiFlexGroup>
       <EuiSpacer />
-      {detectedFieldsEnabled && <DetectedFields detectedFields={simulation.detected_fields} />}
+      {detectedFieldsEnabled && <DetectedFields detectedFields={simulation?.detected_fields} />}
       <OutcomeControls
         docsFilter={selectedDocsFilter}
         onDocsFilterChange={setSelectedDocsFilter}
@@ -264,14 +267,14 @@ const OutcomeControls = ({
   );
 };
 
-const DetectedFields = ({ detectedFields }: { detectedFields: DetectedField[] }) => {
+const DetectedFields = ({ detectedFields }: { detectedFields?: DetectedField[] }) => {
   const { euiTheme } = useEuiTheme();
   const { fields, replace } = useFieldArray<{ detected_fields: DetectedField[] }>({
     name: 'detected_fields',
   });
 
   useEffect(() => {
-    replace(detectedFields);
+    if (detectedFields) replace(detectedFields);
   }, [detectedFields, replace]);
 
   return (
```
The `useProcessingSimulator` hook gains an optional `detectedFields` argument:

```diff
@@ -5,23 +5,31 @@
  * 2.0.
  */
 
+/* eslint-disable @typescript-eslint/naming-convention */
+
 import { useAbortController } from '@kbn/observability-utils-browser/hooks/use_abort_controller';
 import { ReadStreamDefinition, ProcessingDefinition, Condition } from '@kbn/streams-schema';
 import useAsyncFn from 'react-use/lib/useAsyncFn';
 import { IHttpFetchError, ResponseErrorBody } from '@kbn/core/public';
 import { useDateRange } from '@kbn/observability-utils-browser/hooks/use_date_range';
-import { APIReturnType } from '@kbn/streams-plugin/public/api';
+import { APIReturnType, StreamsAPIClientRequestParamsOf } from '@kbn/streams-plugin/public/api';
 import { useStreamsAppFetch } from '../../../hooks/use_streams_app_fetch';
 import { useKibana } from '../../../hooks/use_kibana';
+import { DetectedField } from '../types';
 
 type Simulation = APIReturnType<'POST /api/streams/{id}/processing/_simulate'>;
+type SimulationRequestBody =
+  StreamsAPIClientRequestParamsOf<'POST /api/streams/{id}/processing/_simulate'>['params']['body'];
 
 export interface UseProcessingSimulatorReturnType {
   error?: IHttpFetchError<ResponseErrorBody>;
   isLoading: boolean;
   refreshSamples: () => void;
   samples: Array<Record<PropertyKey, unknown>>;
-  simulate: (processing: ProcessingDefinition) => Promise<Simulation | null>;
+  simulate: (
+    processing: ProcessingDefinition,
+    detectedFields?: DetectedField[]
+  ) => Promise<Simulation | null>;
   simulation?: Simulation | null;
 }
@@ -76,11 +84,17 @@ export const useProcessingSimulator = ({
   const sampleDocs = (samples?.documents ?? []) as Array<Record<PropertyKey, unknown>>;
 
   const [{ loading: isLoadingSimulation, error, value }, simulate] = useAsyncFn(
-    (processingDefinition: ProcessingDefinition) => {
+    (processingDefinition: ProcessingDefinition, detectedFields?: DetectedField[]) => {
       if (!definition) {
         return Promise.resolve(null);
       }
 
+      const detected_fields = detectedFields
+        ? (detectedFields.filter(
+            (field) => field.type !== 'unmapped'
+          ) as SimulationRequestBody['detected_fields'])
+        : undefined;
+
       return streamsRepositoryClient.fetch('POST /api/streams/{id}/processing/_simulate', {
         signal: abortController.signal,
         params: {
@@ -88,6 +102,7 @@
           body: {
             documents: sampleDocs,
             processing: [processingDefinition],
+            detected_fields,
           },
         },
       });
```
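A hypothetical call site for the extended `simulate` signature, assuming the hook is wired up with a stream `definition` as in the diff above. Fields still typed `'unmapped'` are filtered out client-side before being sent as `detected_fields`:

```ts
// Hypothetical usage; the second argument to simulate() is new in this commit.
const { simulate, simulation, isLoading } = useProcessingSimulator({ definition });

await simulate(processingDefinition, [
  { name: 'parsed_level', type: 'keyword' }, // forwarded as a detected field
  { name: 'parsed_message', type: 'unmapped' }, // dropped by the filter
]);
```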
The deployment-agnostic FTR index loads the new test file:

```diff
@@ -15,6 +15,7 @@ export default function ({ loadTestFile }: DeploymentAgnosticFtrProviderContext)
     loadTestFile(require.resolve('./flush_config'));
     loadTestFile(require.resolve('./assets/dashboard'));
     loadTestFile(require.resolve('./schema'));
+    loadTestFile(require.resolve('./processing_simulate'));
     loadTestFile(require.resolve('./root_stream'));
   });
 }
```
New integration test, loaded above as `./processing_simulate`:

```diff
@@ -0,0 +1,239 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+import expect from '@kbn/expect';
+import { ClientRequestParamsOf } from '@kbn/server-route-repository-utils';
+import { StreamsRouteRepository } from '@kbn/streams-plugin/server';
+import { errors } from '@elastic/elasticsearch';
+import { disableStreams, enableStreams, forkStream, indexDocument } from './helpers/requests';
+import { DeploymentAgnosticFtrProviderContext } from '../../../ftr_provider_context';
+import {
+  StreamsSupertestRepositoryClient,
+  createStreamsRepositoryAdminClient,
+} from './helpers/repository_client';
+
+async function simulateProcessingForStream(
+  client: StreamsSupertestRepositoryClient,
+  id: string,
+  body: ClientRequestParamsOf<
+    StreamsRouteRepository,
+    'POST /api/streams/{id}/processing/_simulate'
+  >['params']['body'],
+  statusCode = 200
+) {
+  return client
+    .fetch('POST /api/streams/{id}/processing/_simulate', {
+      params: {
+        path: { id },
+        body,
+      },
+    })
+    .expect(statusCode);
+}
+
+export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
+  const roleScopedSupertest = getService('roleScopedSupertest');
+  const esClient = getService('es');
+
+  let apiClient: StreamsSupertestRepositoryClient;
+
+  describe('Processing Simulation', () => {
+    const TEST_TIMESTAMP = '2025-01-01T00:00:10.000Z';
+    const TEST_MESSAGE = `${TEST_TIMESTAMP} error test`;
+    const TEST_HOST = 'test-host';
+
+    const testDoc = {
+      '@timestamp': TEST_TIMESTAMP,
+      message: TEST_MESSAGE,
+      'host.name': TEST_HOST,
+      'log.level': 'error',
+    };
+
+    const basicGrokProcessor = {
+      config: {
+        grok: {
+          field: 'message',
+          patterns: [
+            '%{TIMESTAMP_ISO8601:parsed_timestamp} %{LOGLEVEL:parsed_level} %{GREEDYDATA:parsed_message}',
+          ],
+        },
+      },
+    };
+
+    const createTestDocument = (message = TEST_MESSAGE) => ({
+      '@timestamp': TEST_TIMESTAMP,
+      message,
+    });
+
+    before(async () => {
+      apiClient = await createStreamsRepositoryAdminClient(roleScopedSupertest);
+
+      await enableStreams(apiClient);
+
+      // Create a test document
+      await indexDocument(esClient, 'logs', testDoc);
+
+      // Create a forked stream for testing
+      await forkStream(apiClient, 'logs', {
+        stream: {
+          name: 'logs.test',
+        },
+        condition: {
+          field: 'host.name',
+          operator: 'eq' as const,
+          value: TEST_HOST,
+        },
+      });
+    });
+
+    after(async () => {
+      await disableStreams(apiClient);
+    });
+
+    describe('Successful simulations', () => {
+      describe('with valid documents', () => {
+        it('should simulate additive processing', async () => {
+          const response = await simulateProcessingForStream(apiClient, 'logs.test', {
+            processing: [basicGrokProcessor],
+            documents: [createTestDocument()],
+          });
+
+          expect(response.body.success_rate).to.be(1);
+          expect(response.body.failure_rate).to.be(0);
+
+          const { isMatch, value } = response.body.documents[0];
+          expect(isMatch).to.be(true);
+          expect(value).to.have.property('parsed_timestamp', TEST_TIMESTAMP);
+          expect(value).to.have.property('parsed_level', 'error');
+          expect(value).to.have.property('parsed_message', 'test');
+        });
+
+        it('should simulate with detected fields', async () => {
+          const response = await simulateProcessingForStream(apiClient, 'logs.test', {
+            processing: [basicGrokProcessor],
+            documents: [createTestDocument()],
+            detected_fields: [
+              { name: 'parsed_timestamp', type: 'date' },
+              { name: 'parsed_level', type: 'keyword' },
+            ],
+          });
+
+          const findField = (name: string) =>
+            response.body.detected_fields.find((f: { name: string }) => f.name === name);
+
+          expect(response.body.detected_fields).to.have.length(3); // Including parsed_message
+          expect(findField('parsed_timestamp')).to.have.property('type', 'date');
+          expect(findField('parsed_level')).to.have.property('type', 'keyword');
+        });
+      });
+
+      describe('with mixed success/failure documents', () => {
+        it('should provide accurate success/failure rates', async () => {
+          const response = await simulateProcessingForStream(apiClient, 'logs.test', {
+            processing: [basicGrokProcessor],
+            documents: [
+              createTestDocument(),
+              createTestDocument('invalid format'),
+              createTestDocument(`${TEST_TIMESTAMP} info test`),
+            ],
+          });
+
+          expect(response.body.success_rate).to.be(0.67);
+          expect(response.body.failure_rate).to.be(0.33);
+          expect(response.body.documents).to.have.length(3);
+          expect(response.body.documents[0].isMatch).to.be(true);
+          expect(response.body.documents[1].isMatch).to.be(false);
+          expect(response.body.documents[2].isMatch).to.be(true);
+        });
+      });
+    });
+
+    describe('Failed simulations', () => {
+      it('should fail with invalid processor configurations', async () => {
+        await simulateProcessingForStream(
+          apiClient,
+          'logs.test',
+          {
+            processing: [
+              {
+                config: {
+                  grok: {
+                    field: 'message',
+                    patterns: ['%{INVALID_PATTERN:field}'],
+                  },
+                },
+              },
+            ],
+            documents: [createTestDocument('test message')],
+          },
+          400
+        );
+      });
+
+      it('should fail when attempting to update existing fields', async () => {
+        const response = await simulateProcessingForStream(
+          apiClient,
+          'logs.test',
+          {
+            processing: [
+              {
+                config: {
+                  grok: {
+                    field: 'message',
+                    patterns: ['%{TIMESTAMP_ISO8601:parsed_timestamp} %{GREEDYDATA:message}'], // Overwrites existing message field
+                  },
+                },
+              },
+            ],
+            documents: [createTestDocument(`${TEST_TIMESTAMP} original message`)],
+          },
+          400
+        );
+
+        expect((response.body as errors.ResponseError['body']).message).to.contain(
+          'The processor is not additive to the documents. It might update fields [message]'
+        );
+      });
+
+      it('should fail with incompatible detected field mappings', async () => {
+        const response = await simulateProcessingForStream(
+          apiClient,
+          'logs.test',
+          {
+            processing: [basicGrokProcessor],
+            documents: [createTestDocument()],
+            detected_fields: [
+              { name: 'parsed_timestamp', type: 'boolean' }, // Incompatible type
+            ],
+          },
+          400
+        );
+
+        expect((response.body as errors.ResponseError['body']).message).to.contain(
+          'The detected field types might not be compatible with these documents.'
+        );
+      });
+    });
+
+    describe('Partial success simulations', () => {
+      it('should handle mixed success/failure documents', async () => {
+        const response = await simulateProcessingForStream(apiClient, 'logs.test', {
+          processing: [basicGrokProcessor],
+          documents: [
+            createTestDocument(), // Will succeed
+            createTestDocument('invalid format'), // Will fail
+          ],
+        });
+
+        expect(response.body.success_rate).to.be(0.5);
+        expect(response.body.failure_rate).to.be(0.5);
+        expect(response.body.documents[0].isMatch).to.be(true);
+        expect(response.body.documents[1].isMatch).to.be(false);
+      });
+    });
+  });
+}
```