🌊 Streams: Fix unnecessary rollovers (#213594)

This PR fixes unnecessary rollovers caused by the way the `stream.name`
field is declared as a keyword. Because we automatically inject this
field when building the mapping, it doesn't behave as expected.

This PR makes the special handling more explicit by marking the field as
`type: system`, to make it clear that this field can't be controlled by
the user at all.

<img width="399" alt="Screenshot 2025-03-07 at 16 08 32"
src="https://github.com/user-attachments/assets/ea5cca8b-a487-4452-919c-4aafe43f992b"
/>

<img width="992" alt="Screenshot 2025-03-07 at 16 08 57"
src="https://github.com/user-attachments/assets/1f9455c7-43b5-4573-a76b-246ccde938a2"
/>

It's a little annoying having to deal with this special case everywhere
we handle fields, but I think it will be good to have this expressed in
TypeScript, because otherwise it's easy to forget and it can bite us
later (for example, changing `stream.name` in a processor or remapping
it with a different type).
This commit is contained in:
Joe Reuter 2025-03-11 17:12:40 +01:00 committed by GitHub
parent b7412d94e7
commit 38893c939b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 203 additions and 31 deletions

View file

@ -23,10 +23,14 @@ export const FIELD_DEFINITION_TYPES = [
export type FieldDefinitionType = (typeof FIELD_DEFINITION_TYPES)[number];
// We redefine "first class" parameters
export type FieldDefinitionConfig = MappingProperty & {
type: FieldDefinitionType;
format?: string;
};
export type FieldDefinitionConfig =
| (MappingProperty & {
type: FieldDefinitionType;
format?: string;
})
| {
type: 'system';
};
// Parameters that we provide a generic (JSON blob) experience for
export type FieldDefinitionConfigAdvancedParameters = Omit<
@ -36,10 +40,15 @@ export type FieldDefinitionConfigAdvancedParameters = Omit<
export const fieldDefinitionConfigSchema: z.Schema<FieldDefinitionConfig> = z.intersection(
recursiveRecord,
z.object({
type: z.enum(FIELD_DEFINITION_TYPES),
format: z.optional(NonEmptyString),
})
z.union([
z.object({
type: z.enum(FIELD_DEFINITION_TYPES),
format: z.optional(NonEmptyString),
}),
z.object({
type: z.literal('system'),
}),
])
);
export interface FieldDefinition {

View file

@ -45,7 +45,11 @@ import {
syncUnwiredStreamDefinitionObjects,
syncWiredStreamDefinitionObjects,
} from './helpers/sync';
import { validateAncestorFields, validateDescendantFields } from './helpers/validate_fields';
import {
validateAncestorFields,
validateDescendantFields,
validateSystemFields,
} from './helpers/validate_fields';
import {
validateRootStreamChanges,
validateStreamChildrenChanges,
@ -462,6 +466,8 @@ export class StreamsClient {
fields: definition.ingest.wired.fields,
});
validateSystemFields(definition);
validateDescendantFields({
descendants,
fields: definition.ingest.wired.fields,

View file

@ -28,6 +28,9 @@ export function generateLayer(
): ClusterPutComponentTemplateRequest {
const properties: Record<string, MappingProperty> = {};
Object.entries(definition.ingest.wired.fields).forEach(([field, props]) => {
if (props.type === 'system') {
return;
}
const property: MappingProperty = {
type: props.type,
};

View file

@ -5,7 +5,7 @@
* 2.0.
*/
import { FieldDefinition, WiredStreamDefinition } from '@kbn/streams-schema';
import { FieldDefinition, WiredStreamDefinition, isRoot } from '@kbn/streams-schema';
import { MalformedFieldsError } from '../errors/malformed_fields_error';
export function validateAncestorFields({
@ -32,6 +32,19 @@ export function validateAncestorFields({
}
}
/**
 * Ensures that only the root stream declares `system`-typed fields.
 *
 * System fields are injected and managed internally; a child stream that
 * carries one is considered malformed.
 *
 * @throws MalformedFieldsError when a non-root stream contains a field
 *   with `type: 'system'`.
 */
export function validateSystemFields(definition: WiredStreamDefinition) {
  // The root stream is the single place where system fields are permitted.
  if (isRoot(definition.name)) {
    return;
  }
  const hasSystemField = Object.values(definition.ingest.wired.fields).some(
    ({ type }) => type === 'system'
  );
  if (hasSystemField) {
    throw new MalformedFieldsError(
      `Stream ${definition.name} is not allowed to have system fields`
    );
  }
}
export function validateDescendantFields({
descendants,
fields,

View file

@ -30,7 +30,7 @@ export const rootStreamDefinition: WiredStreamDefinition = {
type: 'keyword',
},
'stream.name': {
type: 'keyword',
type: 'system',
},
},
},

View file

@ -30,6 +30,7 @@ import {
NamedFieldDefinitionConfig,
FieldDefinitionConfig,
InheritedFieldDefinitionConfig,
FieldDefinition,
} from '@kbn/streams-schema';
import { mapValues, uniq, omit, isEmpty, uniqBy, some } from 'lodash';
import { StreamsClient } from '../../../lib/streams/client';
@ -59,6 +60,7 @@ export interface SimulationError {
type:
| 'generic_processor_failure'
| 'generic_simulation_failure'
| 'reserved_field_failure'
| 'non_additive_processor_failure';
}
@ -129,15 +131,23 @@ export const simulateProcessing = async ({
return prepareSimulationFailureResponse(pipelineSimulationResult.error);
}
const streamFields = await getStreamFields(streamsClient, params.path.name);
/* 4. Extract all the documents reports and processor metrics from the pipeline simulation */
const { docReports, processorsMetrics } = computePipelineSimulationResult(
pipelineSimulationResult.simulation,
simulationData.docs,
params.body.processing
params.body.processing,
streamFields
);
/* 5. Extract valid detected fields asserting existing mapped fields from stream and ancestors */
const detectedFields = await computeDetectedFields(processorsMetrics, streamsClient, params);
const detectedFields = await computeDetectedFields(
processorsMetrics,
streamsClient,
params,
streamFields
);
/* 6. Derive general insights and process final response body */
return prepareSimulationResponse(docReports, processorsMetrics, detectedFields);
@ -352,17 +362,22 @@ const conditionallyExecuteIngestSimulation = async (
const computePipelineSimulationResult = (
simulationResult: SuccessfulIngestSimulateResponse,
sampleDocs: Array<{ _source: FlattenRecord }>,
processing: ProcessorDefinitionWithId[]
processing: ProcessorDefinitionWithId[],
streamFields: FieldDefinition
): {
docReports: SimulationDocReport[];
processorsMetrics: Record<string, ProcessorMetrics>;
} => {
const processorsMap = initProcessorMetricsMap(processing);
const forbiddenFields = Object.entries(streamFields)
.filter(([, { type }]) => type === 'system')
.map(([name]) => name);
const docReports = simulationResult.docs.map((docResult, id) => {
const { errors, status, value } = getLastDoc(docResult, sampleDocs[id]._source);
const diff = computeSimulationDocDiff(docResult, sampleDocs[id]._source);
const diff = computeSimulationDocDiff(docResult, sampleDocs[id]._source, forbiddenFields);
docResult.processor_results.forEach((processor) => {
const procId = processor.tag;
@ -488,7 +503,8 @@ const getLastDoc = (docResult: SuccessfulIngestSimulateDocumentResult, sample: F
*/
const computeSimulationDocDiff = (
docResult: SuccessfulIngestSimulateDocumentResult,
sample: FlattenRecord
sample: FlattenRecord,
forbiddenFields: string[]
) => {
// Keep only the successful processors defined from the user, skipping the on_failure processors from the simulation
const successfulProcessors = docResult.processor_results.filter(isSuccessfulProcessor);
@ -530,7 +546,16 @@ const computeSimulationDocDiff = (
// We might have updated fields that are not present in the original document because are generated by the previous processors.
// We exclude them from the list of fields that make the processor non-additive.
const originalUpdatedFields = updatedFields.filter((field) => field in sample).sort();
const originalUpdatedFields = updatedFields
.filter((field) => field in sample && !forbiddenFields.includes(field))
.sort();
if (forbiddenFields.some((field) => updatedFields.includes(field))) {
diffResult.errors.push({
processor_id: nextDoc.processor_id,
type: 'reserved_field_failure',
message: `The processor is trying to update a reserved field [${forbiddenFields.join()}]`,
});
}
if (!isEmpty(originalUpdatedFields)) {
diffResult.errors.push({
processor_id: nextDoc.processor_id,
@ -586,7 +611,10 @@ const prepareSimulationFailureResponse = (error: SimulationError) => {
};
};
const getStreamFields = async (streamsClient: StreamsClient, streamName: string) => {
const getStreamFields = async (
streamsClient: StreamsClient,
streamName: string
): Promise<FieldDefinition> => {
const [stream, ancestors] = await Promise.all([
streamsClient.getStream(streamName),
streamsClient.getAncestors(streamName),
@ -605,9 +633,9 @@ const getStreamFields = async (streamsClient: StreamsClient, streamName: string)
const computeDetectedFields = async (
processorsMetrics: Record<string, ProcessorMetrics>,
streamsClient: StreamsClient,
params: ProcessingSimulationParams
params: ProcessingSimulationParams,
streamFields: FieldDefinition
): Promise<DetectedField[]> => {
const streamName = params.path.name;
const fields = Object.values(processorsMetrics).flatMap((metrics) => metrics.detected_fields);
const uniqueFields = uniq(fields);
@ -617,7 +645,6 @@ const computeDetectedFields = async (
return [];
}
const streamFields = await getStreamFields(streamsClient, streamName);
const confirmedValidDetectedFields = computeMappingProperties(params.body.detected_fields ?? []);
return uniqueFields.map((name) => {
@ -643,7 +670,14 @@ const computeSkippedRate = (docs: SimulationDocReport[]) => {
};
const computeMappingProperties = (detectedFields: NamedFieldDefinitionConfig[]) => {
return Object.fromEntries(detectedFields.map(({ name, type }) => [name, { type }]));
return Object.fromEntries(
detectedFields.flatMap(({ name, type }) => {
if (type === 'system') {
return [];
}
return [[name, { type }]];
})
);
};
/**

View file

@ -121,8 +121,16 @@ export const schemaFieldsSimulationRoute = createServerRoute({
throw new DefinitionNotFoundError(`Stream definition for ${params.path.name} not found.`);
}
const userFieldDefinitions = params.body.field_definitions.flatMap((field) => {
// filter out potential system fields since we can't simulate them anyway
if (field.type === 'system') {
return [];
}
return [field];
});
const propertiesForSample = Object.fromEntries(
params.body.field_definitions.map((field) => [field.name, { type: 'keyword' as const }])
userFieldDefinitions.map((field) => [field.name, { type: 'keyword' as const }])
);
const documentSamplesSearchBody = {
@ -163,7 +171,7 @@ export const schemaFieldsSimulationRoute = createServerRoute({
}
const propertiesForSimulation = Object.fromEntries(
params.body.field_definitions.map((field) => [
userFieldDefinitions.map((field) => [
field.name,
{
type: field.type,
@ -230,7 +238,7 @@ export const schemaFieldsSimulationRoute = createServerRoute({
// Convert the field definitions to a format that can be used in runtime mappings (match_only_text -> keyword)
const propertiesCompatibleWithRuntimeMappings = Object.fromEntries(
params.body.field_definitions.map((field) => [
userFieldDefinitions.map((field) => [
field.name,
{
type: field.type === 'match_only_text' ? 'keyword' : field.type,

View file

@ -45,6 +45,11 @@ export const FIELD_TYPE_MAP = {
defaultMessage: 'IP',
}),
},
system: {
label: i18n.translate('xpack.streams.streamDetailSchemaEditorFieldsTableSystemType', {
defaultMessage: 'System managed',
}),
},
} as const;
export type FieldTypeOption = keyof typeof FIELD_TYPE_MAP;

View file

@ -125,6 +125,10 @@ export const FieldActionsCell = ({ field }: { field: SchemaField }) => {
];
}, [closePopover, context, core, field, schemaEditorContext]);
if (field.type === 'system') {
return null;
}
return (
<EuiPopover
id={contextMenuPopoverId}

View file

@ -11,5 +11,10 @@ import { FieldNameWithIcon } from '@kbn/react-field';
import { FIELD_TYPE_MAP } from './constants';
export const FieldType = ({ type }: { type: FieldDefinitionConfig['type'] }) => {
return <FieldNameWithIcon name={FIELD_TYPE_MAP[type].label} type={type} />;
return (
<FieldNameWithIcon
name={FIELD_TYPE_MAP[type].label}
type={type !== 'system' ? type : undefined}
/>
);
};

View file

@ -63,7 +63,7 @@ export const useSchemaFields = ({
([name, field]) => ({
name,
type: field.type,
format: field.format,
format: 'format' in field ? field.format : undefined,
additionalParameters: getAdvancedParameters(name, field),
parent: field.from,
status: 'inherited',
@ -74,7 +74,7 @@ export const useSchemaFields = ({
([name, field]) => ({
name,
type: field.type,
format: field.format,
format: 'format' in field ? field.format : undefined,
additionalParameters: getAdvancedParameters(name, field),
parent: definition.stream.name,
status: 'mapped',

View file

@ -22,7 +22,7 @@ export type ProcessorDefinitionWithUIAttributes = WithUIAttributes<ProcessorDefi
export interface DetectedField {
name: string;
type?: FieldDefinitionType;
type?: FieldDefinitionType | 'system';
}
interface BaseFormState {

View file

@ -6,6 +6,7 @@
*/
import expect from '@kbn/expect';
import { IngestStreamUpsertRequest } from '@kbn/streams-schema';
import { DeploymentAgnosticFtrProviderContext } from '../../../ftr_provider_context';
import {
StreamsSupertestRepositoryClient,
@ -18,6 +19,7 @@ import {
forkStream,
indexAndAssertTargetStream,
indexDocument,
putStream,
} from './helpers/requests';
export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
@ -353,6 +355,89 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
await indexAndAssertTargetStream(esClient, 'logs.weird-characters', doc1);
await indexAndAssertTargetStream(esClient, 'logs', doc2);
});
it('should allow to update field type to incompatible type', async () => {
// Initial definition maps 'myfield' as a boolean.
const body: IngestStreamUpsertRequest = {
dashboards: [],
stream: {
ingest: {
lifecycle: { inherit: {} },
processing: [],
wired: {
fields: {
myfield: {
type: 'boolean',
},
},
routing: [],
},
},
},
};
await putStream(apiClient, 'logs.rollovertest', body, 200);
// Remapping 'myfield' from boolean to keyword is incompatible with the
// existing mapping, so the update is expected to succeed via a rollover
// (counted later in the 'should not roll over more often than necessary'
// test — TODO confirm that expectation stays in sync with this fixture).
await putStream(
apiClient,
'logs.rollovertest',
{
...body,
stream: {
ingest: {
...body.stream.ingest,
wired: {
...body.stream.ingest.wired,
fields: {
myfield: {
type: 'keyword',
},
},
},
},
},
},
200
);
});
it('should not allow to update field type to system', async () => {
// 'system' is reserved for internally managed fields (e.g. stream.name),
// so a user-supplied field with type 'system' must be rejected.
const body: IngestStreamUpsertRequest = {
dashboards: [],
stream: {
ingest: {
lifecycle: { inherit: {} },
processing: [],
wired: {
fields: {
myfield: {
type: 'system',
},
},
routing: [],
},
},
},
};
// Expect a 400 Bad Request — validateSystemFields forbids system fields
// on non-root streams.
await putStream(apiClient, 'logs.willfail', body, 400);
});
it('should not roll over more often than necessary', async () => {
// Expected number of backing indices per data stream after the preceding
// tests have run. Only 'logs.rollovertest' should have rolled over (once,
// due to the boolean -> keyword remapping above); every other stream must
// still have exactly one backing index, proving no spurious rollovers.
const expectedIndexCounts: Record<string, number> = {
logs: 1,
'logs.nginx': 1,
'logs.nginx.access': 1,
'logs.nginx.error': 1,
'logs.number-test': 1,
'logs.string-test': 1,
'logs.weird-characters': 1,
'logs.rollovertest': 2,
};
const dataStreams = await esClient.indices.getDataStream({
name: Object.keys(expectedIndexCounts).join(','),
});
// Map each data stream to its actual backing-index count for comparison.
const actualIndexCounts = Object.fromEntries(
dataStreams.data_streams.map((stream) => [stream.name, stream.indices.length])
);
expect(actualIndexCounts).to.eql(expectedIndexCounts);
});
});
});
}

View file

@ -33,7 +33,7 @@ const streams: StreamPutItem[] = [
type: 'keyword',
},
'stream.name': {
type: 'keyword',
type: 'system',
},
},
routing: [

View file

@ -35,7 +35,7 @@ const rootStreamDefinition: WiredStreamDefinition = {
type: 'keyword',
},
'stream.name': {
type: 'keyword',
type: 'system',
},
},
},