🌊 Fix "stream" field conflicting with stream.name (#211004)

Currently, streams logs with a `stream` field fail ingest because the
set processor tries to set a
```
"stream": {
  "name": "<name field>"
}
```

which doesn't work if `"stream": "abc"` is already in the document (some
shippers do this, e.g. docker or kubernetes)

Using a painless processor this problem can be avoided and you can have
`"stream"` and `"stream.name"`
This commit is contained in:
Joe Reuter 2025-02-13 19:27:16 +01:00 committed by GitHub
parent 3f7b13e792
commit 231733429c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 31 additions and 4 deletions

View file

@ -32,7 +32,7 @@ export function generateIngestPipeline(
{
script: {
source: `
if (ctx.stream?.name != params.parentName) {
if (ctx["stream.name"] != params.parentName) {
throw new IllegalArgumentException('stream.name is not set properly - did you send the document directly to a child stream instead of the main logs stream?');
}
`,
@ -45,9 +45,12 @@ export function generateIngestPipeline(
]
: []),
{
set: {
field: 'stream.name',
value: definition.name,
script: {
source: 'ctx["stream.name"] = params.field',
lang: 'painless',
params: {
field: definition.name,
},
},
},
...((isWiredStream && formatToIngestProcessors(definition.ingest.processing)) || []),

View file

@ -143,6 +143,30 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
});
});
it('Index a doc with a stream field', async () => {
const doc = {
'@timestamp': '2024-01-01T00:00:00.000Z',
message: JSON.stringify({
'log.level': 'info',
'log.logger': 'nginx',
message: 'test',
stream: 'somethingelse', // a field named stream should work as well
}),
};
const response = await indexDocument(esClient, 'logs', doc);
expect(response.result).to.eql('created');
const result = await fetchDocument(esClient, 'logs', response._id);
expect(result._index).to.match(/^\.ds\-logs-.*/);
expect(result._source).to.eql({
'@timestamp': '2024-01-01T00:00:00.000Z',
message: 'test',
'log.level': 'info',
'log.logger': 'nginx',
'stream.name': 'logs',
stream: 'somethingelse',
});
});
it('Fork logs to logs.nginx', async () => {
const body = {
stream: {