🌊 Streams: Add stream.name field (#208514)

Fixes https://github.com/elastic/streams-program/issues/103

This PR adds a constant keyword `stream.name` field that functions
similarly to the `data_stream.dataset` field.

It will allow us to clearly associate a document or a set of documents
with their streams and filter data per stream efficiently. It's also
used to validate that documents are sent properly to the root stream
instead of targeting specific child streams directly.

The `stream.name` field reports as `keyword` in the UI, but is actually
mapped per index template as a constant keyword set to the respective
value.
This commit is contained in:
Joe Reuter 2025-01-30 14:25:19 +01:00 committed by GitHub
parent 67719f2b57
commit 15fcb182e3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 81 additions and 3 deletions

View file

@ -35,6 +35,14 @@ export function generateIndexTemplate(id: string, isServerless: boolean) {
default_pipeline: getProcessingPipelineName(id),
},
},
mappings: {
properties: {
'stream.name': {
type: 'constant_keyword' as const,
value: id,
},
},
},
},
allow_auto_create: true,
// ignore missing component templates to be more robust against out-of-order syncs

View file

@ -5,17 +5,50 @@
* 2.0.
*/
import { StreamDefinition, isRoot } from '@kbn/streams-schema';
import {
StreamDefinition,
getParentId,
isRoot,
isWiredStreamDefinition,
} from '@kbn/streams-schema';
import { IngestPutPipelineRequest } from '@elastic/elasticsearch/lib/api/types';
import { ASSET_VERSION } from '../../../../common/constants';
import { logsDefaultPipelineProcessors } from './logs_default_pipeline';
import { getProcessingPipelineName } from './name';
import { formatToIngestProcessors } from '../helpers/processing';
export function generateIngestPipeline(id: string, definition: StreamDefinition) {
export function generateIngestPipeline(
id: string,
definition: StreamDefinition
): IngestPutPipelineRequest {
const isWiredStream = isWiredStreamDefinition(definition);
return {
id: getProcessingPipelineName(id),
processors: [
...(isRoot(definition.name) ? logsDefaultPipelineProcessors : []),
...(!isRoot(definition.name) && isWiredStream
? [
{
script: {
source: `
if (ctx.stream?.name != params.parentName) {
throw new IllegalArgumentException('stream.name is not set properly - did you send the document directly to a child stream instead of the main logs stream?');
}
`,
lang: 'painless',
params: {
parentName: getParentId(definition.name),
},
},
},
]
: []),
{
set: {
field: 'stream.name',
value: definition.name,
},
},
...formatToIngestProcessors(definition.ingest.processing),
{
pipeline: {

View file

@ -27,6 +27,9 @@ export const rootStreamDefinition: WiredStreamDefinition = {
'log.level': {
type: 'keyword',
},
'stream.name': {
type: 'keyword',
},
},
},
},

View file

@ -189,6 +189,12 @@ export const schemaFieldsSimulationRoute = createServerRoute({
},
},
},
// prevent double-processing
pipeline_substitutions: {
[`${params.path.id}@stream.processing`]: {
processors: [],
},
},
};
// TODO: We should be using scopedClusterClient.asCurrentUser.simulate.ingest() but the ES JS lib currently has a bug. The types also aren't available yet, so we use any.

View file

@ -121,6 +121,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
inner_timestamp: '2023-01-01T00:00:10.000Z',
message2: 'test',
'log.level': 'error',
'stream.name': 'logs.nginx',
});
});
@ -143,6 +144,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
'log.logger': 'mylogger',
message2: 'mylogger this is the message',
message3: 'this is the message',
'stream.name': 'logs.nginx',
});
});

View file

@ -41,6 +41,9 @@ const streams: StreamPutItem[] = [
'log.level': {
type: 'keyword',
},
'stream.name': {
type: 'keyword',
},
},
},
routing: [

View file

@ -139,6 +139,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
message: 'test',
'log.level': 'info',
'log.logger': 'nginx',
'stream.name': 'logs',
});
});
@ -176,6 +177,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
message: 'test',
'log.level': 'info',
'log.logger': 'nginx',
'stream.name': 'logs.nginx',
});
});
@ -209,6 +211,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
message: 'test',
'log.level': 'info',
'log.logger': 'nginx',
'stream.name': 'logs.nginx.access',
});
});
@ -242,6 +245,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
message: 'test',
'log.level': 'error',
'log.logger': 'nginx',
'stream.name': 'logs.nginx',
});
});

View file

@ -8,7 +8,7 @@
import expect from '@kbn/expect';
import { IngestStreamUpsertRequest, WiredStreamDefinition } from '@kbn/streams-schema';
import { DeploymentAgnosticFtrProviderContext } from '../../../ftr_provider_context';
import { disableStreams, enableStreams, putStream } from './helpers/requests';
import { disableStreams, enableStreams, indexDocument, putStream } from './helpers/requests';
import {
StreamsSupertestRepositoryClient,
createStreamsRepositoryAdminClient,
@ -34,6 +34,9 @@ const rootStreamDefinition: WiredStreamDefinition = {
'log.level': {
type: 'keyword',
},
'stream.name': {
type: 'keyword',
},
},
},
},
@ -42,6 +45,7 @@ const rootStreamDefinition: WiredStreamDefinition = {
export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
const roleScopedSupertest = getService('roleScopedSupertest');
let apiClient: StreamsSupertestRepositoryClient;
const esClient = getService('es');
describe('Root stream', () => {
before(async () => {
@ -123,5 +127,20 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
const response = await putStream(apiClient, 'logs', body);
expect(response).to.have.property('acknowledged', true);
});
it('Should not allow sending data directly to a child stream', async () => {
  // Indexing straight into a child stream must be rejected by the
  // stream.name guard installed in the child's ingest pipeline.
  const document = {
    '@timestamp': '2024-01-01T00:00:20.000Z',
    message: 'test',
  };
  let caughtError: Error | undefined;
  try {
    await indexDocument(esClient, 'logs.gcpcloud', document);
  } catch (e) {
    caughtError = e;
  }
  // The index call must have thrown, and with the guard's message.
  expect(caughtError).to.not.be(undefined);
  expect(caughtError!.message).to.contain('stream.name is not set properly');
});
});
}