mirror of
https://github.com/elastic/kibana.git
synced 2025-04-23 09:19:04 -04:00
* Update `esaggs` expressions function to support partial results * Add partial results throttling in the expressions loader
This commit is contained in:
parent
e6d944a50a
commit
b0b92a97ac
14 changed files with 263 additions and 156 deletions
|
@ -22,11 +22,12 @@ export interface IExpressionLoaderParams
|
|||
| [hasCompatibleActions](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.hascompatibleactions.md) | <code>ExpressionRenderHandlerParams['hasCompatibleActions']</code> | |
|
||||
| [inspectorAdapters](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.inspectoradapters.md) | <code>Adapters</code> | |
|
||||
| [onRenderError](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.onrendererror.md) | <code>RenderErrorHandlerFnType</code> | |
|
||||
| [partial](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.partial.md) | <code>boolean</code> | |
|
||||
| [partial](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.partial.md) | <code>boolean</code> | The flag to toggle on emitting partial results. By default, the partial results are disabled. |
|
||||
| [renderMode](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.rendermode.md) | <code>RenderMode</code> | |
|
||||
| [searchContext](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.searchcontext.md) | <code>SerializableState</code> | |
|
||||
| [searchSessionId](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.searchsessionid.md) | <code>string</code> | |
|
||||
| [syncColors](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.synccolors.md) | <code>boolean</code> | |
|
||||
| [throttle](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.throttle.md) | <code>number</code> | Throttling of partial results in milliseconds. By default, throttling is disabled. |
|
||||
| [uiState](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.uistate.md) | <code>unknown</code> | |
|
||||
| [variables](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.variables.md) | <code>Record<string, any></code> | |
|
||||
|
||||
|
|
|
@ -4,6 +4,8 @@
|
|||
|
||||
## IExpressionLoaderParams.partial property
|
||||
|
||||
The flag to toggle on emitting partial results. By default, the partial results are disabled.
|
||||
|
||||
<b>Signature:</b>
|
||||
|
||||
```typescript
|
||||
|
|
|
@ -0,0 +1,13 @@
|
|||
<!-- Do not edit this file. It is automatically generated by API Documenter. -->
|
||||
|
||||
[Home](./index.md) > [kibana-plugin-plugins-expressions-public](./kibana-plugin-plugins-expressions-public.md) > [IExpressionLoaderParams](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.md) > [throttle](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.throttle.md)
|
||||
|
||||
## IExpressionLoaderParams.throttle property
|
||||
|
||||
Throttling of partial results in milliseconds. By default, throttling is disabled.
|
||||
|
||||
<b>Signature:</b>
|
||||
|
||||
```typescript
|
||||
throttle?: number;
|
||||
```
|
|
@ -7,6 +7,7 @@
|
|||
*/
|
||||
|
||||
import { i18n } from '@kbn/i18n';
|
||||
import { Observable } from 'rxjs';
|
||||
|
||||
import { Datatable, ExpressionFunctionDefinition } from 'src/plugins/expressions/common';
|
||||
|
||||
|
@ -22,7 +23,7 @@ import { handleRequest } from './request_handler';
|
|||
const name = 'esaggs';
|
||||
|
||||
type Input = KibanaContext | null;
|
||||
type Output = Promise<Datatable>;
|
||||
type Output = Observable<Datatable>;
|
||||
|
||||
interface Arguments {
|
||||
index: IndexPatternExpressionType;
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
* Side Public License, v 1.
|
||||
*/
|
||||
|
||||
import { from } from 'rxjs';
|
||||
import type { MockedKeys } from '@kbn/utility-types/jest';
|
||||
import type { Filter } from '../../../es_query';
|
||||
import type { IndexPattern } from '../../../index_patterns';
|
||||
|
@ -21,6 +22,7 @@ jest.mock('../../tabify', () => ({
|
|||
|
||||
import { tabifyAggResponse } from '../../tabify';
|
||||
import { of } from 'rxjs';
|
||||
import { toArray } from 'rxjs/operators';
|
||||
|
||||
describe('esaggs expression function - public', () => {
|
||||
let mockParams: MockedKeys<RequestHandlerParams>;
|
||||
|
@ -57,7 +59,7 @@ describe('esaggs expression function - public', () => {
|
|||
});
|
||||
|
||||
test('should create a new search source instance', async () => {
|
||||
await handleRequest(mockParams);
|
||||
await handleRequest(mockParams).toPromise();
|
||||
expect(mockParams.searchSourceService.create).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
|
@ -65,7 +67,7 @@ describe('esaggs expression function - public', () => {
|
|||
let searchSource: MockedKeys<ISearchSource>;
|
||||
|
||||
beforeEach(async () => {
|
||||
await handleRequest(mockParams);
|
||||
await handleRequest(mockParams).toPromise();
|
||||
searchSource = await mockParams.searchSourceService.create();
|
||||
});
|
||||
|
||||
|
@ -100,7 +102,7 @@ describe('esaggs expression function - public', () => {
|
|||
await handleRequest({
|
||||
...mockParams,
|
||||
filters: mockFilters,
|
||||
});
|
||||
}).toPromise();
|
||||
searchSource = await mockParams.searchSourceService.create();
|
||||
expect((searchSource.setField as jest.Mock).mock.calls[3]).toEqual(['filter', mockFilters]);
|
||||
});
|
||||
|
@ -118,14 +120,14 @@ describe('esaggs expression function - public', () => {
|
|||
await handleRequest({
|
||||
...mockParams,
|
||||
query: mockQuery,
|
||||
});
|
||||
}).toPromise();
|
||||
searchSource = await mockParams.searchSourceService.create();
|
||||
expect((searchSource.setField as jest.Mock).mock.calls[4]).toEqual(['query', mockQuery]);
|
||||
});
|
||||
});
|
||||
|
||||
test('calls searchSource.fetch', async () => {
|
||||
await handleRequest(mockParams);
|
||||
await handleRequest(mockParams).toPromise();
|
||||
const searchSource = await mockParams.searchSourceService.create();
|
||||
|
||||
expect(searchSource.fetch$).toHaveBeenCalledWith({
|
||||
|
@ -140,7 +142,7 @@ describe('esaggs expression function - public', () => {
|
|||
});
|
||||
|
||||
test('tabifies response data', async () => {
|
||||
await handleRequest(mockParams);
|
||||
await handleRequest(mockParams).toPromise();
|
||||
expect(tabifyAggResponse).toHaveBeenCalledWith(
|
||||
mockParams.aggs,
|
||||
{},
|
||||
|
@ -155,7 +157,7 @@ describe('esaggs expression function - public', () => {
|
|||
await handleRequest({
|
||||
...mockParams,
|
||||
timeRange: { from: '2020-12-01', to: '2020-12-31' },
|
||||
});
|
||||
}).toPromise();
|
||||
expect((tabifyAggResponse as jest.Mock).mock.calls[0][2].timeRange).toMatchInlineSnapshot(`
|
||||
Object {
|
||||
"from": "2020-12-01T05:00:00.000Z",
|
||||
|
@ -167,4 +169,29 @@ describe('esaggs expression function - public', () => {
|
|||
}
|
||||
`);
|
||||
});
|
||||
|
||||
test('returns partial results', async () => {
|
||||
const searchSource = await mockParams.searchSourceService.create();
|
||||
|
||||
(searchSource.fetch$ as jest.MockedFunction<typeof searchSource.fetch$>).mockReturnValue(
|
||||
from([
|
||||
{
|
||||
rawResponse: {},
|
||||
},
|
||||
{
|
||||
rawResponse: {},
|
||||
},
|
||||
]) as ReturnType<typeof searchSource.fetch$>
|
||||
);
|
||||
|
||||
const result = await handleRequest({
|
||||
...mockParams,
|
||||
query: { query: 'foo', language: 'bar' },
|
||||
})
|
||||
.pipe(toArray())
|
||||
.toPromise();
|
||||
|
||||
expect(result).toHaveLength(2);
|
||||
expect(tabifyAggResponse).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
});
|
||||
|
|
|
@ -7,6 +7,8 @@
|
|||
*/
|
||||
|
||||
import { i18n } from '@kbn/i18n';
|
||||
import { defer } from 'rxjs';
|
||||
import { map, switchMap } from 'rxjs/operators';
|
||||
import { Adapters } from 'src/plugins/inspector/common';
|
||||
|
||||
import { calculateBounds, Filter, IndexPattern, Query, TimeRange } from '../../../../common';
|
||||
|
@ -32,7 +34,7 @@ export interface RequestHandlerParams {
|
|||
getNow?: () => Date;
|
||||
}
|
||||
|
||||
export const handleRequest = async ({
|
||||
export const handleRequest = ({
|
||||
abortSignal,
|
||||
aggs,
|
||||
filters,
|
||||
|
@ -46,87 +48,95 @@ export const handleRequest = async ({
|
|||
timeRange,
|
||||
getNow,
|
||||
}: RequestHandlerParams) => {
|
||||
const forceNow = getNow?.();
|
||||
const searchSource = await searchSourceService.create();
|
||||
return defer(async () => {
|
||||
const forceNow = getNow?.();
|
||||
const searchSource = await searchSourceService.create();
|
||||
|
||||
searchSource.setField('index', indexPattern);
|
||||
searchSource.setField('size', 0);
|
||||
searchSource.setField('index', indexPattern);
|
||||
searchSource.setField('size', 0);
|
||||
|
||||
// Create a new search source that inherits the original search source
|
||||
// but has the appropriate timeRange applied via a filter.
|
||||
// This is a temporary solution until we properly pass down all required
|
||||
// information for the request to the request handler (https://github.com/elastic/kibana/issues/16641).
|
||||
// Using callParentStartHandlers: true we make sure, that the parent searchSource
|
||||
// onSearchRequestStart will be called properly even though we use an inherited
|
||||
// search source.
|
||||
const timeFilterSearchSource = searchSource.createChild({ callParentStartHandlers: true });
|
||||
const requestSearchSource = timeFilterSearchSource.createChild({ callParentStartHandlers: true });
|
||||
|
||||
// If timeFields have been specified, use the specified ones, otherwise use primary time field of index
|
||||
// pattern if it's available.
|
||||
const defaultTimeField = indexPattern?.getTimeField?.();
|
||||
const defaultTimeFields = defaultTimeField ? [defaultTimeField.name] : [];
|
||||
const allTimeFields = timeFields && timeFields.length > 0 ? timeFields : defaultTimeFields;
|
||||
|
||||
aggs.setTimeRange(timeRange as TimeRange);
|
||||
aggs.setForceNow(forceNow);
|
||||
aggs.setTimeFields(allTimeFields);
|
||||
|
||||
// For now we need to mirror the history of the passed search source, since
|
||||
// the request inspector wouldn't work otherwise.
|
||||
Object.defineProperty(requestSearchSource, 'history', {
|
||||
get() {
|
||||
return searchSource.history;
|
||||
},
|
||||
set(history) {
|
||||
return (searchSource.history = history);
|
||||
},
|
||||
});
|
||||
|
||||
requestSearchSource.setField('aggs', aggs);
|
||||
|
||||
requestSearchSource.onRequestStart((paramSearchSource, options) => {
|
||||
return aggs.onSearchRequestStart(paramSearchSource, options);
|
||||
});
|
||||
|
||||
// If a timeRange has been specified and we had at least one timeField available, create range
|
||||
// filters for that those time fields
|
||||
if (timeRange && allTimeFields.length > 0) {
|
||||
timeFilterSearchSource.setField('filter', () => {
|
||||
return aggs.getSearchSourceTimeFilter(forceNow);
|
||||
// Create a new search source that inherits the original search source
|
||||
// but has the appropriate timeRange applied via a filter.
|
||||
// This is a temporary solution until we properly pass down all required
|
||||
// information for the request to the request handler (https://github.com/elastic/kibana/issues/16641).
|
||||
// Using callParentStartHandlers: true we make sure, that the parent searchSource
|
||||
// onSearchRequestStart will be called properly even though we use an inherited
|
||||
// search source.
|
||||
const timeFilterSearchSource = searchSource.createChild({ callParentStartHandlers: true });
|
||||
const requestSearchSource = timeFilterSearchSource.createChild({
|
||||
callParentStartHandlers: true,
|
||||
});
|
||||
}
|
||||
|
||||
requestSearchSource.setField('filter', filters);
|
||||
requestSearchSource.setField('query', query);
|
||||
// If timeFields have been specified, use the specified ones, otherwise use primary time field of index
|
||||
// pattern if it's available.
|
||||
const defaultTimeField = indexPattern?.getTimeField?.();
|
||||
const defaultTimeFields = defaultTimeField ? [defaultTimeField.name] : [];
|
||||
const allTimeFields = timeFields?.length ? timeFields : defaultTimeFields;
|
||||
|
||||
const { rawResponse: response } = await requestSearchSource
|
||||
.fetch$({
|
||||
abortSignal,
|
||||
sessionId: searchSessionId,
|
||||
inspector: {
|
||||
adapter: inspectorAdapters.requests,
|
||||
title: i18n.translate('data.functions.esaggs.inspector.dataRequest.title', {
|
||||
defaultMessage: 'Data',
|
||||
}),
|
||||
description: i18n.translate('data.functions.esaggs.inspector.dataRequest.description', {
|
||||
defaultMessage:
|
||||
'This request queries Elasticsearch to fetch the data for the visualization.',
|
||||
}),
|
||||
aggs.setTimeRange(timeRange as TimeRange);
|
||||
aggs.setForceNow(forceNow);
|
||||
aggs.setTimeFields(allTimeFields);
|
||||
|
||||
// For now we need to mirror the history of the passed search source, since
|
||||
// the request inspector wouldn't work otherwise.
|
||||
Object.defineProperty(requestSearchSource, 'history', {
|
||||
get() {
|
||||
return searchSource.history;
|
||||
},
|
||||
})
|
||||
.toPromise();
|
||||
set(history) {
|
||||
return (searchSource.history = history);
|
||||
},
|
||||
});
|
||||
|
||||
const parsedTimeRange = timeRange ? calculateBounds(timeRange, { forceNow }) : null;
|
||||
const tabifyParams = {
|
||||
metricsAtAllLevels: aggs.hierarchical,
|
||||
partialRows,
|
||||
timeRange: parsedTimeRange
|
||||
? { from: parsedTimeRange.min, to: parsedTimeRange.max, timeFields: allTimeFields }
|
||||
: undefined,
|
||||
};
|
||||
requestSearchSource.setField('aggs', aggs);
|
||||
|
||||
const tabifiedResponse = tabifyAggResponse(aggs, response, tabifyParams);
|
||||
requestSearchSource.onRequestStart((paramSearchSource, options) => {
|
||||
return aggs.onSearchRequestStart(paramSearchSource, options);
|
||||
});
|
||||
|
||||
return tabifiedResponse;
|
||||
// If a timeRange has been specified and we had at least one timeField available, create range
|
||||
// filters for that those time fields
|
||||
if (timeRange && allTimeFields.length > 0) {
|
||||
timeFilterSearchSource.setField('filter', () => {
|
||||
return aggs.getSearchSourceTimeFilter(forceNow);
|
||||
});
|
||||
}
|
||||
|
||||
requestSearchSource.setField('filter', filters);
|
||||
requestSearchSource.setField('query', query);
|
||||
|
||||
return { allTimeFields, forceNow, requestSearchSource };
|
||||
}).pipe(
|
||||
switchMap(({ allTimeFields, forceNow, requestSearchSource }) =>
|
||||
requestSearchSource
|
||||
.fetch$({
|
||||
abortSignal,
|
||||
sessionId: searchSessionId,
|
||||
inspector: {
|
||||
adapter: inspectorAdapters.requests,
|
||||
title: i18n.translate('data.functions.esaggs.inspector.dataRequest.title', {
|
||||
defaultMessage: 'Data',
|
||||
}),
|
||||
description: i18n.translate('data.functions.esaggs.inspector.dataRequest.description', {
|
||||
defaultMessage:
|
||||
'This request queries Elasticsearch to fetch the data for the visualization.',
|
||||
}),
|
||||
},
|
||||
})
|
||||
.pipe(
|
||||
map(({ rawResponse: response }) => {
|
||||
const parsedTimeRange = timeRange ? calculateBounds(timeRange, { forceNow }) : null;
|
||||
const tabifyParams = {
|
||||
metricsAtAllLevels: aggs.hierarchical,
|
||||
partialRows,
|
||||
timeRange: parsedTimeRange
|
||||
? { from: parsedTimeRange.min, to: parsedTimeRange.max, timeFields: allTimeFields }
|
||||
: undefined,
|
||||
};
|
||||
|
||||
return tabifyAggResponse(aggs, response, tabifyParams);
|
||||
})
|
||||
)
|
||||
)
|
||||
);
|
||||
};
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
* Side Public License, v 1.
|
||||
*/
|
||||
|
||||
import { of as mockOf } from 'rxjs';
|
||||
import type { MockedKeys } from '@kbn/utility-types/jest';
|
||||
import type { ExecutionContext } from 'src/plugins/expressions/public';
|
||||
import type { IndexPatternsContract } from '../../../common/index_patterns/index_patterns';
|
||||
|
@ -20,7 +21,7 @@ import { getFunctionDefinition } from './esaggs';
|
|||
|
||||
jest.mock('../../../common/search/expressions', () => ({
|
||||
getEsaggsMeta: jest.fn().mockReturnValue({ name: 'esaggs' }),
|
||||
handleEsaggsRequest: jest.fn().mockResolvedValue({}),
|
||||
handleEsaggsRequest: jest.fn(() => mockOf({})),
|
||||
}));
|
||||
|
||||
import { getEsaggsMeta, handleEsaggsRequest } from '../../../common/search/expressions';
|
||||
|
@ -74,13 +75,13 @@ describe('esaggs expression function - public', () => {
|
|||
});
|
||||
|
||||
test('calls indexPatterns.create with the values provided by the subexpression arg', async () => {
|
||||
await definition().fn(null, args, mockHandlers);
|
||||
await definition().fn(null, args, mockHandlers).toPromise();
|
||||
|
||||
expect(startDependencies.indexPatterns.create).toHaveBeenCalledWith(args.index.value, true);
|
||||
});
|
||||
|
||||
test('calls aggs.createAggConfigs with the values provided by the subexpression arg', async () => {
|
||||
await definition().fn(null, args, mockHandlers);
|
||||
await definition().fn(null, args, mockHandlers).toPromise();
|
||||
|
||||
expect(startDependencies.aggs.createAggConfigs).toHaveBeenCalledWith(
|
||||
{},
|
||||
|
@ -96,7 +97,7 @@ describe('esaggs expression function - public', () => {
|
|||
});
|
||||
|
||||
test('calls handleEsaggsRequest with all of the right dependencies', async () => {
|
||||
await definition().fn(null, args, mockHandlers);
|
||||
await definition().fn(null, args, mockHandlers).toPromise();
|
||||
|
||||
expect(handleEsaggsRequest).toHaveBeenCalledWith({
|
||||
abortSignal: mockHandlers.abortSignal,
|
||||
|
@ -128,7 +129,7 @@ describe('esaggs expression function - public', () => {
|
|||
timeRange: { from: 'a', to: 'b' },
|
||||
} as KibanaContext;
|
||||
|
||||
await definition().fn(input, args, mockHandlers);
|
||||
await definition().fn(input, args, mockHandlers).toPromise();
|
||||
|
||||
expect(handleEsaggsRequest).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
|
|
|
@ -7,6 +7,8 @@
|
|||
*/
|
||||
|
||||
import { get } from 'lodash';
|
||||
import { defer } from 'rxjs';
|
||||
import { switchMap } from 'rxjs/operators';
|
||||
import { StartServicesAccessor } from 'src/core/public';
|
||||
import {
|
||||
EsaggsExpressionFunctionDefinition,
|
||||
|
@ -35,30 +37,36 @@ export function getFunctionDefinition({
|
|||
}) {
|
||||
return (): EsaggsExpressionFunctionDefinition => ({
|
||||
...getEsaggsMeta(),
|
||||
async fn(input, args, { inspectorAdapters, abortSignal, getSearchSessionId }) {
|
||||
const { aggs, indexPatterns, searchSource, getNow } = await getStartDependencies();
|
||||
fn(input, args, { inspectorAdapters, abortSignal, getSearchSessionId }) {
|
||||
return defer(async () => {
|
||||
const { aggs, indexPatterns, searchSource, getNow } = await getStartDependencies();
|
||||
|
||||
const indexPattern = await indexPatterns.create(args.index.value, true);
|
||||
const aggConfigs = aggs.createAggConfigs(
|
||||
indexPattern,
|
||||
args.aggs!.map((agg) => agg.value)
|
||||
const indexPattern = await indexPatterns.create(args.index.value, true);
|
||||
const aggConfigs = aggs.createAggConfigs(
|
||||
indexPattern,
|
||||
args.aggs!.map((agg) => agg.value)
|
||||
);
|
||||
aggConfigs.hierarchical = args.metricsAtAllLevels;
|
||||
|
||||
return { aggConfigs, indexPattern, searchSource, getNow };
|
||||
}).pipe(
|
||||
switchMap(({ aggConfigs, indexPattern, searchSource, getNow }) =>
|
||||
handleEsaggsRequest({
|
||||
abortSignal,
|
||||
aggs: aggConfigs,
|
||||
filters: get(input, 'filters', undefined),
|
||||
indexPattern,
|
||||
inspectorAdapters,
|
||||
partialRows: args.partialRows,
|
||||
query: get(input, 'query', undefined) as any,
|
||||
searchSessionId: getSearchSessionId(),
|
||||
searchSourceService: searchSource,
|
||||
timeFields: args.timeFields,
|
||||
timeRange: get(input, 'timeRange', undefined),
|
||||
getNow,
|
||||
})
|
||||
)
|
||||
);
|
||||
aggConfigs.hierarchical = args.metricsAtAllLevels;
|
||||
|
||||
return await handleEsaggsRequest({
|
||||
abortSignal,
|
||||
aggs: aggConfigs,
|
||||
filters: get(input, 'filters', undefined),
|
||||
indexPattern,
|
||||
inspectorAdapters,
|
||||
partialRows: args.partialRows,
|
||||
query: get(input, 'query', undefined) as any,
|
||||
searchSessionId: getSearchSessionId(),
|
||||
searchSourceService: searchSource,
|
||||
timeFields: args.timeFields,
|
||||
timeRange: get(input, 'timeRange', undefined),
|
||||
getNow,
|
||||
});
|
||||
},
|
||||
});
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
* Side Public License, v 1.
|
||||
*/
|
||||
|
||||
import { of as mockOf } from 'rxjs';
|
||||
import type { MockedKeys } from '@kbn/utility-types/jest';
|
||||
import { KibanaRequest } from 'src/core/server';
|
||||
import type { ExecutionContext } from 'src/plugins/expressions/server';
|
||||
|
@ -21,7 +22,7 @@ import { getFunctionDefinition } from './esaggs';
|
|||
|
||||
jest.mock('../../../common/search/expressions', () => ({
|
||||
getEsaggsMeta: jest.fn().mockReturnValue({ name: 'esaggs' }),
|
||||
handleEsaggsRequest: jest.fn().mockResolvedValue({}),
|
||||
handleEsaggsRequest: jest.fn(() => mockOf({})),
|
||||
}));
|
||||
|
||||
import { getEsaggsMeta, handleEsaggsRequest } from '../../../common/search/expressions';
|
||||
|
@ -76,19 +77,19 @@ describe('esaggs expression function - server', () => {
|
|||
});
|
||||
|
||||
test('calls getStartDependencies with the KibanaRequest', async () => {
|
||||
await definition().fn(null, args, mockHandlers);
|
||||
await definition().fn(null, args, mockHandlers).toPromise();
|
||||
|
||||
expect(getStartDependencies).toHaveBeenCalledWith({ id: 'hi' });
|
||||
});
|
||||
|
||||
test('calls indexPatterns.create with the values provided by the subexpression arg', async () => {
|
||||
await definition().fn(null, args, mockHandlers);
|
||||
await definition().fn(null, args, mockHandlers).toPromise();
|
||||
|
||||
expect(startDependencies.indexPatterns.create).toHaveBeenCalledWith(args.index.value, true);
|
||||
});
|
||||
|
||||
test('calls aggs.createAggConfigs with the values provided by the subexpression arg', async () => {
|
||||
await definition().fn(null, args, mockHandlers);
|
||||
await definition().fn(null, args, mockHandlers).toPromise();
|
||||
|
||||
expect(startDependencies.aggs.createAggConfigs).toHaveBeenCalledWith(
|
||||
{},
|
||||
|
@ -104,7 +105,7 @@ describe('esaggs expression function - server', () => {
|
|||
});
|
||||
|
||||
test('calls handleEsaggsRequest with all of the right dependencies', async () => {
|
||||
await definition().fn(null, args, mockHandlers);
|
||||
await definition().fn(null, args, mockHandlers).toPromise();
|
||||
|
||||
expect(handleEsaggsRequest).toHaveBeenCalledWith({
|
||||
abortSignal: mockHandlers.abortSignal,
|
||||
|
@ -135,7 +136,7 @@ describe('esaggs expression function - server', () => {
|
|||
timeRange: { from: 'a', to: 'b' },
|
||||
} as KibanaContext;
|
||||
|
||||
await definition().fn(input, args, mockHandlers);
|
||||
await definition().fn(input, args, mockHandlers).toPromise();
|
||||
|
||||
expect(handleEsaggsRequest).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
|
|
|
@ -7,6 +7,8 @@
|
|||
*/
|
||||
|
||||
import { get } from 'lodash';
|
||||
import { defer } from 'rxjs';
|
||||
import { switchMap } from 'rxjs/operators';
|
||||
import { i18n } from '@kbn/i18n';
|
||||
import { KibanaRequest, StartServicesAccessor } from 'src/core/server';
|
||||
import {
|
||||
|
@ -36,45 +38,47 @@ export function getFunctionDefinition({
|
|||
}): () => EsaggsExpressionFunctionDefinition {
|
||||
return () => ({
|
||||
...getEsaggsMeta(),
|
||||
async fn(
|
||||
input,
|
||||
args,
|
||||
{ inspectorAdapters, abortSignal, getSearchSessionId, getKibanaRequest }
|
||||
) {
|
||||
const kibanaRequest = getKibanaRequest ? getKibanaRequest() : null;
|
||||
if (!kibanaRequest) {
|
||||
throw new Error(
|
||||
i18n.translate('data.search.esaggs.error.kibanaRequest', {
|
||||
defaultMessage:
|
||||
'A KibanaRequest is required to execute this search on the server. ' +
|
||||
'Please provide a request object to the expression execution params.',
|
||||
})
|
||||
fn(input, args, { inspectorAdapters, abortSignal, getSearchSessionId, getKibanaRequest }) {
|
||||
return defer(async () => {
|
||||
const kibanaRequest = getKibanaRequest ? getKibanaRequest() : null;
|
||||
if (!kibanaRequest) {
|
||||
throw new Error(
|
||||
i18n.translate('data.search.esaggs.error.kibanaRequest', {
|
||||
defaultMessage:
|
||||
'A KibanaRequest is required to execute this search on the server. ' +
|
||||
'Please provide a request object to the expression execution params.',
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
const { aggs, indexPatterns, searchSource } = await getStartDependencies(kibanaRequest);
|
||||
|
||||
const indexPattern = await indexPatterns.create(args.index.value, true);
|
||||
const aggConfigs = aggs.createAggConfigs(
|
||||
indexPattern,
|
||||
args.aggs!.map((agg) => agg.value)
|
||||
);
|
||||
}
|
||||
|
||||
const { aggs, indexPatterns, searchSource } = await getStartDependencies(kibanaRequest);
|
||||
aggConfigs.hierarchical = args.metricsAtAllLevels;
|
||||
|
||||
const indexPattern = await indexPatterns.create(args.index.value, true);
|
||||
const aggConfigs = aggs.createAggConfigs(
|
||||
indexPattern,
|
||||
args.aggs!.map((agg) => agg.value)
|
||||
return { aggConfigs, indexPattern, searchSource };
|
||||
}).pipe(
|
||||
switchMap(({ aggConfigs, indexPattern, searchSource }) =>
|
||||
handleEsaggsRequest({
|
||||
abortSignal,
|
||||
aggs: aggConfigs,
|
||||
filters: get(input, 'filters', undefined),
|
||||
indexPattern,
|
||||
inspectorAdapters,
|
||||
partialRows: args.partialRows,
|
||||
query: get(input, 'query', undefined) as any,
|
||||
searchSessionId: getSearchSessionId(),
|
||||
searchSourceService: searchSource,
|
||||
timeFields: args.timeFields,
|
||||
timeRange: get(input, 'timeRange', undefined),
|
||||
})
|
||||
)
|
||||
);
|
||||
|
||||
aggConfigs.hierarchical = args.metricsAtAllLevels;
|
||||
|
||||
return await handleEsaggsRequest({
|
||||
abortSignal,
|
||||
aggs: aggConfigs,
|
||||
filters: get(input, 'filters', undefined),
|
||||
indexPattern,
|
||||
inspectorAdapters,
|
||||
partialRows: args.partialRows,
|
||||
query: get(input, 'query', undefined) as any,
|
||||
searchSessionId: getSearchSessionId(),
|
||||
searchSourceService: searchSource,
|
||||
timeFields: args.timeFields,
|
||||
timeRange: get(input, 'timeRange', undefined),
|
||||
});
|
||||
},
|
||||
});
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@
|
|||
|
||||
import { of } from 'rxjs';
|
||||
import { first, skip, toArray } from 'rxjs/operators';
|
||||
import { TestScheduler } from 'rxjs/testing';
|
||||
import { loader, ExpressionLoader } from './loader';
|
||||
import { Observable } from 'rxjs';
|
||||
import {
|
||||
|
@ -22,6 +23,8 @@ const { __getLastExecution, __getLastRenderMode } = require('./services');
|
|||
|
||||
const element: HTMLElement = null as any;
|
||||
|
||||
let testScheduler: TestScheduler;
|
||||
|
||||
jest.mock('./services', () => {
|
||||
let renderMode: RenderMode | undefined;
|
||||
const renderers: Record<string, unknown> = {
|
||||
|
@ -88,6 +91,10 @@ describe('execute helper function', () => {
|
|||
describe('ExpressionLoader', () => {
|
||||
const expressionString = 'demodata';
|
||||
|
||||
beforeEach(() => {
|
||||
testScheduler = new TestScheduler((actual, expected) => expect(actual).toStrictEqual(expected));
|
||||
});
|
||||
|
||||
describe('constructor', () => {
|
||||
it('accepts expression string', () => {
|
||||
const expressionLoader = new ExpressionLoader(element, expressionString, {});
|
||||
|
@ -130,6 +137,7 @@ describe('ExpressionLoader', () => {
|
|||
const expressionLoader = new ExpressionLoader(element, 'var foo', {
|
||||
variables: { foo: of(1, 2) },
|
||||
partial: true,
|
||||
throttle: 0,
|
||||
});
|
||||
const { result, partial } = await expressionLoader.data$.pipe(first()).toPromise();
|
||||
|
||||
|
@ -137,6 +145,22 @@ describe('ExpressionLoader', () => {
|
|||
expect(result).toBe(1);
|
||||
});
|
||||
|
||||
it('throttles partial results', async () => {
|
||||
testScheduler.run(({ cold, expectObservable }) => {
|
||||
const expressionLoader = new ExpressionLoader(element, 'var foo', {
|
||||
variables: { foo: cold('a 5ms b 5ms c 10ms d', { a: 1, b: 2, c: 3, d: 4 }) },
|
||||
partial: true,
|
||||
throttle: 20,
|
||||
});
|
||||
|
||||
expectObservable(expressionLoader.data$).toBe('a 19ms c 2ms d', {
|
||||
a: expect.objectContaining({ result: 1 }),
|
||||
c: expect.objectContaining({ result: 3 }),
|
||||
d: expect.objectContaining({ result: 4 }),
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
it('emits on loading$ on initial load and on updates', async () => {
|
||||
const expressionLoader = new ExpressionLoader(element, expressionString, {});
|
||||
const loadingPromise = expressionLoader.loading$.pipe(toArray()).toPromise();
|
||||
|
|
|
@ -6,8 +6,8 @@
|
|||
* Side Public License, v 1.
|
||||
*/
|
||||
|
||||
import { BehaviorSubject, Observable, Subject, Subscription } from 'rxjs';
|
||||
import { filter, map, delay } from 'rxjs/operators';
|
||||
import { BehaviorSubject, Observable, Subject, Subscription, asyncScheduler, identity } from 'rxjs';
|
||||
import { filter, map, delay, throttleTime } from 'rxjs/operators';
|
||||
import { defaults } from 'lodash';
|
||||
import { UnwrapObservable } from '@kbn/utility-types';
|
||||
import { Adapters } from '../../inspector/public';
|
||||
|
@ -145,7 +145,10 @@ export class ExpressionLoader {
|
|||
.getData()
|
||||
.pipe(
|
||||
delay(0), // delaying until the next tick since we execute the expression in the constructor
|
||||
filter(({ partial }) => params.partial || !partial)
|
||||
filter(({ partial }) => params.partial || !partial),
|
||||
params.partial && params.throttle
|
||||
? throttleTime(params.throttle, asyncScheduler, { leading: true, trailing: true })
|
||||
: identity
|
||||
)
|
||||
.subscribe((value) => this.dataSubject.next(value));
|
||||
};
|
||||
|
@ -178,6 +181,7 @@ export class ExpressionLoader {
|
|||
this.params.syncColors = params.syncColors;
|
||||
this.params.debug = Boolean(params.debug);
|
||||
this.params.partial = Boolean(params.partial);
|
||||
this.params.throttle = Number(params.throttle ?? 1000);
|
||||
|
||||
this.params.inspectorAdapters = (params.inspectorAdapters ||
|
||||
this.execution?.inspect()) as Adapters;
|
||||
|
|
|
@ -908,7 +908,6 @@ export interface IExpressionLoaderParams {
|
|||
//
|
||||
// (undocumented)
|
||||
onRenderError?: RenderErrorHandlerFnType;
|
||||
// (undocumented)
|
||||
partial?: boolean;
|
||||
// Warning: (ae-forgotten-export) The symbol "RenderMode" needs to be exported by the entry point index.d.ts
|
||||
//
|
||||
|
@ -920,6 +919,7 @@ export interface IExpressionLoaderParams {
|
|||
searchSessionId?: string;
|
||||
// (undocumented)
|
||||
syncColors?: boolean;
|
||||
throttle?: number;
|
||||
// (undocumented)
|
||||
uiState?: unknown;
|
||||
// (undocumented)
|
||||
|
|
|
@ -48,7 +48,18 @@ export interface IExpressionLoaderParams {
|
|||
renderMode?: RenderMode;
|
||||
syncColors?: boolean;
|
||||
hasCompatibleActions?: ExpressionRenderHandlerParams['hasCompatibleActions'];
|
||||
|
||||
/**
|
||||
* The flag to toggle on emitting partial results.
|
||||
* By default, the partial results are disabled.
|
||||
*/
|
||||
partial?: boolean;
|
||||
|
||||
/**
|
||||
* Throttling of partial results in milliseconds. 0 is disabling the throttling.
|
||||
* By default, it equals 1000.
|
||||
*/
|
||||
throttle?: number;
|
||||
}
|
||||
|
||||
export interface ExpressionRenderError extends Error {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue