🌊 Streams: Improve routing condition building (#212661)

This PR fixes two cases for routing:
* Handle special characters in field names like `@` or whitespace
* Do not fail on object/scalar mismatch

It does this by pulling the relevant fields in a safe way into a local
hashmap instead of accessing them inline using the `.?` operator.
This commit is contained in:
Joe Reuter 2025-02-28 11:05:09 +01:00 committed by GitHub
parent 8d7f34e2b0
commit a5c35b80a4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 147 additions and 53 deletions

View file

@ -11,55 +11,55 @@ const operatorConditionAndResults = [
{
condition: { field: 'log.logger', operator: 'eq' as const, value: 'nginx_proxy' },
result:
'(ctx.log?.logger !== null && ((ctx.log?.logger instanceof Number && ctx.log?.logger.toString() == "nginx_proxy") || ctx.log?.logger == "nginx_proxy"))',
"(relevant_fields['log.logger'] !== null && ((relevant_fields['log.logger'] instanceof Number && relevant_fields['log.logger'].toString() == \"nginx_proxy\") || relevant_fields['log.logger'] == \"nginx_proxy\"))",
},
{
condition: { field: 'log.logger', operator: 'neq' as const, value: 'nginx_proxy' },
result:
'(ctx.log?.logger !== null && ((ctx.log?.logger instanceof Number && ctx.log?.logger.toString() != "nginx_proxy") || ctx.log?.logger != "nginx_proxy"))',
"(relevant_fields['log.logger'] !== null && ((relevant_fields['log.logger'] instanceof Number && relevant_fields['log.logger'].toString() != \"nginx_proxy\") || relevant_fields['log.logger'] != \"nginx_proxy\"))",
},
{
condition: { field: 'http.response.status_code', operator: 'lt' as const, value: 500 },
result:
'(ctx.http?.response?.status_code !== null && ((ctx.http?.response?.status_code instanceof String && Float.parseFloat(ctx.http?.response?.status_code) < 500) || ctx.http?.response?.status_code < 500))',
"(relevant_fields['http.response.status_code'] !== null && ((relevant_fields['http.response.status_code'] instanceof String && Float.parseFloat(relevant_fields['http.response.status_code']) < 500) || relevant_fields['http.response.status_code'] < 500))",
},
{
condition: { field: 'http.response.status_code', operator: 'lte' as const, value: 500 },
result:
'(ctx.http?.response?.status_code !== null && ((ctx.http?.response?.status_code instanceof String && Float.parseFloat(ctx.http?.response?.status_code) <= 500) || ctx.http?.response?.status_code <= 500))',
"(relevant_fields['http.response.status_code'] !== null && ((relevant_fields['http.response.status_code'] instanceof String && Float.parseFloat(relevant_fields['http.response.status_code']) <= 500) || relevant_fields['http.response.status_code'] <= 500))",
},
{
condition: { field: 'http.response.status_code', operator: 'gt' as const, value: 500 },
result:
'(ctx.http?.response?.status_code !== null && ((ctx.http?.response?.status_code instanceof String && Float.parseFloat(ctx.http?.response?.status_code) > 500) || ctx.http?.response?.status_code > 500))',
"(relevant_fields['http.response.status_code'] !== null && ((relevant_fields['http.response.status_code'] instanceof String && Float.parseFloat(relevant_fields['http.response.status_code']) > 500) || relevant_fields['http.response.status_code'] > 500))",
},
{
condition: { field: 'http.response.status_code', operator: 'gte' as const, value: 500 },
result:
'(ctx.http?.response?.status_code !== null && ((ctx.http?.response?.status_code instanceof String && Float.parseFloat(ctx.http?.response?.status_code) >= 500) || ctx.http?.response?.status_code >= 500))',
"(relevant_fields['http.response.status_code'] !== null && ((relevant_fields['http.response.status_code'] instanceof String && Float.parseFloat(relevant_fields['http.response.status_code']) >= 500) || relevant_fields['http.response.status_code'] >= 500))",
},
{
condition: { field: 'log.logger', operator: 'startsWith' as const, value: 'nginx' },
result:
'(ctx.log?.logger !== null && ((ctx.log?.logger instanceof Number && ctx.log?.logger.toString().startsWith("nginx")) || ctx.log?.logger.startsWith("nginx")))',
"(relevant_fields['log.logger'] !== null && ((relevant_fields['log.logger'] instanceof Number && relevant_fields['log.logger'].toString().startsWith(\"nginx\")) || relevant_fields['log.logger'].startsWith(\"nginx\")))",
},
{
condition: { field: 'log.logger', operator: 'endsWith' as const, value: 'proxy' },
result:
'(ctx.log?.logger !== null && ((ctx.log?.logger instanceof Number && ctx.log?.logger.toString().endsWith("proxy")) || ctx.log?.logger.endsWith("proxy")))',
"(relevant_fields['log.logger'] !== null && ((relevant_fields['log.logger'] instanceof Number && relevant_fields['log.logger'].toString().endsWith(\"proxy\")) || relevant_fields['log.logger'].endsWith(\"proxy\")))",
},
{
condition: { field: 'log.logger', operator: 'contains' as const, value: 'proxy' },
result:
'(ctx.log?.logger !== null && ((ctx.log?.logger instanceof Number && ctx.log?.logger.toString().contains("proxy")) || ctx.log?.logger.contains("proxy")))',
"(relevant_fields['log.logger'] !== null && ((relevant_fields['log.logger'] instanceof Number && relevant_fields['log.logger'].toString().contains(\"proxy\")) || relevant_fields['log.logger'].contains(\"proxy\")))",
},
{
condition: { field: 'log.logger', operator: 'exists' as const },
result: 'ctx.log?.logger !== null',
result: "relevant_fields['log.logger'] !== null",
},
{
condition: { field: 'log.logger', operator: 'notExists' as const },
result: 'ctx.log?.logger == null',
result: "relevant_fields['log.logger'] == null",
},
];
@ -79,7 +79,7 @@ describe('conditionToPainless', () => {
value: '500',
};
expect(conditionToStatement(condition)).toEqual(
'(ctx.http?.response?.status_code !== null && ((ctx.http?.response?.status_code instanceof String && Float.parseFloat(ctx.http?.response?.status_code) > 500) || ctx.http?.response?.status_code > 500))'
"(relevant_fields['http.response.status_code'] !== null && ((relevant_fields['http.response.status_code'] instanceof String && Float.parseFloat(relevant_fields['http.response.status_code']) > 500) || relevant_fields['http.response.status_code'] > 500))"
);
});
test('ensure string comparasion works with number values', () => {
@ -89,7 +89,7 @@ describe('conditionToPainless', () => {
value: 500,
};
expect(conditionToStatement(condition)).toEqual(
'(ctx.message !== null && ((ctx.message instanceof Number && ctx.message.toString().contains("500")) || ctx.message.contains("500")))'
"(relevant_fields['message'] !== null && ((relevant_fields['message'] instanceof Number && relevant_fields['message'].toString().contains(\"500\")) || relevant_fields['message'].contains(\"500\")))"
);
});
});
@ -104,7 +104,7 @@ describe('conditionToPainless', () => {
};
expect(
expect(conditionToStatement(condition)).toEqual(
'(ctx.log?.logger !== null && ((ctx.log?.logger instanceof Number && ctx.log?.logger.toString() == "nginx_proxy") || ctx.log?.logger == "nginx_proxy")) && (ctx.log?.level !== null && ((ctx.log?.level instanceof Number && ctx.log?.level.toString() == "error") || ctx.log?.level == "error"))'
"(relevant_fields['log.logger'] !== null && ((relevant_fields['log.logger'] instanceof Number && relevant_fields['log.logger'].toString() == \"nginx_proxy\") || relevant_fields['log.logger'] == \"nginx_proxy\")) && (relevant_fields['log.level'] !== null && ((relevant_fields['log.level'] instanceof Number && relevant_fields['log.level'].toString() == \"error\") || relevant_fields['log.level'] == \"error\"))"
)
);
});
@ -120,7 +120,7 @@ describe('conditionToPainless', () => {
};
expect(
expect(conditionToStatement(condition)).toEqual(
'(ctx.log?.logger !== null && ((ctx.log?.logger instanceof Number && ctx.log?.logger.toString() == "nginx_proxy") || ctx.log?.logger == "nginx_proxy")) || (ctx.log?.level !== null && ((ctx.log?.level instanceof Number && ctx.log?.level.toString() == "error") || ctx.log?.level == "error"))'
"(relevant_fields['log.logger'] !== null && ((relevant_fields['log.logger'] instanceof Number && relevant_fields['log.logger'].toString() == \"nginx_proxy\") || relevant_fields['log.logger'] == \"nginx_proxy\")) || (relevant_fields['log.level'] !== null && ((relevant_fields['log.level'] instanceof Number && relevant_fields['log.level'].toString() == \"error\") || relevant_fields['log.level'] == \"error\"))"
)
);
});
@ -141,7 +141,7 @@ describe('conditionToPainless', () => {
};
expect(
expect(conditionToStatement(condition)).toEqual(
'(ctx.log?.logger !== null && ((ctx.log?.logger instanceof Number && ctx.log?.logger.toString() == "nginx_proxy") || ctx.log?.logger == "nginx_proxy")) && ((ctx.log?.level !== null && ((ctx.log?.level instanceof Number && ctx.log?.level.toString() == "error") || ctx.log?.level == "error")) || (ctx.log?.level !== null && ((ctx.log?.level instanceof Number && ctx.log?.level.toString() == "ERROR") || ctx.log?.level == "ERROR")))'
"(relevant_fields['log.logger'] !== null && ((relevant_fields['log.logger'] instanceof Number && relevant_fields['log.logger'].toString() == \"nginx_proxy\") || relevant_fields['log.logger'] == \"nginx_proxy\")) && ((relevant_fields['log.level'] !== null && ((relevant_fields['log.level'] instanceof Number && relevant_fields['log.level'].toString() == \"error\") || relevant_fields['log.level'] == \"error\")) || (relevant_fields['log.level'] !== null && ((relevant_fields['log.level'] instanceof Number && relevant_fields['log.level'].toString() == \"ERROR\") || relevant_fields['log.level'] == \"ERROR\")))"
)
);
});
@ -164,7 +164,7 @@ describe('conditionToPainless', () => {
};
expect(
expect(conditionToStatement(condition)).toEqual(
'((ctx.log?.logger !== null && ((ctx.log?.logger instanceof Number && ctx.log?.logger.toString() == "nginx_proxy") || ctx.log?.logger == "nginx_proxy")) || (ctx.service?.name !== null && ((ctx.service?.name instanceof Number && ctx.service?.name.toString() == "nginx") || ctx.service?.name == "nginx"))) && ((ctx.log?.level !== null && ((ctx.log?.level instanceof Number && ctx.log?.level.toString() == "error") || ctx.log?.level == "error")) || (ctx.log?.level !== null && ((ctx.log?.level instanceof Number && ctx.log?.level.toString() == "ERROR") || ctx.log?.level == "ERROR")))'
"((relevant_fields['log.logger'] !== null && ((relevant_fields['log.logger'] instanceof Number && relevant_fields['log.logger'].toString() == \"nginx_proxy\") || relevant_fields['log.logger'] == \"nginx_proxy\")) || (relevant_fields['service.name'] !== null && ((relevant_fields['service.name'] instanceof Number && relevant_fields['service.name'].toString() == \"nginx\") || relevant_fields['service.name'] == \"nginx\"))) && ((relevant_fields['log.level'] !== null && ((relevant_fields['log.level'] instanceof Number && relevant_fields['log.level'].toString() == \"error\") || relevant_fields['log.level'] == \"error\")) || (relevant_fields['log.level'] !== null && ((relevant_fields['log.level'] instanceof Number && relevant_fields['log.level'].toString() == \"ERROR\") || relevant_fields['log.level'] == \"ERROR\")))"
)
);
});
@ -173,21 +173,29 @@ describe('conditionToPainless', () => {
test('wrapped with type checks for uinary conditions', () => {
const condition = { field: 'log', operator: 'exists' as const };
expect(conditionToPainless(condition)).toEqual(`try {
if (ctx.log !== null) {
return true;
}
return false;
} catch (Exception e) {
return false;
}
`);
expect(conditionToPainless(condition)).toMatchInlineSnapshot(`
"
def relevant_fields = [:];
relevant_fields['log'] = ctx['log'];
try {
if (relevant_fields['log'] !== null) {
return true;
}
return false;
} catch (Exception e) {
return false;
}
"
`);
});
test('wrapped with typechecks and try/catch', () => {
const condition = {
and: [
{ field: 'log.logger', operator: 'eq' as const, value: 'nginx_proxy' },
{ field: 'log.logger.name', operator: 'eq' as const, value: 'nginx_proxy' },
{
or: [
{ field: 'log.level', operator: 'eq' as const, value: 'error' },
@ -196,20 +204,45 @@ describe('conditionToPainless', () => {
},
],
};
expect(
expect(conditionToPainless(condition))
.toEqual(`if (ctx.log?.logger instanceof Map || ctx.log?.level instanceof Map) {
return false;
}
try {
if ((ctx.log?.logger !== null && ((ctx.log?.logger instanceof Number && ctx.log?.logger.toString() == "nginx_proxy") || ctx.log?.logger == "nginx_proxy")) && ((ctx.log?.level !== null && ((ctx.log?.level instanceof Number && ctx.log?.level.toString() == "error") || ctx.log?.level == "error")) || (ctx.log?.level !== null && ((ctx.log?.level instanceof Number && ctx.log?.level.toString() == "ERROR") || ctx.log?.level == "ERROR")))) {
return true;
}
return false;
} catch (Exception e) {
return false;
}
`)
);
expect(conditionToPainless(condition)).toMatchInlineSnapshot(`
"
def relevant_fields = [:];
relevant_fields['log.logger.name'] = ctx['log'];
if (relevant_fields['log.logger.name'] != null) {
if (relevant_fields['log.logger.name'] instanceof Map) {
relevant_fields['log.logger.name'] = relevant_fields['log.logger.name']['logger'];
} else {
relevant_fields['log.logger.name'] = null;
}
}
if (relevant_fields['log.logger.name'] != null) {
if (relevant_fields['log.logger.name'] instanceof Map) {
relevant_fields['log.logger.name'] = relevant_fields['log.logger.name']['name'];
} else {
relevant_fields['log.logger.name'] = null;
}
}
relevant_fields['log.level'] = ctx['log'];
if (relevant_fields['log.level'] != null) {
if (relevant_fields['log.level'] instanceof Map) {
relevant_fields['log.level'] = relevant_fields['log.level']['level'];
} else {
relevant_fields['log.level'] = null;
}
}
try {
if ((relevant_fields['log.logger.name'] !== null && ((relevant_fields['log.logger.name'] instanceof Number && relevant_fields['log.logger.name'].toString() == \\"nginx_proxy\\") || relevant_fields['log.logger.name'] == \\"nginx_proxy\\")) && ((relevant_fields['log.level'] !== null && ((relevant_fields['log.level'] instanceof Number && relevant_fields['log.level'].toString() == \\"error\\") || relevant_fields['log.level'] == \\"error\\")) || (relevant_fields['log.level'] !== null && ((relevant_fields['log.level'] instanceof Number && relevant_fields['log.level'].toString() == \\"ERROR\\") || relevant_fields['log.level'] == \\"ERROR\\")))) {
return true;
}
return false;
} catch (Exception e) {
return false;
}
"
`);
});
});

View file

@ -21,10 +21,10 @@ import {
function safePainlessField(conditionOrField: FilterCondition | string) {
if (typeof conditionOrField === 'string') {
return `ctx.${conditionOrField.split('.').join('?.')}`;
return `relevant_fields['${conditionOrField}']`;
}
return `ctx.${conditionOrField.field.split('.').join('?.')}`;
return `relevant_fields['${conditionOrField.field}']`;
}
function encodeValue(value: string | number | boolean) {
@ -106,7 +106,7 @@ function unaryToPainless(condition: UnaryFilterCondition) {
}
function extractAllFields(condition: Condition, fields: string[] = []): string[] {
if (isFilterCondition(condition) && !isUnaryFilterCondition(condition)) {
if (isFilterCondition(condition)) {
return uniq([...fields, condition.field]);
} else if (isAndCondition(condition)) {
return uniq(condition.and.map((cond) => extractAllFields(cond, fields)).flat());
@ -116,6 +116,28 @@ function extractAllFields(condition: Condition, fields: string[] = []): string[]
return uniq(fields);
}
function generateFieldDefinition(field: string) {
const parts = field.split('.');
const firstPart = parts[0];
let code = `relevant_fields['${field}'] = ctx['${firstPart}'];\n`;
for (let i = 1; i < parts.length; i++) {
code += `if (relevant_fields['${field}'] != null) {
if (relevant_fields['${field}'] instanceof Map) {
relevant_fields['${field}'] = relevant_fields['${field}']['${parts[i]}'];
} else {
relevant_fields['${field}'] = null;
}
}\n`;
}
return code;
}
function generateFieldDefinitions(fields: string[]) {
return `
${fields.map(generateFieldDefinition).join('\n')}
`;
}
export function conditionToStatement(condition: Condition, nested = false): string {
if (isFilterCondition(condition)) {
if (isUnaryFilterCondition(condition)) {
@ -152,16 +174,14 @@ export function conditionToPainless(condition: Condition): string {
}
const fields = extractAllFields(condition);
let fieldCheck = '';
let fieldDefinitions = '';
if (fields.length !== 0) {
fieldCheck = `if (${fields
.map((field) => `${safePainlessField(field)} instanceof Map`)
.join(' || ')}) {
return false;
}
`;
fieldDefinitions = generateFieldDefinitions(fields);
}
return `${fieldCheck}try {
return `
def relevant_fields = [:];
${fieldDefinitions}
try {
if (${conditionToStatement(condition)}) {
return true;
}

View file

@ -340,6 +340,47 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
const response2 = await indexDocument(esClient, 'logs', doc2);
expect(response2.result).to.eql('created');
});
it('Fork logs to logs.weird-characters', async () => {
const body = {
stream: {
name: 'logs.weird-characters',
},
if: {
or: [
{ field: '@abc.weird fieldname', operator: 'contains' as const, value: 'route_it' },
],
},
};
const response = await forkStream(apiClient, 'logs', body);
expect(response).to.have.property('acknowledged', true);
});
it('Index documents with weird characters in their field names correctly', async () => {
const doc1 = {
'@timestamp': '2024-01-01T00:00:20.000Z',
'@abc': {
'weird fieldname': 'Please route_it',
},
};
const doc2 = {
'@timestamp': '2024-01-01T00:00:20.000Z',
'@abc': {
'weird fieldname': 'Keep where it is',
},
};
const response1 = await indexDocument(esClient, 'logs', doc1);
expect(response1.result).to.eql('created');
const result1 = await fetchDocument(esClient, 'logs.weird-characters', response1._id);
expect(result1._index).to.match(/^\.ds\-logs.weird-characters-.*/);
const response2 = await indexDocument(esClient, 'logs', doc2);
expect(response2.result).to.eql('created');
const result2 = await fetchDocument(esClient, 'logs', response2._id);
expect(result2._index).to.match(/^\.ds\-logs-.*/);
});
});
});
}