mirror of
https://github.com/elastic/kibana.git
synced 2025-04-23 17:28:26 -04:00
[embeddable rebuild] change onFetchContextChanged from accepting callback to returning an observable (#180410)
Closes https://github.com/elastic/kibana/issues/180363 --------- Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
parent
3ad523d2a1
commit
1c38891dfc
6 changed files with 496 additions and 493 deletions
|
@ -10,13 +10,12 @@ import { EuiCallOut } from '@elastic/eui';
|
|||
import { DataView } from '@kbn/data-views-plugin/common';
|
||||
import { ReactEmbeddableFactory } from '@kbn/embeddable-plugin/public';
|
||||
import {
|
||||
FetchContext,
|
||||
initializeTimeRange,
|
||||
onFetchContextChanged,
|
||||
fetch$,
|
||||
useBatchedPublishingSubjects,
|
||||
} from '@kbn/presentation-publishing';
|
||||
import React, { useEffect } from 'react';
|
||||
import { BehaviorSubject } from 'rxjs';
|
||||
import { BehaviorSubject, switchMap } from 'rxjs';
|
||||
import { SEARCH_EMBEDDABLE_ID } from './constants';
|
||||
import { getCount } from './get_count';
|
||||
import { Api, Services, State } from './types';
|
||||
|
@ -54,51 +53,49 @@ export const getSearchEmbeddableFactory = (services: Services) => {
|
|||
}
|
||||
);
|
||||
|
||||
let isUnmounted = false;
|
||||
const error$ = new BehaviorSubject<Error | undefined>(undefined);
|
||||
const count$ = new BehaviorSubject<number>(0);
|
||||
const onFetch = (fetchContext: FetchContext, isCanceled: () => boolean) => {
|
||||
error$.next(undefined);
|
||||
if (!defaultDataView) {
|
||||
return;
|
||||
}
|
||||
dataLoading$.next(true);
|
||||
getCount(
|
||||
defaultDataView,
|
||||
services.data,
|
||||
fetchContext.filters ?? [],
|
||||
fetchContext.query,
|
||||
// timeRange and timeslice provided seperatly so consumers can decide
|
||||
// whether to refetch data for just mask current data.
|
||||
// In this example, we must refetch because we need a count within the time range.
|
||||
fetchContext.timeslice
|
||||
? {
|
||||
from: new Date(fetchContext.timeslice[0]).toISOString(),
|
||||
to: new Date(fetchContext.timeslice[1]).toISOString(),
|
||||
mode: 'absolute' as 'absolute',
|
||||
}
|
||||
: fetchContext.timeRange
|
||||
)
|
||||
.then((nextCount: number) => {
|
||||
if (isUnmounted || isCanceled()) {
|
||||
const fetchSubscription = fetch$(api)
|
||||
.pipe(
|
||||
switchMap(async (fetchContext) => {
|
||||
error$.next(undefined);
|
||||
if (!defaultDataView) {
|
||||
return;
|
||||
}
|
||||
dataLoading$.next(false);
|
||||
count$.next(nextCount);
|
||||
|
||||
try {
|
||||
dataLoading$.next(true);
|
||||
const count = await getCount(
|
||||
defaultDataView,
|
||||
services.data,
|
||||
fetchContext.filters ?? [],
|
||||
fetchContext.query,
|
||||
// timeRange and timeslice provided seperatly so consumers can decide
|
||||
// whether to refetch data for just mask current data.
|
||||
// In this example, we must refetch because we need a count within the time range.
|
||||
fetchContext.timeslice
|
||||
? {
|
||||
from: new Date(fetchContext.timeslice[0]).toISOString(),
|
||||
to: new Date(fetchContext.timeslice[1]).toISOString(),
|
||||
mode: 'absolute' as 'absolute',
|
||||
}
|
||||
: fetchContext.timeRange
|
||||
);
|
||||
return { count };
|
||||
} catch (error) {
|
||||
return { error };
|
||||
}
|
||||
})
|
||||
.catch((err) => {
|
||||
if (isUnmounted || isCanceled()) {
|
||||
return;
|
||||
}
|
||||
dataLoading$.next(false);
|
||||
error$.next(err);
|
||||
});
|
||||
};
|
||||
const unsubscribeFromFetch = onFetchContextChanged({
|
||||
api,
|
||||
onFetch,
|
||||
fetchOnSetup: true,
|
||||
});
|
||||
)
|
||||
.subscribe((next) => {
|
||||
dataLoading$.next(false);
|
||||
if (next && next.hasOwnProperty('count') && next.count !== undefined) {
|
||||
count$.next(next.count);
|
||||
}
|
||||
if (next && next.hasOwnProperty('error')) {
|
||||
error$.next(next.error);
|
||||
}
|
||||
});
|
||||
|
||||
return {
|
||||
api,
|
||||
|
@ -107,8 +104,7 @@ export const getSearchEmbeddableFactory = (services: Services) => {
|
|||
|
||||
useEffect(() => {
|
||||
return () => {
|
||||
isUnmounted = true;
|
||||
unsubscribeFromFetch();
|
||||
fetchSubscription.unsubscribe();
|
||||
};
|
||||
}, []);
|
||||
|
||||
|
|
|
@ -29,10 +29,7 @@ export {
|
|||
type CanAccessViewMode,
|
||||
} from './interfaces/can_access_view_mode';
|
||||
export { initializeTimeRange } from './interfaces/fetch/initialize_time_range';
|
||||
export {
|
||||
onFetchContextChanged,
|
||||
type FetchContext,
|
||||
} from './interfaces/fetch/on_fetch_context_changed';
|
||||
export { fetch$, type FetchContext } from './interfaces/fetch/fetch';
|
||||
export {
|
||||
apiPublishesPartialUnifiedSearch,
|
||||
apiPublishesTimeRange,
|
||||
|
|
|
@ -0,0 +1,307 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||
* Side Public License, v 1.
|
||||
*/
|
||||
|
||||
import { AggregateQuery, Filter, Query, TimeRange } from '@kbn/es-query';
|
||||
import { BehaviorSubject, skip, Subject } from 'rxjs';
|
||||
import { fetch$ } from './fetch';
|
||||
|
||||
describe('onFetchContextChanged', () => {
|
||||
const onFetchMock = jest.fn();
|
||||
const parentApi = {
|
||||
filters$: new BehaviorSubject<Filter[] | undefined>(undefined),
|
||||
query$: new BehaviorSubject<Query | AggregateQuery | undefined>(undefined),
|
||||
reload$: new Subject<void>(),
|
||||
searchSessionId$: new BehaviorSubject<string | undefined>(undefined),
|
||||
timeRange$: new BehaviorSubject<TimeRange | undefined>(undefined),
|
||||
timeslice$: new BehaviorSubject<[number, number] | undefined>(undefined),
|
||||
};
|
||||
|
||||
beforeEach(() => {
|
||||
onFetchMock.mockReset();
|
||||
parentApi.filters$.next(undefined);
|
||||
parentApi.query$.next(undefined);
|
||||
parentApi.searchSessionId$.next(undefined);
|
||||
parentApi.timeRange$.next(undefined);
|
||||
parentApi.timeslice$.next(undefined);
|
||||
});
|
||||
|
||||
describe('searchSessionId', () => {
|
||||
let i = 0;
|
||||
function setSearchSession() {
|
||||
i++;
|
||||
parentApi.searchSessionId$.next(`${i}`);
|
||||
}
|
||||
beforeEach(() => {
|
||||
i = 0;
|
||||
setSearchSession();
|
||||
});
|
||||
|
||||
test('should emit on subscribe when only searchSession is provided', async () => {
|
||||
const api = {
|
||||
parentApi: {
|
||||
searchSessionId$: parentApi.searchSessionId$,
|
||||
},
|
||||
};
|
||||
const subscription = fetch$(api).subscribe(onFetchMock);
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
expect(onFetchMock.mock.calls).toHaveLength(1);
|
||||
const fetchContext = onFetchMock.mock.calls[0][0];
|
||||
expect(fetchContext.searchSessionId).toBe('1');
|
||||
subscription.unsubscribe();
|
||||
});
|
||||
|
||||
test('should emit once on fetch context changes', async () => {
|
||||
const subscription = fetch$({ parentApi }).pipe(skip(1)).subscribe(onFetchMock);
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
expect(onFetchMock).not.toHaveBeenCalled();
|
||||
|
||||
parentApi.filters$.next([]);
|
||||
parentApi.query$.next({ language: 'kquery', query: '' });
|
||||
parentApi.timeRange$.next({
|
||||
from: 'now-24h',
|
||||
to: 'now',
|
||||
});
|
||||
parentApi.timeslice$.next([0, 1]);
|
||||
setSearchSession();
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
expect(onFetchMock.mock.calls).toHaveLength(1);
|
||||
const fetchContext = onFetchMock.mock.calls[0][0];
|
||||
expect(fetchContext).toEqual({
|
||||
filters: [],
|
||||
isReload: true,
|
||||
query: {
|
||||
language: 'kquery',
|
||||
query: '',
|
||||
},
|
||||
searchSessionId: '2',
|
||||
timeRange: {
|
||||
from: 'now-24h',
|
||||
to: 'now',
|
||||
},
|
||||
timeslice: [0, 1],
|
||||
});
|
||||
subscription.unsubscribe();
|
||||
});
|
||||
|
||||
test('should emit once on reload', async () => {
|
||||
const subscription = fetch$({ parentApi }).pipe(skip(1)).subscribe(onFetchMock);
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
expect(onFetchMock).not.toHaveBeenCalled();
|
||||
|
||||
parentApi.reload$.next();
|
||||
setSearchSession();
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
expect(onFetchMock.mock.calls).toHaveLength(1);
|
||||
const fetchContext = onFetchMock.mock.calls[0][0];
|
||||
expect(fetchContext.isReload).toBe(true);
|
||||
subscription.unsubscribe();
|
||||
});
|
||||
|
||||
test('should emit once on local time range change', async () => {
|
||||
const api = {
|
||||
parentApi,
|
||||
timeRange$: new BehaviorSubject<TimeRange | undefined>(undefined),
|
||||
};
|
||||
const subscription = fetch$(api).pipe(skip(1)).subscribe(onFetchMock);
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
expect(onFetchMock).not.toHaveBeenCalled();
|
||||
|
||||
api.timeRange$.next({
|
||||
from: 'now-15m',
|
||||
to: 'now',
|
||||
});
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
expect(onFetchMock.mock.calls).toHaveLength(1);
|
||||
const fetchContext = onFetchMock.mock.calls[0][0];
|
||||
expect(fetchContext.isReload).toBe(false);
|
||||
expect(fetchContext.timeRange).toEqual({
|
||||
from: 'now-15m',
|
||||
to: 'now',
|
||||
});
|
||||
subscription.unsubscribe();
|
||||
});
|
||||
});
|
||||
|
||||
describe('no searchSession$', () => {
|
||||
test('should emit once on reload', async () => {
|
||||
const subscription = fetch$({ parentApi }).pipe(skip(1)).subscribe(onFetchMock);
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
expect(onFetchMock).not.toHaveBeenCalled();
|
||||
|
||||
parentApi.query$.next({ language: 'kquery', query: '' });
|
||||
parentApi.reload$.next();
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
expect(onFetchMock.mock.calls).toHaveLength(1);
|
||||
const fetchContext = onFetchMock.mock.calls[0][0];
|
||||
expect(fetchContext.isReload).toBe(true);
|
||||
subscription.unsubscribe();
|
||||
});
|
||||
|
||||
test('should emit once on fetch context changes', async () => {
|
||||
const subscription = fetch$({ parentApi }).pipe(skip(1)).subscribe(onFetchMock);
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
expect(onFetchMock).not.toHaveBeenCalled();
|
||||
|
||||
parentApi.filters$.next([]);
|
||||
parentApi.query$.next({ language: 'kquery', query: '' });
|
||||
parentApi.timeRange$.next({
|
||||
from: 'now-24h',
|
||||
to: 'now',
|
||||
});
|
||||
parentApi.timeslice$.next([0, 1]);
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
expect(onFetchMock.mock.calls).toHaveLength(1);
|
||||
const fetchContext = onFetchMock.mock.calls[0][0];
|
||||
expect(fetchContext).toEqual({
|
||||
filters: [],
|
||||
isReload: false,
|
||||
query: {
|
||||
language: 'kquery',
|
||||
query: '',
|
||||
},
|
||||
searchSessionId: undefined,
|
||||
timeRange: {
|
||||
from: 'now-24h',
|
||||
to: 'now',
|
||||
},
|
||||
timeslice: [0, 1],
|
||||
});
|
||||
subscription.unsubscribe();
|
||||
});
|
||||
|
||||
describe('local and parent time range', () => {
|
||||
const api = {
|
||||
parentApi,
|
||||
timeRange$: new BehaviorSubject<TimeRange | undefined>(undefined),
|
||||
};
|
||||
beforeEach(() => {
|
||||
api.timeRange$.next({
|
||||
from: 'now-15m',
|
||||
to: 'now',
|
||||
});
|
||||
api.parentApi.timeRange$.next({
|
||||
from: 'now-24h',
|
||||
to: 'now',
|
||||
});
|
||||
});
|
||||
|
||||
test('should emit on subscribe (timeRange is local time range)', async () => {
|
||||
const subscription = fetch$(api).subscribe(onFetchMock);
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
expect(onFetchMock.mock.calls).toHaveLength(1);
|
||||
const fetchContext = onFetchMock.mock.calls[0][0];
|
||||
expect(fetchContext.timeRange).toEqual({
|
||||
from: 'now-15m',
|
||||
to: 'now',
|
||||
});
|
||||
subscription.unsubscribe();
|
||||
});
|
||||
|
||||
test('should emit once on local time range change', async () => {
|
||||
const subscription = fetch$(api).pipe(skip(1)).subscribe(onFetchMock);
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
expect(onFetchMock).not.toHaveBeenCalled();
|
||||
|
||||
api.timeRange$.next({
|
||||
from: 'now-16m',
|
||||
to: 'now',
|
||||
});
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
expect(onFetchMock.mock.calls).toHaveLength(1);
|
||||
const fetchContext = onFetchMock.mock.calls[0][0];
|
||||
expect(fetchContext.timeRange).toEqual({
|
||||
from: 'now-16m',
|
||||
to: 'now',
|
||||
});
|
||||
subscription.unsubscribe();
|
||||
});
|
||||
|
||||
test('should not emit on parent time range change', async () => {
|
||||
const subscription = fetch$(api).pipe(skip(1)).subscribe(onFetchMock);
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
expect(onFetchMock).not.toHaveBeenCalled();
|
||||
|
||||
api.parentApi.timeRange$.next({
|
||||
from: 'now-25h',
|
||||
to: 'now',
|
||||
});
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
expect(onFetchMock).not.toHaveBeenCalled();
|
||||
subscription.unsubscribe();
|
||||
});
|
||||
|
||||
test('should emit once when local time range is cleared (timeRange is parent time range)', async () => {
|
||||
const subscription = fetch$(api).pipe(skip(1)).subscribe(onFetchMock);
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
expect(onFetchMock).not.toHaveBeenCalled();
|
||||
|
||||
api.timeRange$.next(undefined);
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
expect(onFetchMock.mock.calls).toHaveLength(1);
|
||||
const fetchContext = onFetchMock.mock.calls[0][0];
|
||||
expect(fetchContext.timeRange).toEqual({
|
||||
from: 'now-24h',
|
||||
to: 'now',
|
||||
});
|
||||
subscription.unsubscribe();
|
||||
});
|
||||
});
|
||||
|
||||
describe('only parent time range', () => {
|
||||
const api = {
|
||||
parentApi,
|
||||
};
|
||||
beforeEach(() => {
|
||||
api.parentApi.timeRange$.next({
|
||||
from: 'now-24h',
|
||||
to: 'now',
|
||||
});
|
||||
});
|
||||
|
||||
test('should emit on subscribe (timeRange is parent time range)', async () => {
|
||||
const subscription = fetch$(api).subscribe(onFetchMock);
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
expect(onFetchMock.mock.calls).toHaveLength(1);
|
||||
const fetchContext = onFetchMock.mock.calls[0][0];
|
||||
expect(fetchContext.timeRange).toEqual({
|
||||
from: 'now-24h',
|
||||
to: 'now',
|
||||
});
|
||||
subscription.unsubscribe();
|
||||
});
|
||||
|
||||
test('should emit once on parent time range change', async () => {
|
||||
const subscription = fetch$(api).pipe(skip(1)).subscribe(onFetchMock);
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
expect(onFetchMock).not.toHaveBeenCalled();
|
||||
|
||||
api.parentApi.timeRange$.next({
|
||||
from: 'now-25h',
|
||||
to: 'now',
|
||||
});
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
expect(onFetchMock.mock.calls).toHaveLength(1);
|
||||
const fetchContext = onFetchMock.mock.calls[0][0];
|
||||
expect(fetchContext.timeRange).toEqual({
|
||||
from: 'now-25h',
|
||||
to: 'now',
|
||||
});
|
||||
subscription.unsubscribe();
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
|
@ -0,0 +1,147 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||
* Side Public License, v 1.
|
||||
*/
|
||||
|
||||
import {
|
||||
combineLatest,
|
||||
debounceTime,
|
||||
delay,
|
||||
filter,
|
||||
map,
|
||||
merge,
|
||||
Observable,
|
||||
of,
|
||||
skip,
|
||||
startWith,
|
||||
Subject,
|
||||
switchMap,
|
||||
takeUntil,
|
||||
tap,
|
||||
} from 'rxjs';
|
||||
import { AggregateQuery, Filter, Query, TimeRange } from '@kbn/es-query';
|
||||
import {
|
||||
apiPublishesTimeRange,
|
||||
apiPublishesUnifiedSearch,
|
||||
PublishesTimeRange,
|
||||
PublishesUnifiedSearch,
|
||||
} from './publishes_unified_search';
|
||||
import { apiPublishesSearchSession, PublishesSearchSession } from './publishes_search_session';
|
||||
import { apiHasParentApi, HasParentApi } from '../has_parent_api';
|
||||
import { apiPublishesReload } from './publishes_reload';
|
||||
|
||||
export interface FetchContext {
|
||||
isReload: boolean;
|
||||
filters: Filter[] | undefined;
|
||||
query: Query | AggregateQuery | undefined;
|
||||
searchSessionId: string | undefined;
|
||||
timeRange: TimeRange | undefined;
|
||||
timeslice: [number, number] | undefined;
|
||||
}
|
||||
|
||||
function getFetchContext(api: unknown, isReload: boolean) {
|
||||
const typeApi = api as Partial<
|
||||
PublishesTimeRange & HasParentApi<Partial<PublishesUnifiedSearch & PublishesSearchSession>>
|
||||
>;
|
||||
return {
|
||||
isReload,
|
||||
filters: typeApi?.parentApi?.filters$?.value,
|
||||
query: typeApi?.parentApi?.query$?.value,
|
||||
searchSessionId: typeApi?.parentApi?.searchSessionId$?.value,
|
||||
timeRange: typeApi?.timeRange$?.value ?? typeApi?.parentApi?.timeRange$?.value,
|
||||
timeslice: typeApi?.timeRange$?.value ? undefined : typeApi?.parentApi?.timeslice$?.value,
|
||||
};
|
||||
}
|
||||
|
||||
function hasSearchSession(api: unknown) {
|
||||
return apiHasParentApi(api) && apiPublishesSearchSession(api.parentApi)
|
||||
? typeof api.parentApi.searchSessionId$.value === 'string'
|
||||
: false;
|
||||
}
|
||||
|
||||
function hasLocalTimeRange(api: unknown) {
|
||||
return apiPublishesTimeRange(api) ? typeof api.timeRange$.value === 'object' : false;
|
||||
}
|
||||
|
||||
// Returns observables that emit to changes after subscribe
|
||||
// 1. Observables are not guaranteed to have an initial value (can not be used in combineLatest)
|
||||
// 2. Observables will not emit on subscribe
|
||||
function getBatchedObservables(api: unknown): Array<Observable<unknown>> {
|
||||
const observables: Array<Observable<unknown>> = [];
|
||||
|
||||
if (apiPublishesTimeRange(api)) {
|
||||
observables.push(api.timeRange$.pipe(skip(1)));
|
||||
}
|
||||
|
||||
if (apiHasParentApi(api) && apiPublishesUnifiedSearch(api.parentApi)) {
|
||||
observables.push(
|
||||
combineLatest([api.parentApi.filters$, api.parentApi.query$]).pipe(
|
||||
skip(1),
|
||||
filter(() => !hasSearchSession(api))
|
||||
)
|
||||
);
|
||||
|
||||
if (apiHasParentApi(api) && apiPublishesTimeRange(api.parentApi)) {
|
||||
const timeObservables: Array<Observable<unknown>> = [api.parentApi.timeRange$];
|
||||
if (api.parentApi.timeslice$) {
|
||||
timeObservables.push(api.parentApi.timeslice$);
|
||||
}
|
||||
observables.push(
|
||||
combineLatest(timeObservables).pipe(
|
||||
skip(1),
|
||||
filter(() => !hasSearchSession(api) && !hasLocalTimeRange(api))
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
return observables;
|
||||
}
|
||||
|
||||
// Returns observables that emit to changes after subscribe
|
||||
// 1. Observables are not guaranteed to have an initial value (can not be used in combineLatest)
|
||||
// 2. Observables will not emit on subscribe
|
||||
function getImmediateObservables(api: unknown): Array<Observable<unknown>> {
|
||||
const observables: Array<Observable<unknown>> = [];
|
||||
if (apiHasParentApi(api) && apiPublishesSearchSession(api.parentApi)) {
|
||||
observables.push(api.parentApi.searchSessionId$.pipe(skip(1)));
|
||||
}
|
||||
if (apiHasParentApi(api) && apiPublishesReload(api.parentApi)) {
|
||||
observables.push(api.parentApi.reload$.pipe(filter(() => !hasSearchSession(api))));
|
||||
}
|
||||
return observables;
|
||||
}
|
||||
|
||||
export function fetch$(api: unknown): Observable<FetchContext> {
|
||||
const batchedObservables = getBatchedObservables(api);
|
||||
const immediateObservables = getImmediateObservables(api);
|
||||
|
||||
if (immediateObservables.length === 0) {
|
||||
return merge(...batchedObservables).pipe(
|
||||
startWith(getFetchContext(api, false)),
|
||||
debounceTime(0),
|
||||
map(() => getFetchContext(api, false))
|
||||
);
|
||||
}
|
||||
|
||||
const interrupt = new Subject<void>();
|
||||
const batchedChanges$ = merge(...batchedObservables).pipe(
|
||||
switchMap((value) =>
|
||||
of(value).pipe(
|
||||
delay(0),
|
||||
takeUntil(interrupt),
|
||||
map(() => getFetchContext(api, false))
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
const immediateChange$ = merge(...immediateObservables).pipe(
|
||||
tap(() => interrupt.next()),
|
||||
map(() => getFetchContext(api, true))
|
||||
);
|
||||
|
||||
return merge(immediateChange$, batchedChanges$).pipe(startWith(getFetchContext(api, false)));
|
||||
}
|
|
@ -1,309 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||
* Side Public License, v 1.
|
||||
*/
|
||||
|
||||
import { AggregateQuery, Filter, Query, TimeRange } from '@kbn/es-query';
|
||||
import { BehaviorSubject, Subject } from 'rxjs';
|
||||
import { FetchContext, onFetchContextChanged } from './on_fetch_context_changed';
|
||||
|
||||
describe('onFetchContextChanged', () => {
|
||||
const onFetchMock = jest.fn();
|
||||
const parentApi = {
|
||||
filters$: new BehaviorSubject<Filter[] | undefined>(undefined),
|
||||
query$: new BehaviorSubject<Query | AggregateQuery | undefined>(undefined),
|
||||
reload$: new Subject<void>(),
|
||||
searchSessionId$: new BehaviorSubject<string | undefined>(undefined),
|
||||
timeRange$: new BehaviorSubject<TimeRange | undefined>(undefined),
|
||||
timeslice$: new BehaviorSubject<[number, number] | undefined>(undefined),
|
||||
};
|
||||
|
||||
beforeEach(() => {
|
||||
onFetchMock.mockReset();
|
||||
parentApi.filters$.next(undefined);
|
||||
parentApi.query$.next(undefined);
|
||||
parentApi.searchSessionId$.next(undefined);
|
||||
parentApi.timeRange$.next(undefined);
|
||||
parentApi.timeslice$.next(undefined);
|
||||
});
|
||||
|
||||
it('isCanceled should be true when onFetch triggered before previous onFetch finishes', async () => {
|
||||
const FETCH_TIMEOUT = 10;
|
||||
let calledCount = 0;
|
||||
let completedCallCount = 0;
|
||||
let completedContext: FetchContext | undefined;
|
||||
const onFetchInstrumented = async (context: FetchContext, isCanceled: () => boolean) => {
|
||||
calledCount++;
|
||||
await new Promise((resolve) => setTimeout(resolve, FETCH_TIMEOUT));
|
||||
if (isCanceled()) {
|
||||
return;
|
||||
}
|
||||
completedCallCount++;
|
||||
completedContext = context;
|
||||
};
|
||||
const unsubscribe = onFetchContextChanged({
|
||||
api: { parentApi },
|
||||
onFetch: onFetchInstrumented,
|
||||
fetchOnSetup: false,
|
||||
});
|
||||
|
||||
parentApi.timeRange$.next({
|
||||
from: 'now-25h',
|
||||
to: 'now',
|
||||
});
|
||||
await new Promise((resolve) => setTimeout(resolve, 1));
|
||||
expect(calledCount).toBe(1);
|
||||
|
||||
parentApi.timeRange$.next({
|
||||
from: 'now-26h',
|
||||
to: 'now',
|
||||
});
|
||||
await new Promise((resolve) => setTimeout(resolve, 1));
|
||||
expect(calledCount).toBe(2);
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, FETCH_TIMEOUT));
|
||||
|
||||
expect(completedCallCount).toBe(1);
|
||||
expect(completedContext?.timeRange).toEqual({
|
||||
from: 'now-26h',
|
||||
to: 'now',
|
||||
});
|
||||
|
||||
unsubscribe();
|
||||
});
|
||||
|
||||
describe('searchSessionId', () => {
|
||||
let i = 0;
|
||||
function setSearchSession() {
|
||||
i++;
|
||||
parentApi.searchSessionId$.next(`${i}`);
|
||||
}
|
||||
beforeEach(() => {
|
||||
i = 0;
|
||||
setSearchSession();
|
||||
});
|
||||
|
||||
it('should call onFetch a single time when fetch context changes', async () => {
|
||||
const unsubscribe = onFetchContextChanged({
|
||||
api: { parentApi },
|
||||
onFetch: onFetchMock,
|
||||
fetchOnSetup: false,
|
||||
});
|
||||
parentApi.filters$.next([]);
|
||||
parentApi.query$.next({ language: 'kquery', query: '' });
|
||||
parentApi.timeRange$.next({
|
||||
from: 'now-24h',
|
||||
to: 'now',
|
||||
});
|
||||
parentApi.timeslice$.next([0, 1]);
|
||||
setSearchSession();
|
||||
await new Promise((resolve) => setTimeout(resolve, 1));
|
||||
expect(onFetchMock.mock.calls).toHaveLength(1);
|
||||
const fetchContext = onFetchMock.mock.calls[0][0];
|
||||
expect(fetchContext).toEqual({
|
||||
filters: [],
|
||||
isReload: true,
|
||||
query: {
|
||||
language: 'kquery',
|
||||
query: '',
|
||||
},
|
||||
searchSessionId: '2',
|
||||
timeRange: {
|
||||
from: 'now-24h',
|
||||
to: 'now',
|
||||
},
|
||||
timeslice: [0, 1],
|
||||
});
|
||||
unsubscribe();
|
||||
});
|
||||
|
||||
it('should call onFetch a single time with reload', async () => {
|
||||
const unsubscribe = onFetchContextChanged({
|
||||
api: { parentApi },
|
||||
onFetch: onFetchMock,
|
||||
fetchOnSetup: false,
|
||||
});
|
||||
parentApi.reload$.next();
|
||||
setSearchSession();
|
||||
await new Promise((resolve) => setTimeout(resolve, 1));
|
||||
expect(onFetchMock.mock.calls).toHaveLength(1);
|
||||
unsubscribe();
|
||||
});
|
||||
|
||||
it('should call onFetch on local time range change and no search session change', async () => {
|
||||
const api = {
|
||||
parentApi,
|
||||
timeRange$: new BehaviorSubject<TimeRange | undefined>(undefined),
|
||||
};
|
||||
const unsubscribe = onFetchContextChanged({
|
||||
api,
|
||||
onFetch: onFetchMock,
|
||||
fetchOnSetup: false,
|
||||
});
|
||||
api.timeRange$.next({
|
||||
from: 'now-15m',
|
||||
to: 'now',
|
||||
});
|
||||
await new Promise((resolve) => setTimeout(resolve, 1));
|
||||
expect(onFetchMock.mock.calls).toHaveLength(1);
|
||||
const fetchContext = onFetchMock.mock.calls[0][0];
|
||||
expect(fetchContext.timeRange).toEqual({
|
||||
from: 'now-15m',
|
||||
to: 'now',
|
||||
});
|
||||
unsubscribe();
|
||||
});
|
||||
});
|
||||
|
||||
describe('no searchSession$', () => {
|
||||
it('should call onFetch when reload triggered', async () => {
|
||||
const unsubscribe = onFetchContextChanged({
|
||||
api: { parentApi },
|
||||
onFetch: onFetchMock,
|
||||
fetchOnSetup: false,
|
||||
});
|
||||
parentApi.reload$.next();
|
||||
await new Promise((resolve) => setTimeout(resolve, 1));
|
||||
expect(onFetchMock.mock.calls).toHaveLength(1);
|
||||
const fetchContext = onFetchMock.mock.calls[0][0];
|
||||
expect(fetchContext.isReload).toBe(true);
|
||||
unsubscribe();
|
||||
});
|
||||
|
||||
describe('local and parent time range', () => {
|
||||
const api = {
|
||||
parentApi,
|
||||
timeRange$: new BehaviorSubject<TimeRange | undefined>(undefined),
|
||||
};
|
||||
beforeEach(() => {
|
||||
api.timeRange$.next({
|
||||
from: 'now-15m',
|
||||
to: 'now',
|
||||
});
|
||||
api.parentApi.timeRange$.next({
|
||||
from: 'now-24h',
|
||||
to: 'now',
|
||||
});
|
||||
});
|
||||
|
||||
it('should call onFetch with local time range', async () => {
|
||||
const unsubscribe = onFetchContextChanged({
|
||||
api,
|
||||
onFetch: onFetchMock,
|
||||
fetchOnSetup: true,
|
||||
});
|
||||
await new Promise((resolve) => setTimeout(resolve, 1));
|
||||
expect(onFetchMock.mock.calls).toHaveLength(1);
|
||||
const fetchContext = onFetchMock.mock.calls[0][0];
|
||||
expect(fetchContext.timeRange).toEqual({
|
||||
from: 'now-15m',
|
||||
to: 'now',
|
||||
});
|
||||
unsubscribe();
|
||||
});
|
||||
|
||||
it('should call onFetch when local time range changes', async () => {
|
||||
const unsubscribe = onFetchContextChanged({
|
||||
api,
|
||||
onFetch: onFetchMock,
|
||||
fetchOnSetup: false,
|
||||
});
|
||||
api.timeRange$.next({
|
||||
from: 'now-16m',
|
||||
to: 'now',
|
||||
});
|
||||
await new Promise((resolve) => setTimeout(resolve, 1));
|
||||
expect(onFetchMock.mock.calls).toHaveLength(1);
|
||||
const fetchContext = onFetchMock.mock.calls[0][0];
|
||||
expect(fetchContext.timeRange).toEqual({
|
||||
from: 'now-16m',
|
||||
to: 'now',
|
||||
});
|
||||
unsubscribe();
|
||||
});
|
||||
|
||||
it('should not call onFetch when parent time range changes', async () => {
|
||||
const unsubscribe = onFetchContextChanged({
|
||||
api,
|
||||
onFetch: onFetchMock,
|
||||
fetchOnSetup: false,
|
||||
});
|
||||
api.parentApi.timeRange$.next({
|
||||
from: 'now-25h',
|
||||
to: 'now',
|
||||
});
|
||||
await new Promise((resolve) => setTimeout(resolve, 1));
|
||||
expect(onFetchMock.mock.calls).toHaveLength(0);
|
||||
unsubscribe();
|
||||
});
|
||||
|
||||
it('should call onFetch with parent time range when local time range is cleared', async () => {
|
||||
const unsubscribe = onFetchContextChanged({
|
||||
api,
|
||||
onFetch: onFetchMock,
|
||||
fetchOnSetup: false,
|
||||
});
|
||||
api.timeRange$.next(undefined);
|
||||
await new Promise((resolve) => setTimeout(resolve, 1));
|
||||
expect(onFetchMock.mock.calls).toHaveLength(1);
|
||||
const fetchContext = onFetchMock.mock.calls[0][0];
|
||||
expect(fetchContext.timeRange).toEqual({
|
||||
from: 'now-24h',
|
||||
to: 'now',
|
||||
});
|
||||
unsubscribe();
|
||||
});
|
||||
});
|
||||
|
||||
describe('only parent time range', () => {
|
||||
const api = {
|
||||
parentApi,
|
||||
};
|
||||
beforeEach(() => {
|
||||
api.parentApi.timeRange$.next({
|
||||
from: 'now-24h',
|
||||
to: 'now',
|
||||
});
|
||||
});
|
||||
|
||||
it('should call onFetch with parent time range', async () => {
|
||||
const unsubscribe = onFetchContextChanged({
|
||||
api,
|
||||
onFetch: onFetchMock,
|
||||
fetchOnSetup: true,
|
||||
});
|
||||
await new Promise((resolve) => setTimeout(resolve, 1));
|
||||
expect(onFetchMock.mock.calls).toHaveLength(1);
|
||||
const fetchContext = onFetchMock.mock.calls[0][0];
|
||||
expect(fetchContext.timeRange).toEqual({
|
||||
from: 'now-24h',
|
||||
to: 'now',
|
||||
});
|
||||
unsubscribe();
|
||||
});
|
||||
|
||||
it('should call onFetch when parent time range changes', async () => {
|
||||
const unsubscribe = onFetchContextChanged({
|
||||
api,
|
||||
onFetch: onFetchMock,
|
||||
fetchOnSetup: false,
|
||||
});
|
||||
api.parentApi.timeRange$.next({
|
||||
from: 'now-25h',
|
||||
to: 'now',
|
||||
});
|
||||
await new Promise((resolve) => setTimeout(resolve, 1));
|
||||
expect(onFetchMock.mock.calls).toHaveLength(1);
|
||||
const fetchContext = onFetchMock.mock.calls[0][0];
|
||||
expect(fetchContext.timeRange).toEqual({
|
||||
from: 'now-25h',
|
||||
to: 'now',
|
||||
});
|
||||
unsubscribe();
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
|
@ -1,135 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||
* Side Public License, v 1.
|
||||
*/
|
||||
|
||||
import { debounce } from 'lodash';
|
||||
import { combineLatest, Observable, skip, Subscription } from 'rxjs';
|
||||
import { AggregateQuery, Filter, Query, TimeRange } from '@kbn/es-query';
|
||||
import {
|
||||
apiPublishesTimeRange,
|
||||
apiPublishesUnifiedSearch,
|
||||
PublishesTimeRange,
|
||||
PublishesUnifiedSearch,
|
||||
} from './publishes_unified_search';
|
||||
import { apiPublishesSearchSession, PublishesSearchSession } from './publishes_search_session';
|
||||
import { apiHasParentApi, HasParentApi } from '../has_parent_api';
|
||||
import { apiPublishesReload } from './publishes_reload';
|
||||
|
||||
export interface FetchContext {
|
||||
isReload: boolean;
|
||||
filters: Filter[] | undefined;
|
||||
query: Query | AggregateQuery | undefined;
|
||||
searchSessionId: string | undefined;
|
||||
timeRange: TimeRange | undefined;
|
||||
timeslice: [number, number] | undefined;
|
||||
}
|
||||
|
||||
function getFetchContext(api: unknown, isReload: boolean) {
|
||||
const typeApi = api as Partial<
|
||||
PublishesTimeRange & HasParentApi<Partial<PublishesUnifiedSearch & PublishesSearchSession>>
|
||||
>;
|
||||
return {
|
||||
isReload,
|
||||
filters: typeApi?.parentApi?.filters$?.value,
|
||||
query: typeApi?.parentApi?.query$?.value,
|
||||
searchSessionId: typeApi?.parentApi?.searchSessionId$?.value,
|
||||
timeRange: typeApi?.timeRange$?.value ?? typeApi?.parentApi?.timeRange$?.value,
|
||||
timeslice: typeApi?.timeRange$?.value ? undefined : typeApi?.parentApi?.timeslice$?.value,
|
||||
};
|
||||
}
|
||||
|
||||
export function onFetchContextChanged({
|
||||
api,
|
||||
onFetch,
|
||||
fetchOnSetup,
|
||||
}: {
|
||||
api: unknown;
|
||||
onFetch: (fetchContext: FetchContext, isCanceled: () => boolean) => void;
|
||||
fetchOnSetup: boolean;
|
||||
}): () => void {
|
||||
let fetchSymbol: symbol | undefined;
|
||||
const debouncedFetch = debounce(fetch, 0);
|
||||
function fetch(isReload: boolean = false) {
|
||||
const currentFetchSymbol = Symbol();
|
||||
fetchSymbol = currentFetchSymbol;
|
||||
onFetch(getFetchContext(api, isReload), () => fetchSymbol !== currentFetchSymbol);
|
||||
}
|
||||
|
||||
const subscriptions: Subscription[] = [];
|
||||
|
||||
if (apiPublishesTimeRange(api)) {
|
||||
subscriptions.push(
|
||||
api.timeRange$.pipe(skip(1)).subscribe(() => {
|
||||
debouncedFetch();
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
if (apiHasParentApi(api) && apiPublishesSearchSession(api.parentApi)) {
|
||||
subscriptions.push(
|
||||
api.parentApi?.searchSessionId$.pipe(skip(1)).subscribe(() => {
|
||||
debouncedFetch.cancel();
|
||||
fetch(true);
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
if (apiHasParentApi(api) && apiPublishesUnifiedSearch(api.parentApi)) {
|
||||
subscriptions.push(
|
||||
combineLatest([api.parentApi.filters$, api.parentApi.query$])
|
||||
.pipe(skip(1))
|
||||
.subscribe(() => {
|
||||
// Ignore change when searchSessionId is provided.
|
||||
if ((api?.parentApi as Partial<PublishesSearchSession>)?.searchSessionId$?.value) {
|
||||
return;
|
||||
}
|
||||
debouncedFetch();
|
||||
})
|
||||
);
|
||||
|
||||
if (apiHasParentApi(api) && apiPublishesTimeRange(api.parentApi)) {
|
||||
const timeObservables: Array<Observable<unknown>> = [api.parentApi.timeRange$];
|
||||
if (api.parentApi.timeslice$) {
|
||||
timeObservables.push(api.parentApi.timeslice$);
|
||||
}
|
||||
subscriptions.push(
|
||||
combineLatest(timeObservables)
|
||||
.pipe(skip(1))
|
||||
.subscribe(() => {
|
||||
// Ignore changes when searchSessionId is provided or local time range is provided.
|
||||
if (
|
||||
(api?.parentApi as Partial<PublishesSearchSession>)?.searchSessionId$?.value ||
|
||||
(api as Partial<PublishesTimeRange>).timeRange$?.value
|
||||
) {
|
||||
return;
|
||||
}
|
||||
debouncedFetch();
|
||||
})
|
||||
);
|
||||
}
|
||||
if (apiHasParentApi(api) && apiPublishesReload(api.parentApi)) {
|
||||
subscriptions.push(
|
||||
api.parentApi.reload$.subscribe(() => {
|
||||
// Ignore changes when searchSessionId is provided
|
||||
if ((api?.parentApi as Partial<PublishesSearchSession>)?.searchSessionId$?.value) {
|
||||
return;
|
||||
}
|
||||
debouncedFetch.cancel();
|
||||
fetch(true);
|
||||
})
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
if (fetchOnSetup) {
|
||||
debouncedFetch();
|
||||
}
|
||||
|
||||
return () => {
|
||||
subscriptions.forEach((subcription) => subcription.unsubscribe());
|
||||
};
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue