[Streams 🌊] Improve typing for samples and simulation docs (#209991)

## 📓 Summary

This change introduce a new recursive record type to let the documents
applied used for sampling and simulation not fail on the excessive
strict keys check.

```tsx
// Any primitive value allowed for schema validation, excludes symbols and bigint
type Primitive + zod primitive 
// Recursive object  
interface RecursiveRecord  + zod recursiveRecord 
```
This commit is contained in:
Marco Antonio Ghiani 2025-02-06 14:46:31 +01:00 committed by GitHub
parent 3e5f55dd02
commit f534b5466f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 50 additions and 21 deletions

View file

@ -18,3 +18,21 @@ export const streamDefinitionSchema: z.Schema<StreamDefinition> = z.union([
]);
export const isStreamDefinition = createIsNarrowSchema(z.unknown(), streamDefinitionSchema);
export type Primitive = string | number | boolean | null | undefined;
export const primitive: z.ZodType<Primitive> = z.union([
z.string(),
z.number(),
z.boolean(),
z.null(),
z.undefined(),
]);
export interface RecursiveRecord {
[key: PropertyKey]: Primitive | Primitive[] | RecursiveRecord;
}
export const recursiveRecord: z.ZodType<RecursiveRecord> = z.record(
z.union([primitive, z.array(primitive), z.lazy(() => recursiveRecord)])
);

View file

@ -5,7 +5,7 @@
* 2.0.
*/
import { conditionSchema } from '@kbn/streams-schema';
import { RecursiveRecord, conditionSchema } from '@kbn/streams-schema';
import { z } from '@kbn/zod';
import { ResyncStreamsResponse } from '../../../lib/streams/client';
import { getFields } from '../../../lib/streams/helpers/condition_fields';
@ -105,7 +105,7 @@ export const sampleStreamRoute = createServerRoute({
size: z.optional(z.number()),
}),
}),
handler: async ({ params, request, getScopedClients }): Promise<{ documents: unknown[] }> => {
handler: async ({ params, request, getScopedClients }) => {
const { scopedClusterClient } = await getScopedClients({ request });
const { read } = await checkAccess({ id: params.path.id, scopedClusterClient });
@ -162,7 +162,7 @@ export const sampleStreamRoute = createServerRoute({
...searchBody,
});
return { documents: results.hits.hits.map((hit) => hit._source) };
return { documents: results.hits.hits.map((hit) => hit._source) as RecursiveRecord[] };
},
});

View file

@ -11,8 +11,10 @@ import { IScopedClusterClient } from '@kbn/core/server';
import { calculateObjectDiff, flattenObject } from '@kbn/object-utils';
import {
FieldDefinitionConfig,
RecursiveRecord,
namedFieldDefinitionConfigSchema,
processorDefinitionSchema,
recursiveRecord,
} from '@kbn/streams-schema';
import { z } from '@kbn/zod';
import { isEmpty } from 'lodash';
@ -28,7 +30,7 @@ const paramsSchema = z.object({
path: z.object({ id: z.string() }),
body: z.object({
processing: z.array(processorDefinitionSchema),
documents: z.array(z.record(z.unknown())),
documents: z.array(recursiveRecord),
detected_fields: z.array(namedFieldDefinitionConfigSchema).optional(),
}),
});
@ -139,6 +141,7 @@ const assertSimulationResult = (
}
// Assert that the processors are purely additive to the documents
const updatedFields = computeUpdatedFields(simulationDiffs);
if (!isEmpty(updatedFields)) {
throw new NonAdditiveProcessorError(
`The processor is not additive to the documents. It might update fields [${updatedFields.join()}]`
@ -148,7 +151,7 @@ const assertSimulationResult = (
const prepareSimulationResponse = (
simulationResult: any,
docs: Array<{ _source: Record<string, unknown> }>,
docs: Array<{ _source: RecursiveRecord }>,
simulationDiffs: ReturnType<typeof prepareSimulationDiffs>,
detectedFields?: ProcessingSimulateParams['body']['detected_fields']
) => {
@ -169,10 +172,10 @@ const prepareSimulationResponse = (
// TODO: update type once Kibana updates to elasticsearch-js 8.17
const prepareSimulationDiffs = (
simulation: any,
sampleDocs: Array<{ _source: Record<string, unknown> }>
sampleDocs: Array<{ _source: RecursiveRecord }>
) => {
// Since we filter out failed documents, we need to map the simulation docs to the sample docs for later retrieval
const samplesToSimulationMap = new Map<any, { _source: Record<string, unknown> }>(
const samplesToSimulationMap = new Map<any, { _source: RecursiveRecord }>(
simulation.docs.map((entry: any, id: number) => [entry.doc, sampleDocs[id]])
);
@ -202,8 +205,8 @@ const computeUpdatedFields = (simulationDiff: ReturnType<typeof prepareSimulatio
// TODO: update type once Kibana updates to elasticsearch-js 8.17
const computeSimulationDocuments = (
simulation: any,
sampleDocs: Array<{ _source: Record<string, unknown> }>
): Array<{ isMatch: boolean; value: Record<string, unknown> }> => {
sampleDocs: Array<{ _source: RecursiveRecord }>
): Array<{ isMatch: boolean; value: RecursiveRecord }> => {
return simulation.docs.map((entry: any, id: number) => {
// If every processor was successful, return and flatten the simulation doc from the last processor
if (isSuccessfulDocument(entry)) {

View file

@ -6,7 +6,11 @@
*/
import { z } from '@kbn/zod';
import { getFlattenedObject } from '@kbn/std';
import { fieldDefinitionConfigSchema, isWiredStreamDefinition } from '@kbn/streams-schema';
import {
RecursiveRecord,
fieldDefinitionConfigSchema,
isWiredStreamDefinition,
} from '@kbn/streams-schema';
import { checkAccess } from '../../../lib/streams/stream_crud';
import { createServerRoute } from '../../create_server_route';
import { DefinitionNotFoundError } from '../../../lib/streams/errors/definition_not_found_error';
@ -107,7 +111,7 @@ export const schemaFieldsSimulationRoute = createServerRoute({
}): Promise<{
status: 'unknown' | 'success' | 'failure';
simulationError: string | null;
documentsWithRuntimeFieldsApplied: unknown[] | null;
documentsWithRuntimeFieldsApplied: RecursiveRecord[] | null;
}> => {
const { scopedClusterClient } = await getScopedClients({ request });
@ -171,7 +175,7 @@ export const schemaFieldsSimulationRoute = createServerRoute({
_index: params.path.id,
_id: hit._id,
_source: Object.fromEntries(
Object.entries(getFlattenedObject(hit._source as Record<string, unknown>)).filter(
Object.entries(getFlattenedObject(hit._source as RecursiveRecord)).filter(
([k]) => fieldDefinitionKeys.includes(k) || k === '@timestamp'
)
),
@ -260,7 +264,7 @@ export const schemaFieldsSimulationRoute = createServerRoute({
if (!hit.fields) {
return {};
}
return Object.keys(hit.fields).reduce<Record<string, unknown>>((acc, field) => {
return Object.keys(hit.fields).reduce<RecursiveRecord>((acc, field) => {
acc[field] = hit.fields![field][0];
return acc;
}, {});

View file

@ -6,6 +6,7 @@
*/
import { EuiDataGrid } from '@elastic/eui';
import { i18n } from '@kbn/i18n';
import { RecursiveRecord } from '@kbn/streams-schema';
import { isEmpty } from 'lodash';
import React, { useMemo } from 'react';
@ -13,7 +14,7 @@ export function PreviewTable({
documents,
displayColumns,
}: {
documents: unknown[];
documents: RecursiveRecord[];
displayColumns?: string[];
}) {
const columns = useMemo(() => {
@ -57,7 +58,7 @@ export function PreviewTable({
if (!doc || typeof doc !== 'object') {
return '';
}
const value = (doc as Record<string, unknown>)[columnId];
const value = (doc as RecursiveRecord)[columnId];
if (value === undefined || value === null) {
return '';
}

View file

@ -14,6 +14,7 @@ import {
Condition,
processorDefinitionSchema,
isSchema,
RecursiveRecord,
} from '@kbn/streams-schema';
import { IHttpFetchError, ResponseErrorBody } from '@kbn/core/public';
import { useDateRange } from '@kbn/observability-utils-browser/hooks/use_date_range';
@ -39,7 +40,7 @@ export interface UseProcessingSimulatorReturn {
hasLiveChanges: boolean;
error?: IHttpFetchError<ResponseErrorBody>;
isLoading: boolean;
samples: Array<Record<PropertyKey, unknown>>;
samples: RecursiveRecord[];
simulation?: Simulation | null;
tableColumns: TableColumn[];
refreshSamples: () => void;
@ -141,7 +142,7 @@ export const useProcessingSimulator = ({
{ disableToastOnError: true }
);
const sampleDocs = samples?.documents as Array<Record<PropertyKey, unknown>>;
const sampleDocs = samples?.documents;
const {
loading: isLoadingSimulation,
@ -149,7 +150,7 @@ export const useProcessingSimulator = ({
error: simulationError,
} = useStreamsAppFetch(
({ signal }) => {
if (!definition || isEmpty(sampleDocs) || isEmpty(liveDraftProcessors)) {
if (!definition || isEmpty<RecursiveRecord[]>(sampleDocs) || isEmpty(liveDraftProcessors)) {
return Promise.resolve(null);
}

View file

@ -20,6 +20,7 @@ import { i18n } from '@kbn/i18n';
import { TimeRange } from '@kbn/es-query';
import { flattenObject } from '@kbn/object-utils';
import { isEmpty } from 'lodash';
import { RecursiveRecord } from '@kbn/streams-schema';
import { useKibana } from '../../hooks/use_kibana';
import { StreamsAppSearchBar, StreamsAppSearchBarProps } from '../streams_app_search_bar';
import { PreviewTable } from '../preview_table';
@ -51,7 +52,7 @@ export const ProcessorOutcomePreview = ({
const simulationDocuments = useMemo(() => {
if (!simulation?.documents) {
return samples.map((doc) => flattenObject(doc));
return samples.map((doc) => flattenObject(doc)) as RecursiveRecord[];
}
const filterDocuments = (filter: DocsFilterOption) => {
@ -210,7 +211,7 @@ const OutcomeControls = ({
};
interface OutcomePreviewTableProps {
documents: Array<Record<PropertyKey, unknown>>;
documents: RecursiveRecord[];
columns: string[];
}

View file

@ -42,6 +42,7 @@ import {
IngestUpsertRequest,
getAncestorsAndSelf,
WiredStreamGetResponse,
RecursiveRecord,
} from '@kbn/streams-schema';
import { useUnsavedChangesPrompt } from '@kbn/unsaved-changes-prompt';
import { AbortableAsyncState } from '@kbn/observability-utils-browser/hooks/use_abortable_async';
@ -617,7 +618,7 @@ function PreviewPanelIllustration({
}: {
routingAppState: ReturnType<typeof useRoutingState>;
previewSampleFetch: AbortableAsyncState<{
documents: unknown[];
documents: RecursiveRecord[];
}>;
}) {
return (