[Expression] Cached expression can keep their own side effects (#216519)

## Summary

Fixes the #207204 

This PR introduces a new complementary function for an Expression
definition named `sideEffects`, this goes together with the other `fn`
function and it is used to restore any side effect when the caching
system kicks in.


![side_effects_cache](https://github.com/user-attachments/assets/74b1ddff-a45c-4983-ac09-57559155fba8)

I haven't found how to programmatically test this.
Will add an FTR if it can be reliable to reproduce an expression caching
scenario.

### Checklist

- [x] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios

### Release notes

The request inspector now shows the correct request and response in any
successful scenario.
This commit is contained in:
Marco Liberati 2025-04-11 14:50:47 +02:00 committed by GitHub
parent 1bf39845da
commit 6984530aa0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 147 additions and 27 deletions

View file

@ -85,7 +85,7 @@ pageLoadAssetSize:
infra: 184320
ingestPipelines: 58003
inputControlVis: 172675
inspector: 17600
inspector: 17740
interactiveSetup: 80000
inventory: 27430
kibanaOverview: 56279

View file

@ -472,8 +472,9 @@ export class AggConfigs {
if (!this.hasTimeShifts()) {
return response;
}
const transformedRawResponse = cloneDeep(response.rawResponse);
if (!transformedRawResponse.aggregations) {
let transformedRawResponse = response.rawResponse;
if (!response.rawResponse.aggregations) {
transformedRawResponse = cloneDeep(response.rawResponse);
transformedRawResponse.aggregations = {
doc_count: response.rawResponse.hits?.total as estypes.AggregationsAggregate,
};

View file

@ -58,7 +58,6 @@ export const getEsaggsMeta: () => Omit<EsaggsExpressionFunctionDefinition, 'fn'>
name,
type: 'datatable',
inputTypes: ['kibana_context', 'null'],
allowCache: true,
help: i18n.translate('data.functions.esaggs.help', {
defaultMessage: 'Run AggConfig aggregation',
}),

View file

@ -33,6 +33,7 @@ import { getTime } from '../../query';
import {
ESQL_ASYNC_SEARCH_STRATEGY,
ESQL_TABLE_TYPE,
getSideEffectFunction,
isRunningResponse,
type KibanaContext,
} from '..';
@ -98,7 +99,6 @@ export const getEsqlFn = ({ getStartDependencies }: EsqlFnArguments) => {
name: 'esql',
type: 'datatable',
inputTypes: ['kibana_context', 'null'],
allowCache: true,
help: i18n.translate('data.search.esql.help', {
defaultMessage: 'Queries Elasticsearch using ES|QL.',
}),
@ -155,6 +155,11 @@ export const getEsqlFn = ({ getStartDependencies }: EsqlFnArguments) => {
}),
},
},
allowCache: {
withSideEffects: (_, { inspectorAdapters }) => {
return getSideEffectFunction(inspectorAdapters);
},
},
fn(
input,
{

View file

@ -9,3 +9,4 @@
export * from './function_wrapper';
export { adaptToExpressionValueFilter } from './filters_adapter';
export { getSideEffectFunction } from './requests_side_effects';

View file

@ -0,0 +1,26 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { Adapters } from '@kbn/inspector-plugin/common';
const collectSideEffectsData = (adapters: Adapters) => {
return adapters.requests?.getRequestEntries();
};
export const getSideEffectFunction = (adapters: Adapters) => {
const requestsWithResponses = collectSideEffectsData(adapters);
return () => {
if (!requestsWithResponses || requestsWithResponses.length === 0) {
return;
}
const requestsMap = new Map(requestsWithResponses.map(([request]) => [request.id, request]));
const responsesMap = new WeakMap(requestsWithResponses);
adapters.requests?.loadFromEntries(requestsMap, responsesMap);
};
};

View file

@ -15,6 +15,7 @@ import {
EsaggsExpressionFunctionDefinition,
EsaggsStartDependencies,
getEsaggsMeta,
getSideEffectFunction,
} from '../../../common/search/expressions';
import { DataPublicPluginStart, DataStartDependencies } from '../../types';
@ -37,13 +38,19 @@ export function getFunctionDefinition({
}) {
return (): EsaggsExpressionFunctionDefinition => ({
...getEsaggsMeta(),
allowCache: {
withSideEffects: (_, { inspectorAdapters }) => {
return getSideEffectFunction(inspectorAdapters);
},
},
fn(
input,
args,
{ inspectorAdapters, abortSignal, getSearchSessionId, getExecutionContext, getSearchContext }
) {
return defer(async () => {
const { aggs, indexPatterns, searchSource, getNow } = await getStartDependencies();
const [{ aggs, indexPatterns, searchSource, getNow }, { handleEsaggsRequest }] =
await Promise.all([getStartDependencies(), import('../../../common/search/expressions')]);
const indexPattern = await indexPatterns.create(args.index.value, true);
const aggConfigs = aggs.createAggConfigs(
@ -57,8 +64,6 @@ export function getFunctionDefinition({
}
);
const { handleEsaggsRequest } = await import('../../../common/search/expressions');
return { aggConfigs, indexPattern, searchSource, getNow, handleEsaggsRequest };
}).pipe(
switchMap(({ aggConfigs, indexPattern, searchSource, getNow, handleEsaggsRequest }) => {

View file

@ -59,6 +59,7 @@ type UnwrapReturnType<Function extends (...args: any[]) => unknown> =
export interface FunctionCacheItem {
value: unknown;
time: number;
sideEffectFn?: () => void;
}
/**
* The result returned after an expression function execution.
@ -475,21 +476,19 @@ export class Execution<
.pipe(
map((currentInput) => this.cast(currentInput, fn.inputTypes)),
switchMap((normalizedInput) => {
if (fn.allowCache && this.context.allowCache) {
hash = calculateObjectHash([
fn.name,
normalizedInput,
args,
this.context.getSearchContext(),
]);
const {
hash: fnHash,
value: cachedValue,
valid: cacheValid,
} = this.#canUseCachedResult(fn, normalizedInput, args);
hash = fnHash;
if (cacheValid) {
cachedValue.sideEffectFn?.();
return of(cachedValue.value);
}
if (hash && this.functionCache.has(hash)) {
const cached = this.functionCache.get(hash);
if (cached && Date.now() - cached.time < this.cacheTimeout) {
return of(cached.value);
}
}
return of(fn.fn(normalizedInput, args, this.context));
const output = fn.fn(normalizedInput, args, this.context);
return of(output);
}),
switchMap((fnResult) => {
return (
@ -524,10 +523,15 @@ export class Execution<
}),
finalize(() => {
if (completionFlag && hash) {
const sideEffectResult = this.#getSideEffectFn(fn, args);
while (this.functionCache.size >= maxCacheSize) {
this.functionCache.delete(this.functionCache.keys().next().value);
}
this.functionCache.set(hash, { value: lastValue, time: Date.now() });
this.functionCache.set(hash, {
value: lastValue,
time: Date.now(),
sideEffectFn: sideEffectResult,
});
}
})
)
@ -714,4 +718,41 @@ export class Execution<
return throwError(new Error(`Unknown AST object: ${JSON.stringify(ast)}`));
}
}
#canUseCachedResult<Fn extends ExpressionFunction>(
fn: Fn,
input: unknown,
args: Record<string, unknown>
):
| { hash: string; value: FunctionCacheItem; valid: boolean }
| { hash: string | undefined; value: undefined; valid: false } {
if (!fn.allowCache || !this.context.allowCache) {
return { hash: undefined, value: undefined, valid: false };
}
const hash = calculateObjectHash([fn.name, input, args, this.context.getSearchContext()]);
const cached = this.functionCache.get(hash);
if (hash && cached) {
return {
hash,
value: cached,
valid: Boolean(cached && Date.now() - cached.time < this.cacheTimeout),
};
}
return {
hash,
value: undefined,
valid: false,
};
}
#getSideEffectFn<Fn extends ExpressionFunction>(
fn: Fn,
args: Record<string, unknown>
): undefined | (() => void) {
if (!fn.allowCache || typeof fn.allowCache === 'boolean') {
return undefined;
}
return fn.allowCache.withSideEffects?.(args, this.context);
}
}

View file

@ -42,7 +42,9 @@ export class ExpressionFunction implements PersistableState<ExpressionAstFunctio
/**
* Opt-in to caching this function. By default function outputs are cached and given the same inputs cached result is returned.
*/
allowCache: boolean;
allowCache:
| boolean
| { withSideEffects: (params: Record<string, unknown>, handlers: object) => () => void };
/**
* Function to run function (context, args)
@ -116,7 +118,10 @@ export class ExpressionFunction implements PersistableState<ExpressionAstFunctio
this.fn = fn as ExpressionFunction['fn'];
this.help = help || '';
this.inputTypes = inputTypes || context?.types;
this.allowCache = !!allowCache;
this.allowCache =
allowCache && typeof allowCache !== 'boolean'
? (allowCache as ExpressionFunction['allowCache'])
: Boolean(allowCache);
this.disabled = disabled || false;
this.deprecated = !!deprecated;
this.telemetry = telemetry || ((s, c) => c);

View file

@ -60,8 +60,19 @@ export interface ExpressionFunctionDefinition<
/**
* Opt-in to caching this function. By default function outputs are cached and given the same inputs cached result is returned.
*
* It is possible to collect side effects produced by the function
* (e.g. logging, sending events to the server, etc.) and return a
* handler to reproduce such side effects when the function cache is used
* instead of the original function implementation.
* @param args Parameters set for this function in expression.
* @param context Object with functions to perform side effects. This object
* is created for the duration of the execution of expression and is the
* same for all functions in expression chain.
* @returns A handler to be called to reproduce side effects when the function cache is used.
*
*/
allowCache?: boolean;
allowCache?: boolean | { withSideEffects(args: Arguments, context: Context): () => void };
/**
* List of allowed type names for input value of this function. If this

View file

@ -21,10 +21,12 @@ import { Request, RequestParams, RequestStatus } from './types';
*/
export class RequestAdapter extends EventEmitter {
private requests: Map<string, Request>;
private responses: WeakMap<Request, RequestResponder>;
constructor() {
super();
this.requests = new Map();
this.responses = new WeakMap();
}
/**
@ -52,7 +54,17 @@ export class RequestAdapter extends EventEmitter {
};
this.requests.set(req.id, req);
this._onChange();
return new RequestResponder(req, () => this._onChange());
const responder = new RequestResponder(req, () => this._onChange());
this.responses.set(req, responder);
return responder;
}
public loadFromEntries(
requests: Map<string, Request>,
responses: WeakMap<Request, RequestResponder>
) {
this.requests = requests;
this.responses = responses;
}
public reset(): void {
@ -61,7 +73,11 @@ export class RequestAdapter extends EventEmitter {
}
public resetRequest(id: string): void {
const req = this.requests.get(id);
this.requests.delete(id);
if (req) {
this.responses.delete(req);
}
this._onChange();
}
@ -69,6 +85,12 @@ export class RequestAdapter extends EventEmitter {
return Array.from(this.requests.values());
}
public getRequestEntries(): Array<[Request, RequestResponder]> {
return this.getRequests()
.map((req) => [req, this.responses.get(req)] as [Request, RequestResponder])
.filter(([_req, responder]) => responder != null);
}
private _onChange(): void {
this.emit('change');
}

View file

@ -129,6 +129,7 @@ describe('useStateProps', () => {
"_eventsCount": 0,
"_maxListeners": undefined,
"requests": Map {},
"responses": WeakMap {},
Symbol(shapeMode): false,
Symbol(kCapture): false,
},
@ -217,6 +218,7 @@ describe('useStateProps', () => {
"_eventsCount": 0,
"_maxListeners": undefined,
"requests": Map {},
"responses": WeakMap {},
Symbol(shapeMode): false,
Symbol(kCapture): false,
},
@ -415,6 +417,7 @@ describe('useStateProps', () => {
"_eventsCount": 0,
"_maxListeners": undefined,
"requests": Map {},
"responses": WeakMap {},
Symbol(shapeMode): false,
Symbol(kCapture): false,
},
@ -498,6 +501,7 @@ describe('useStateProps', () => {
"_eventsCount": 0,
"_maxListeners": undefined,
"requests": Map {},
"responses": WeakMap {},
Symbol(shapeMode): false,
Symbol(kCapture): false,
},