[Search Sessions] Client side search cache (#92439)

* dev docs

* sessions tutorial

* title

* Update dev_docs/tutorials/data/search.mdx

Co-authored-by: gchaps <33642766+gchaps@users.noreply.github.com>

* Update dev_docs/tutorials/data/search.mdx

Co-authored-by: gchaps <33642766+gchaps@users.noreply.github.com>

* Update dev_docs/tutorials/data/search.mdx

Co-authored-by: gchaps <33642766+gchaps@users.noreply.github.com>

* Update dev_docs/tutorials/data/search.mdx

Co-authored-by: gchaps <33642766+gchaps@users.noreply.github.com>

* Update dev_docs/tutorials/data/search.mdx

Co-authored-by: gchaps <33642766+gchaps@users.noreply.github.com>

* Update dev_docs/tutorials/data/search.mdx

Co-authored-by: gchaps <33642766+gchaps@users.noreply.github.com>

* Update dev_docs/tutorials/data/search.mdx

Co-authored-by: gchaps <33642766+gchaps@users.noreply.github.com>

* Update dev_docs/tutorials/data/search.mdx

Co-authored-by: gchaps <33642766+gchaps@users.noreply.github.com>

* Update dev_docs/tutorials/data/search.mdx

Co-authored-by: gchaps <33642766+gchaps@users.noreply.github.com>

* Update dev_docs/tutorials/data/search.mdx

Co-authored-by: gchaps <33642766+gchaps@users.noreply.github.com>

* Update dev_docs/tutorials/data/search.mdx

Co-authored-by: gchaps <33642766+gchaps@users.noreply.github.com>

* Update dev_docs/tutorials/data/search.mdx

Co-authored-by: gchaps <33642766+gchaps@users.noreply.github.com>

* Update dev_docs/tutorials/data/search.mdx

Co-authored-by: gchaps <33642766+gchaps@users.noreply.github.com>

* Update dev_docs/tutorials/data/search.mdx

Co-authored-by: gchaps <33642766+gchaps@users.noreply.github.com>

* Update dev_docs/tutorials/data/search.mdx

Co-authored-by: gchaps <33642766+gchaps@users.noreply.github.com>

* Update dev_docs/tutorials/data/search.mdx

Co-authored-by: gchaps <33642766+gchaps@users.noreply.github.com>

* Update dev_docs/tutorials/data/search.mdx

Co-authored-by: gchaps <33642766+gchaps@users.noreply.github.com>

* Update dev_docs/tutorials/data/search.mdx

Co-authored-by: gchaps <33642766+gchaps@users.noreply.github.com>

* Update dev_docs/tutorials/data/search.mdx

Co-authored-by: gchaps <33642766+gchaps@users.noreply.github.com>

* Update dev_docs/tutorials/data/search.mdx

Co-authored-by: gchaps <33642766+gchaps@users.noreply.github.com>

* Update dev_docs/tutorials/data/search.mdx

Co-authored-by: gchaps <33642766+gchaps@users.noreply.github.com>

* Update dev_docs/tutorials/data/search.mdx

Co-authored-by: gchaps <33642766+gchaps@users.noreply.github.com>

* Code review

* client cache

* mock utils

* improve code

* Use cacheOnClient in Lens

* mock

* docs and types

* unit tests!

* Search response cache + tests

* remove cacheOnClient
evict cache on error

* test ts

* shouldCacheOnClient + improve tests

* remove unused

* clear subs

* dont unsubscribe on setItem

* caching mess

* t

* fix jest

* add size to bfetch response @ppisljar
use it to reduce the # of stringify in response cache

* ts

* ts

* docs

* simplify abort controller logic and extract it into a class

* docs

* delete unused tests

* use addAbortSignal

* code review

* Use shareReplay, fix tests

* code review

* bfetch test

* code review

* Leave the bfetch changes out

* docs + isRestore

* make sure to clean up properly

* Make sure that aborting in cache works correctly
Clearer restructuring of code

* fix test

* import

* code review round 1

* ts

* Added functional test for search request caching

* test

* skip before codefreeze

Co-authored-by: gchaps <33642766+gchaps@users.noreply.github.com>
Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Liza Katz 2021-04-16 19:59:23 +03:00 committed by GitHub
parent 106afd41b6
commit c187270b5e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 1350 additions and 60 deletions

View file

@ -0,0 +1,22 @@
<!-- Do not edit this file. It is automatically generated by API Documenter. -->
[Home](./index.md) &gt; [kibana-plugin-plugins-data-public](./kibana-plugin-plugins-data-public.md) &gt; [SearchInterceptor](./kibana-plugin-plugins-data-public.searchinterceptor.md) &gt; [getSerializableOptions](./kibana-plugin-plugins-data-public.searchinterceptor.getserializableoptions.md)
## SearchInterceptor.getSerializableOptions() method
<b>Signature:</b>
```typescript
protected getSerializableOptions(options?: ISearchOptions): Pick<ISearchOptions, "strategy" | "sessionId" | "isStored" | "isRestore" | "legacyHitsTotal">;
```
## Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| options | <code>ISearchOptions</code> | |
<b>Returns:</b>
`Pick<ISearchOptions, "strategy" | "sessionId" | "isStored" | "isRestore" | "legacyHitsTotal">`

View file

@ -26,6 +26,7 @@ export declare class SearchInterceptor
| Method | Modifiers | Description |
| --- | --- | --- |
| [getSerializableOptions(options)](./kibana-plugin-plugins-data-public.searchinterceptor.getserializableoptions.md) | | |
| [getTimeoutMode()](./kibana-plugin-plugins-data-public.searchinterceptor.gettimeoutmode.md) | | |
| [handleSearchError(e, options, isTimeout)](./kibana-plugin-plugins-data-public.searchinterceptor.handlesearcherror.md) | | |
| [search(request, options)](./kibana-plugin-plugins-data-public.searchinterceptor.search.md) | | Searches using the given <code>search</code> method. Overrides the <code>AbortSignal</code> with one that will abort either when the request times out, or when the original <code>AbortSignal</code> is aborted. Updates <code>pendingCount$</code> when the request is started/finalized. |

View file

@ -16,6 +16,7 @@ export interface IMyStrategyRequest extends IEsSearchRequest {
}
export interface IMyStrategyResponse extends IEsSearchResponse {
cool: string;
executed_at: number;
}
export const SERVER_SEARCH_ROUTE_PATH = '/api/examples/search';

View file

@ -111,7 +111,7 @@ export const SearchExamplesApp = ({
setSelectedNumericField(fields?.length ? getNumeric(fields)[0] : null);
}, [fields]);
const doAsyncSearch = async (strategy?: string) => {
const doAsyncSearch = async (strategy?: string, sessionId?: string) => {
if (!indexPattern || !selectedNumericField) return;
// Construct the query portion of the search request
@ -138,6 +138,7 @@ export const SearchExamplesApp = ({
const searchSubscription$ = data.search
.search(req, {
strategy,
sessionId,
})
.subscribe({
next: (res) => {
@ -148,19 +149,30 @@ export const SearchExamplesApp = ({
? // @ts-expect-error @elastic/elasticsearch no way to declare a type for aggregation in the search response
res.rawResponse.aggregations[1].value
: undefined;
const isCool = (res as IMyStrategyResponse).cool;
const executedAt = (res as IMyStrategyResponse).executed_at;
const message = (
<EuiText>
Searched {res.rawResponse.hits.total} documents. <br />
The average of {selectedNumericField!.name} is{' '}
{avgResult ? Math.floor(avgResult) : 0}.
<br />
Is it Cool? {String((res as IMyStrategyResponse).cool)}
{isCool ? `Is it Cool? ${isCool}` : undefined}
<br />
<EuiText data-test-subj="requestExecutedAt">
{executedAt ? `Executed at? ${executedAt}` : undefined}
</EuiText>
</EuiText>
);
notifications.toasts.addSuccess({
title: 'Query result',
text: mountReactNode(message),
});
notifications.toasts.addSuccess(
{
title: 'Query result',
text: mountReactNode(message),
},
{
toastLifeTimeMs: 300000,
}
);
searchSubscription$.unsubscribe();
} else if (isErrorResponse(res)) {
// TODO: Make response error status clearer
@ -227,6 +239,10 @@ export const SearchExamplesApp = ({
doAsyncSearch('myStrategy');
};
const onClientSideSessionCacheClickHandler = () => {
doAsyncSearch('myStrategy', data.search.session.getSessionId());
};
const onServerClickHandler = async () => {
if (!indexPattern || !selectedNumericField) return;
try {
@ -374,6 +390,45 @@ export const SearchExamplesApp = ({
</EuiButtonEmpty>
</EuiText>
<EuiSpacer />
<EuiTitle size="s">
<h3>Client side search session caching</h3>
</EuiTitle>
<EuiText>
<EuiButtonEmpty
size="xs"
onClick={() => data.search.session.start()}
iconType="alert"
data-test-subj="searchExamplesStartSession"
>
<FormattedMessage
id="searchExamples.startNewSession"
defaultMessage="Start a new session"
/>
</EuiButtonEmpty>
<EuiButtonEmpty
size="xs"
onClick={() => data.search.session.clear()}
iconType="alert"
data-test-subj="searchExamplesClearSession"
>
<FormattedMessage
id="searchExamples.clearSession"
defaultMessage="Clear session"
/>
</EuiButtonEmpty>
<EuiButtonEmpty
size="xs"
onClick={onClientSideSessionCacheClickHandler}
iconType="play"
data-test-subj="searchExamplesCacheSearch"
>
<FormattedMessage
id="searchExamples.myStrategyButtonText"
defaultMessage="Request from low-level client via My Strategy"
/>
</EuiButtonEmpty>
</EuiText>
<EuiSpacer />
<EuiTitle size="s">
<h3>Using search on the server</h3>
</EuiTitle>

View file

@ -20,6 +20,7 @@ export const mySearchStrategyProvider = (
map((esSearchRes) => ({
...esSearchRes,
cool: request.get_cool ? 'YES' : 'NOPE',
executed_at: new Date().getTime(),
}))
),
cancel: async (id, options, deps) => {

View file

@ -139,7 +139,7 @@ export function tabifyAggResponse(
const write = new TabbedAggResponseWriter(aggConfigs, respOpts || {});
const topLevelBucket: AggResponseBucket = {
...esResponse.aggregations,
doc_count: esResponse.hits.total,
doc_count: esResponse.hits?.total,
};
collectBucket(aggConfigs, write, topLevelBucket, '', 1);

View file

@ -2353,6 +2353,8 @@ export class SearchInterceptor {
// (undocumented)
protected readonly deps: SearchInterceptorDeps;
// (undocumented)
protected getSerializableOptions(options?: ISearchOptions): Pick<ISearchOptions, "strategy" | "sessionId" | "isStored" | "isRestore" | "legacyHitsTotal">;
// (undocumented)
protected getTimeoutMode(): TimeoutErrorMode;
// Warning: (ae-forgotten-export) The symbol "KibanaServerError" needs to be exported by the entry point index.d.ts
// Warning: (ae-forgotten-export) The symbol "AbortError" needs to be exported by the entry point index.d.ts

View file

@ -113,20 +113,14 @@ export class SearchInterceptor {
}
}
/**
* @internal
* @throws `AbortError` | `ErrorLike`
*/
protected runSearch(
request: IKibanaSearchRequest,
options?: ISearchOptions
): Promise<IKibanaSearchResponse> {
const { abortSignal, sessionId, ...requestOptions } = options || {};
protected getSerializableOptions(options?: ISearchOptions) {
const { sessionId, ...requestOptions } = options || {};
const serializableOptions: ISearchOptionsSerializable = {};
const combined = {
...requestOptions,
...this.deps.session.getSearchOptions(sessionId),
};
const serializableOptions: ISearchOptionsSerializable = {};
if (combined.sessionId !== undefined) serializableOptions.sessionId = combined.sessionId;
if (combined.isRestore !== undefined) serializableOptions.isRestore = combined.isRestore;
@ -135,10 +129,22 @@ export class SearchInterceptor {
if (combined.strategy !== undefined) serializableOptions.strategy = combined.strategy;
if (combined.isStored !== undefined) serializableOptions.isStored = combined.isStored;
return serializableOptions;
}
/**
* @internal
* @throws `AbortError` | `ErrorLike`
*/
protected runSearch(
request: IKibanaSearchRequest,
options?: ISearchOptions
): Promise<IKibanaSearchResponse> {
const { abortSignal } = options || {};
return this.batchedFetch(
{
request,
options: serializableOptions,
options: this.getSerializableOptions(options),
},
abortSignal
);

View file

@ -73,7 +73,7 @@ export interface SearchSessionIndicatorUiConfig {
}
/**
* Responsible for tracking a current search session. Supports only a single session at a time.
* Responsible for tracking a current search session. Supports a single session at a time.
*/
export class SessionService {
public readonly state$: Observable<SearchSessionState>;

View file

@ -21,13 +21,15 @@ describe('search abort controller', () => {
test('immediately aborts when passed an aborted signal in the constructor', () => {
const controller = new AbortController();
controller.abort();
const sac = new SearchAbortController(controller.signal);
const sac = new SearchAbortController();
sac.addAbortSignal(controller.signal);
expect(sac.getSignal().aborted).toBe(true);
});
test('aborts when input signal is aborted', () => {
const controller = new AbortController();
const sac = new SearchAbortController(controller.signal);
const sac = new SearchAbortController();
sac.addAbortSignal(controller.signal);
expect(sac.getSignal().aborted).toBe(false);
controller.abort();
expect(sac.getSignal().aborted).toBe(true);
@ -35,7 +37,8 @@ describe('search abort controller', () => {
test('aborts when all input signals are aborted', () => {
const controller = new AbortController();
const sac = new SearchAbortController(controller.signal);
const sac = new SearchAbortController();
sac.addAbortSignal(controller.signal);
const controller2 = new AbortController();
sac.addAbortSignal(controller2.signal);
@ -48,7 +51,8 @@ describe('search abort controller', () => {
test('aborts explicitly even if all inputs are not aborted', () => {
const controller = new AbortController();
const sac = new SearchAbortController(controller.signal);
const sac = new SearchAbortController();
sac.addAbortSignal(controller.signal);
const controller2 = new AbortController();
sac.addAbortSignal(controller2.signal);
@ -60,7 +64,8 @@ describe('search abort controller', () => {
test('doesnt abort, if cleared', () => {
const controller = new AbortController();
const sac = new SearchAbortController(controller.signal);
const sac = new SearchAbortController();
sac.addAbortSignal(controller.signal);
expect(sac.getSignal().aborted).toBe(false);
sac.cleanup();
controller.abort();
@ -77,7 +82,7 @@ describe('search abort controller', () => {
});
test('doesnt abort on timeout, if cleared', () => {
const sac = new SearchAbortController(undefined, 100);
const sac = new SearchAbortController(100);
expect(sac.getSignal().aborted).toBe(false);
sac.cleanup();
timeTravel(100);
@ -85,7 +90,7 @@ describe('search abort controller', () => {
});
test('aborts on timeout, even if no signals passed in', () => {
const sac = new SearchAbortController(undefined, 100);
const sac = new SearchAbortController(100);
expect(sac.getSignal().aborted).toBe(false);
timeTravel(100);
expect(sac.getSignal().aborted).toBe(true);
@ -94,7 +99,8 @@ describe('search abort controller', () => {
test('aborts on timeout, even if there are unaborted signals', () => {
const controller = new AbortController();
const sac = new SearchAbortController(controller.signal, 100);
const sac = new SearchAbortController(100);
sac.addAbortSignal(controller.signal);
expect(sac.getSignal().aborted).toBe(false);
timeTravel(100);

View file

@ -18,11 +18,7 @@ export class SearchAbortController {
private destroyed = false;
private reason?: AbortReason;
constructor(abortSignal?: AbortSignal, timeout?: number) {
if (abortSignal) {
this.addAbortSignal(abortSignal);
}
constructor(timeout?: number) {
if (timeout) {
this.timeoutSub = timer(timeout).subscribe(() => {
this.reason = AbortReason.Timeout;
@ -41,6 +37,7 @@ export class SearchAbortController {
};
public cleanup() {
if (this.destroyed) return;
this.destroyed = true;
this.timeoutSub?.unsubscribe();
this.inputAbortSignals.forEach((abortSignal) => {

View file

@ -23,9 +23,12 @@ import { bfetchPluginMock } from '../../../../../src/plugins/bfetch/public/mocks
import { BehaviorSubject } from 'rxjs';
import * as xpackResourceNotFoundException from '../../common/search/test_data/search_phase_execution_exception.json';
const timeTravel = (msToRun = 0) => {
const flushPromises = () => new Promise((resolve) => setImmediate(resolve));
const timeTravel = async (msToRun = 0) => {
await flushPromises();
jest.advanceTimersByTime(msToRun);
return new Promise((resolve) => setImmediate(resolve));
return flushPromises();
};
const next = jest.fn();
@ -39,10 +42,20 @@ let fetchMock: jest.Mock<any>;
jest.useFakeTimers();
jest.mock('./utils', () => ({
createRequestHash: jest.fn().mockImplementation((input) => {
return Promise.resolve(JSON.stringify(input));
}),
}));
function mockFetchImplementation(responses: any[]) {
let i = 0;
fetchMock.mockImplementation(() => {
fetchMock.mockImplementation((r) => {
if (!r.request.id) i = 0;
const { time = 0, value = {}, isError = false } = responses[i++];
value.meta = {
size: 10,
};
return new Promise((resolve, reject) =>
setTimeout(() => {
return (isError ? reject : resolve)(value);
@ -452,7 +465,7 @@ describe('EnhancedSearchInterceptor', () => {
});
});
describe('session', () => {
describe('session tracking', () => {
beforeEach(() => {
const responses = [
{
@ -559,4 +572,540 @@ describe('EnhancedSearchInterceptor', () => {
expect(sessionService.trackSearch).toBeCalledTimes(0);
});
});
describe('session client caching', () => {
const sessionId = 'sessionId';
const basicReq = {
params: {
test: 1,
},
};
const basicCompleteResponse = [
{
time: 10,
value: {
isPartial: false,
isRunning: false,
id: 1,
rawResponse: {
took: 1,
},
},
},
];
const partialCompleteResponse = [
{
time: 10,
value: {
isPartial: true,
isRunning: true,
id: 1,
rawResponse: {
took: 1,
},
},
},
{
time: 20,
value: {
isPartial: false,
isRunning: false,
id: 1,
rawResponse: {
took: 1,
},
},
},
];
beforeEach(() => {
sessionService.isCurrentSession.mockImplementation((_sessionId) => _sessionId === sessionId);
sessionService.getSessionId.mockImplementation(() => sessionId);
});
test('should be disabled if there is no session', async () => {
mockFetchImplementation(basicCompleteResponse);
searchInterceptor.search(basicReq, {}).subscribe({ next, error, complete });
expect(fetchMock).toBeCalledTimes(1);
searchInterceptor.search(basicReq, {}).subscribe({ next, error, complete });
expect(fetchMock).toBeCalledTimes(2);
});
test('should fetch different requests in a single session', async () => {
mockFetchImplementation(basicCompleteResponse);
const req2 = {
params: {
test: 2,
},
};
searchInterceptor.search(basicReq, { sessionId }).subscribe({ next, error, complete });
await timeTravel(10);
expect(fetchMock).toBeCalledTimes(1);
searchInterceptor.search(req2, { sessionId }).subscribe({ next, error, complete });
await timeTravel(10);
expect(fetchMock).toBeCalledTimes(2);
});
test('should fetch the same request for two different sessions', async () => {
mockFetchImplementation(basicCompleteResponse);
searchInterceptor.search(basicReq, { sessionId }).subscribe({ next, error, complete });
await timeTravel(10);
expect(fetchMock).toBeCalledTimes(1);
searchInterceptor
.search(basicReq, { sessionId: 'anotherSession' })
.subscribe({ next, error, complete });
await timeTravel(10);
expect(fetchMock).toBeCalledTimes(2);
});
test('should track searches that come from cache', async () => {
mockFetchImplementation(partialCompleteResponse);
sessionService.isCurrentSession.mockImplementation((_sessionId) => _sessionId === sessionId);
sessionService.getSessionId.mockImplementation(() => sessionId);
const untrack = jest.fn();
sessionService.trackSearch.mockImplementation(() => untrack);
const req = {
params: {
test: 200,
},
};
const response = searchInterceptor.search(req, { pollInterval: 1, sessionId });
const response2 = searchInterceptor.search(req, { pollInterval: 1, sessionId });
response.subscribe({ next, error, complete });
response2.subscribe({ next, error, complete });
await timeTravel(10);
expect(fetchMock).toBeCalledTimes(1);
expect(sessionService.trackSearch).toBeCalledTimes(2);
expect(untrack).not.toBeCalled();
await timeTravel(300);
// Should be called only 2 times (once per partial response)
expect(fetchMock).toBeCalledTimes(2);
expect(sessionService.trackSearch).toBeCalledTimes(2);
expect(untrack).toBeCalledTimes(2);
expect(next).toBeCalledTimes(4);
expect(error).toBeCalledTimes(0);
expect(complete).toBeCalledTimes(2);
});
test('should cache partial responses', async () => {
const responses = [
{
time: 10,
value: {
isPartial: true,
isRunning: true,
id: 1,
},
},
];
mockFetchImplementation(responses);
searchInterceptor.search(basicReq, { sessionId }).subscribe({ next, error, complete });
await timeTravel(10);
expect(fetchMock).toBeCalledTimes(1);
searchInterceptor.search(basicReq, { sessionId }).subscribe({ next, error, complete });
await timeTravel(10);
expect(fetchMock).toBeCalledTimes(1);
});
test('should not cache error responses', async () => {
const responses = [
{
time: 10,
value: {
isPartial: true,
isRunning: false,
id: 1,
},
},
];
mockFetchImplementation(responses);
searchInterceptor.search(basicReq, { sessionId }).subscribe({ next, error, complete });
await timeTravel(10);
expect(fetchMock).toBeCalledTimes(1);
searchInterceptor.search(basicReq, { sessionId }).subscribe({ next, error, complete });
await timeTravel(10);
expect(fetchMock).toBeCalledTimes(2);
});
test('should deliver error to all replays', async () => {
const responses = [
{
time: 10,
value: {
isPartial: true,
isRunning: false,
id: 1,
},
},
];
mockFetchImplementation(responses);
searchInterceptor.search(basicReq, { sessionId }).subscribe({ next, error, complete });
searchInterceptor.search(basicReq, { sessionId }).subscribe({ next, error, complete });
await timeTravel(10);
expect(fetchMock).toBeCalledTimes(1);
expect(error).toBeCalledTimes(2);
expect(error.mock.calls[0][0].message).toEqual('Received partial response');
expect(error.mock.calls[1][0].message).toEqual('Received partial response');
});
test('should ignore anything outside params when hashing', async () => {
mockFetchImplementation(basicCompleteResponse);
const req = {
something: 123,
params: {
test: 1,
},
};
const req2 = {
something: 321,
params: {
test: 1,
},
};
searchInterceptor.search(req, { sessionId }).subscribe({ next, error, complete });
await timeTravel(10);
expect(fetchMock).toBeCalledTimes(1);
searchInterceptor.search(req2, { sessionId }).subscribe({ next, error, complete });
await timeTravel(10);
expect(fetchMock).toBeCalledTimes(1);
});
test('should ignore preference when hashing', async () => {
mockFetchImplementation(basicCompleteResponse);
const req = {
params: {
test: 1,
preference: 123,
},
};
const req2 = {
params: {
test: 1,
preference: 321,
},
};
searchInterceptor.search(req, { sessionId }).subscribe({ next, error, complete });
await timeTravel(10);
expect(fetchMock).toBeCalledTimes(1);
searchInterceptor.search(req2, { sessionId }).subscribe({ next, error, complete });
await timeTravel(10);
expect(fetchMock).toBeCalledTimes(1);
});
test('should return from cache for identical requests in the same session', async () => {
mockFetchImplementation(basicCompleteResponse);
searchInterceptor.search(basicReq, { sessionId }).subscribe({ next, error, complete });
await timeTravel(10);
expect(fetchMock).toBeCalledTimes(1);
searchInterceptor.search(basicReq, { sessionId }).subscribe({ next, error, complete });
await timeTravel(10);
expect(fetchMock).toBeCalledTimes(1);
});
test('aborting a search that didnt get any response should retrigger search', async () => {
mockFetchImplementation(basicCompleteResponse);
const abortController = new AbortController();
// Start a search request
searchInterceptor
.search(basicReq, { sessionId, abortSignal: abortController.signal })
.subscribe({ next, error, complete });
// Abort the search request before it started
abortController.abort();
// Time travel to make sure nothing appens
await timeTravel(10);
expect(fetchMock).toBeCalledTimes(0);
expect(next).toBeCalledTimes(0);
expect(error).toBeCalledTimes(1);
expect(complete).toBeCalledTimes(0);
const error2 = jest.fn();
const next2 = jest.fn();
const complete2 = jest.fn();
// Search for the same thing again
searchInterceptor
.search(basicReq, { sessionId })
.subscribe({ next: next2, error: error2, complete: complete2 });
// Should search again
await timeTravel(10);
expect(fetchMock).toBeCalledTimes(1);
expect(next2).toBeCalledTimes(1);
expect(error2).toBeCalledTimes(0);
expect(complete2).toBeCalledTimes(1);
});
test('aborting a running first search shouldnt clear cache', async () => {
mockFetchImplementation(partialCompleteResponse);
sessionService.isCurrentSession.mockImplementation((_sessionId) => _sessionId === sessionId);
sessionService.getSessionId.mockImplementation(() => sessionId);
const untrack = jest.fn();
sessionService.trackSearch.mockImplementation(() => untrack);
const req = {
params: {
test: 200,
},
};
const abortController = new AbortController();
const response = searchInterceptor.search(req, {
pollInterval: 1,
sessionId,
abortSignal: abortController.signal,
});
response.subscribe({ next, error, complete });
await timeTravel(10);
expect(fetchMock).toBeCalledTimes(1);
expect(next).toBeCalledTimes(1);
expect(error).toBeCalledTimes(0);
expect(complete).toBeCalledTimes(0);
expect(sessionService.trackSearch).toBeCalledTimes(1);
expect(untrack).not.toBeCalled();
const next2 = jest.fn();
const error2 = jest.fn();
const complete2 = jest.fn();
const response2 = searchInterceptor.search(req, { pollInterval: 1, sessionId });
response2.subscribe({ next: next2, error: error2, complete: complete2 });
await timeTravel(0);
abortController.abort();
await timeTravel(300);
// Both searches should be tracked and untracked
expect(sessionService.trackSearch).toBeCalledTimes(2);
expect(untrack).toBeCalledTimes(2);
// First search should error
expect(next).toBeCalledTimes(1);
expect(error).toBeCalledTimes(1);
expect(complete).toBeCalledTimes(0);
// Second search should complete
expect(next2).toBeCalledTimes(2);
expect(error2).toBeCalledTimes(0);
expect(complete2).toBeCalledTimes(1);
// Should be called only 2 times (once per partial response)
expect(fetchMock).toBeCalledTimes(2);
});
test('aborting a running second search shouldnt clear cache', async () => {
mockFetchImplementation(partialCompleteResponse);
sessionService.isCurrentSession.mockImplementation((_sessionId) => _sessionId === sessionId);
sessionService.getSessionId.mockImplementation(() => sessionId);
const untrack = jest.fn();
sessionService.trackSearch.mockImplementation(() => untrack);
const req = {
params: {
test: 200,
},
};
const abortController = new AbortController();
const response = searchInterceptor.search(req, { pollInterval: 1, sessionId });
response.subscribe({ next, error, complete });
await timeTravel(10);
expect(fetchMock).toBeCalledTimes(1);
expect(next).toBeCalledTimes(1);
expect(error).toBeCalledTimes(0);
expect(complete).toBeCalledTimes(0);
expect(sessionService.trackSearch).toBeCalledTimes(1);
expect(untrack).not.toBeCalled();
const next2 = jest.fn();
const error2 = jest.fn();
const complete2 = jest.fn();
const response2 = searchInterceptor.search(req, {
pollInterval: 0,
sessionId,
abortSignal: abortController.signal,
});
response2.subscribe({ next: next2, error: error2, complete: complete2 });
await timeTravel(0);
abortController.abort();
await timeTravel(300);
expect(sessionService.trackSearch).toBeCalledTimes(2);
expect(untrack).toBeCalledTimes(2);
expect(next).toBeCalledTimes(2);
expect(error).toBeCalledTimes(0);
expect(complete).toBeCalledTimes(1);
expect(next2).toBeCalledTimes(1);
expect(error2).toBeCalledTimes(1);
expect(complete2).toBeCalledTimes(0);
// Should be called only 2 times (once per partial response)
expect(fetchMock).toBeCalledTimes(2);
});
test('aborting both requests should cancel underlaying search only once', async () => {
mockFetchImplementation(partialCompleteResponse);
sessionService.isCurrentSession.mockImplementation((_sessionId) => _sessionId === sessionId);
sessionService.getSessionId.mockImplementation(() => sessionId);
sessionService.trackSearch.mockImplementation(() => jest.fn());
const req = {
params: {
test: 200,
},
};
const abortController = new AbortController();
const response = searchInterceptor.search(req, {
pollInterval: 1,
sessionId,
abortSignal: abortController.signal,
});
response.subscribe({ next, error, complete });
const response2 = searchInterceptor.search(req, {
pollInterval: 1,
sessionId,
abortSignal: abortController.signal,
});
response2.subscribe({ next, error, complete });
await timeTravel(10);
abortController.abort();
await timeTravel(300);
expect(mockCoreSetup.http.delete).toHaveBeenCalledTimes(1);
});
test('aborting both searches should stop searching and clear cache', async () => {
mockFetchImplementation(partialCompleteResponse);
sessionService.isCurrentSession.mockImplementation((_sessionId) => _sessionId === sessionId);
sessionService.getSessionId.mockImplementation(() => sessionId);
const untrack = jest.fn();
sessionService.trackSearch.mockImplementation(() => untrack);
const req = {
params: {
test: 200,
},
};
const abortController = new AbortController();
const response = searchInterceptor.search(req, {
pollInterval: 1,
sessionId,
abortSignal: abortController.signal,
});
response.subscribe({ next, error, complete });
await timeTravel(10);
expect(fetchMock).toBeCalledTimes(1);
const response2 = searchInterceptor.search(req, {
pollInterval: 1,
sessionId,
abortSignal: abortController.signal,
});
response2.subscribe({ next, error, complete });
await timeTravel(0);
expect(fetchMock).toBeCalledTimes(1);
abortController.abort();
await timeTravel(300);
expect(next).toBeCalledTimes(2);
expect(error).toBeCalledTimes(2);
expect(complete).toBeCalledTimes(0);
expect(error.mock.calls[0][0]).toBeInstanceOf(AbortError);
expect(error.mock.calls[1][0]).toBeInstanceOf(AbortError);
// Should be called only 1 times (one partial response)
expect(fetchMock).toBeCalledTimes(1);
// Clear mock and research
fetchMock.mockReset();
mockFetchImplementation(partialCompleteResponse);
// Run the search again to see that we don't hit the cache
const response3 = searchInterceptor.search(req, { pollInterval: 1, sessionId });
response3.subscribe({ next, error, complete });
await timeTravel(10);
await timeTravel(10);
await timeTravel(300);
// Should be called 2 times (two partial response)
expect(fetchMock).toBeCalledTimes(2);
expect(complete).toBeCalledTimes(1);
});
test('aborting a completed search shouldnt effect cache', async () => {
mockFetchImplementation(basicCompleteResponse);
const abortController = new AbortController();
// Start a search request
searchInterceptor
.search(basicReq, { sessionId, abortSignal: abortController.signal })
.subscribe({ next, error, complete });
// Get a final response
await timeTravel(10);
expect(fetchMock).toBeCalledTimes(1);
// Abort the search request
abortController.abort();
// Search for the same thing again
searchInterceptor.search(basicReq, { sessionId }).subscribe({ next, error, complete });
// Get the response from cache
expect(fetchMock).toBeCalledTimes(1);
});
});
});

View file

@ -6,8 +6,19 @@
*/
import { once } from 'lodash';
import { throwError, Subscription } from 'rxjs';
import { tap, finalize, catchError, filter, take, skip } from 'rxjs/operators';
import { throwError, Subscription, from, of, fromEvent, EMPTY } from 'rxjs';
import {
tap,
finalize,
catchError,
filter,
take,
skip,
switchMap,
shareReplay,
map,
takeUntil,
} from 'rxjs/operators';
import {
TimeoutErrorMode,
SearchInterceptor,
@ -16,12 +27,21 @@ import {
IKibanaSearchRequest,
SearchSessionState,
} from '../../../../../src/plugins/data/public';
import { AbortError } from '../../../../../src/plugins/kibana_utils/public';
import { ENHANCED_ES_SEARCH_STRATEGY, IAsyncSearchOptions, pollSearch } from '../../common';
import { SearchResponseCache } from './search_response_cache';
import { createRequestHash } from './utils';
import { SearchAbortController } from './search_abort_controller';
const MAX_CACHE_ITEMS = 50;
const MAX_CACHE_SIZE_MB = 10;
export class EnhancedSearchInterceptor extends SearchInterceptor {
private uiSettingsSub: Subscription;
private searchTimeout: number;
private readonly responseCache: SearchResponseCache = new SearchResponseCache(
MAX_CACHE_ITEMS,
MAX_CACHE_SIZE_MB
);
/**
* @internal
@ -38,6 +58,7 @@ export class EnhancedSearchInterceptor extends SearchInterceptor {
}
public stop() {
this.responseCache.clear();
this.uiSettingsSub.unsubscribe();
}
@ -47,19 +68,31 @@ export class EnhancedSearchInterceptor extends SearchInterceptor {
: TimeoutErrorMode.CONTACT;
}
public search({ id, ...request }: IKibanaSearchRequest, options: IAsyncSearchOptions = {}) {
const searchOptions = {
strategy: ENHANCED_ES_SEARCH_STRATEGY,
...options,
private createRequestHash$(request: IKibanaSearchRequest, options: IAsyncSearchOptions) {
const { sessionId, isRestore } = options;
// Preference is used to ensure all queries go to the same set of shards and it doesn't need to be hashed
// https://www.elastic.co/guide/en/elasticsearch/reference/current/search-shard-routing.html#shard-and-node-preference
const { preference, ...params } = request.params || {};
const hashOptions = {
...params,
sessionId,
isRestore,
};
const { sessionId, strategy, abortSignal } = searchOptions;
const search = () => this.runSearch({ id, ...request }, searchOptions);
const searchAbortController = new SearchAbortController(abortSignal, this.searchTimeout);
this.pendingCount$.next(this.pendingCount$.getValue() + 1);
const untrackSearch = this.deps.session.isCurrentSession(options.sessionId)
? this.deps.session.trackSearch({ abort: () => searchAbortController.abort() })
: undefined;
return from(sessionId ? createRequestHash(hashOptions) : of(undefined));
}
/**
* @internal
* Creates a new pollSearch that share replays its results
*/
private runSearch$(
{ id, ...request }: IKibanaSearchRequest,
options: IAsyncSearchOptions,
searchAbortController: SearchAbortController
) {
const search = () => this.runSearch({ id, ...request }, options);
const { sessionId, strategy } = options;
// track if this search's session will be send to background
// if yes, then we don't need to cancel this search when it is aborted
@ -91,18 +124,97 @@ export class EnhancedSearchInterceptor extends SearchInterceptor {
tap((response) => (id = response.id)),
catchError((e: Error) => {
cancel();
return throwError(this.handleSearchError(e, options, searchAbortController.isTimeout()));
return throwError(e);
}),
finalize(() => {
this.pendingCount$.next(this.pendingCount$.getValue() - 1);
searchAbortController.cleanup();
if (untrackSearch && this.deps.session.isCurrentSession(options.sessionId)) {
// untrack if this search still belongs to current session
untrackSearch();
}
if (savedToBackgroundSub) {
savedToBackgroundSub.unsubscribe();
}
}),
// This observable is cached in the responseCache.
// Using shareReplay makes sure that future subscribers will get the final response
shareReplay(1)
);
}
/**
* @internal
* Creates a new search observable and a corresponding search abort controller
* If requestHash is defined, tries to return them first from cache.
*/
private getSearchResponse$(
request: IKibanaSearchRequest,
options: IAsyncSearchOptions,
requestHash?: string
) {
const cached = requestHash ? this.responseCache.get(requestHash) : undefined;
const searchAbortController =
cached?.searchAbortController || new SearchAbortController(this.searchTimeout);
// Create a new abort signal if one was not passed. This fake signal will never be aborted,
// So the underlaying search will not be aborted, even if the other consumers abort.
searchAbortController.addAbortSignal(options.abortSignal ?? new AbortController().signal);
const response$ = cached?.response$ || this.runSearch$(request, options, searchAbortController);
if (requestHash && !this.responseCache.has(requestHash)) {
this.responseCache.set(requestHash, {
response$,
searchAbortController,
});
}
return {
response$,
searchAbortController,
};
}
public search({ id, ...request }: IKibanaSearchRequest, options: IAsyncSearchOptions = {}) {
const searchOptions = {
strategy: ENHANCED_ES_SEARCH_STRATEGY,
...options,
};
const { sessionId, abortSignal } = searchOptions;
return this.createRequestHash$(request, searchOptions).pipe(
switchMap((requestHash) => {
const { searchAbortController, response$ } = this.getSearchResponse$(
request,
searchOptions,
requestHash
);
this.pendingCount$.next(this.pendingCount$.getValue() + 1);
const untrackSearch = this.deps.session.isCurrentSession(sessionId)
? this.deps.session.trackSearch({ abort: () => searchAbortController.abort() })
: undefined;
// Abort the replay if the abortSignal is aborted.
// The underlaying search will not abort unless searchAbortController fires.
const aborted$ = (abortSignal ? fromEvent(abortSignal, 'abort') : EMPTY).pipe(
map(() => {
throw new AbortError();
})
);
return response$.pipe(
takeUntil(aborted$),
catchError((e) => {
return throwError(
this.handleSearchError(e, searchOptions, searchAbortController.isTimeout())
);
}),
finalize(() => {
this.pendingCount$.next(this.pendingCount$.getValue() - 1);
if (untrackSearch && this.deps.session.isCurrentSession(sessionId)) {
// untrack if this search still belongs to current session
untrackSearch();
}
})
);
})
);
}

View file

@ -0,0 +1,318 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { interval, Observable, of, throwError } from 'rxjs';
import { shareReplay, switchMap, take } from 'rxjs/operators';
import { IKibanaSearchResponse } from 'src/plugins/data/public';
import { SearchAbortController } from './search_abort_controller';
import { SearchResponseCache } from './search_response_cache';
describe('SearchResponseCache', () => {
let cache: SearchResponseCache;
let searchAbortController: SearchAbortController;
const r: Array<IKibanaSearchResponse<any>> = [
{
isPartial: true,
isRunning: true,
rawResponse: {
t: 1,
},
},
{
isPartial: true,
isRunning: true,
rawResponse: {
t: 2,
},
},
{
isPartial: true,
isRunning: true,
rawResponse: {
t: 3,
},
},
{
isPartial: false,
isRunning: false,
rawResponse: {
t: 4,
},
},
];
function getSearchObservable$(responses: Array<IKibanaSearchResponse<any>> = r) {
return interval(100).pipe(
take(responses.length),
switchMap((value: number, i: number) => {
if (responses[i].rawResponse.throw === true) {
return throwError('nooo');
} else {
return of(responses[i]);
}
}),
shareReplay(1)
);
}
function wrapWithAbortController(response$: Observable<IKibanaSearchResponse<any>>) {
return {
response$,
searchAbortController,
};
}
beforeEach(() => {
cache = new SearchResponseCache(3, 0.1);
searchAbortController = new SearchAbortController();
});
describe('Cache eviction', () => {
test('clear evicts all', () => {
const finalResult = r[r.length - 1];
cache.set('123', wrapWithAbortController(of(finalResult)));
cache.set('234', wrapWithAbortController(of(finalResult)));
cache.clear();
expect(cache.get('123')).toBeUndefined();
expect(cache.get('234')).toBeUndefined();
});
test('evicts searches that threw an exception', async () => {
const res$ = getSearchObservable$();
const err$ = getSearchObservable$([
{
isPartial: true,
isRunning: true,
rawResponse: {
t: 'a'.repeat(1000),
},
},
{
isPartial: true,
isRunning: true,
rawResponse: {
throw: true,
},
},
]);
cache.set('123', wrapWithAbortController(err$));
cache.set('234', wrapWithAbortController(res$));
const errHandler = jest.fn();
await err$.toPromise().catch(errHandler);
await res$.toPromise().catch(errHandler);
expect(errHandler).toBeCalledTimes(1);
expect(cache.get('123')).toBeUndefined();
expect(cache.get('234')).not.toBeUndefined();
});
test('evicts searches that returned an error response', async () => {
const err$ = getSearchObservable$([
{
isPartial: true,
isRunning: true,
rawResponse: {
t: 1,
},
},
{
isPartial: true,
isRunning: false,
rawResponse: {
t: 2,
},
},
]);
cache.set('123', wrapWithAbortController(err$));
const errHandler = jest.fn();
await err$.toPromise().catch(errHandler);
expect(errHandler).toBeCalledTimes(0);
expect(cache.get('123')).toBeUndefined();
});
test('evicts oldest item if has too many cached items', async () => {
const finalResult = r[r.length - 1];
cache.set('123', wrapWithAbortController(of(finalResult)));
cache.set('234', wrapWithAbortController(of(finalResult)));
cache.set('345', wrapWithAbortController(of(finalResult)));
cache.set('456', wrapWithAbortController(of(finalResult)));
expect(cache.get('123')).toBeUndefined();
expect(cache.get('234')).not.toBeUndefined();
expect(cache.get('345')).not.toBeUndefined();
expect(cache.get('456')).not.toBeUndefined();
});
test('evicts oldest item if cache gets bigger than max size', async () => {
const largeResult$ = getSearchObservable$([
{
isPartial: true,
isRunning: true,
rawResponse: {
t: 'a'.repeat(1000),
},
},
{
isPartial: false,
isRunning: false,
rawResponse: {
t: 'a'.repeat(50000),
},
},
]);
cache.set('123', wrapWithAbortController(largeResult$));
cache.set('234', wrapWithAbortController(largeResult$));
cache.set('345', wrapWithAbortController(largeResult$));
await largeResult$.toPromise();
expect(cache.get('123')).toBeUndefined();
expect(cache.get('234')).not.toBeUndefined();
expect(cache.get('345')).not.toBeUndefined();
});
test('evicts from cache any single item that gets bigger than max size', async () => {
const largeResult$ = getSearchObservable$([
{
isPartial: true,
isRunning: true,
rawResponse: {
t: 'a'.repeat(500),
},
},
{
isPartial: false,
isRunning: false,
rawResponse: {
t: 'a'.repeat(500000),
},
},
]);
cache.set('234', wrapWithAbortController(largeResult$));
await largeResult$.toPromise();
expect(cache.get('234')).toBeUndefined();
});
test('get updates the insertion time of an item', async () => {
const finalResult = r[r.length - 1];
cache.set('123', wrapWithAbortController(of(finalResult)));
cache.set('234', wrapWithAbortController(of(finalResult)));
cache.set('345', wrapWithAbortController(of(finalResult)));
cache.get('123');
cache.get('234');
cache.set('456', wrapWithAbortController(of(finalResult)));
expect(cache.get('123')).not.toBeUndefined();
expect(cache.get('234')).not.toBeUndefined();
expect(cache.get('345')).toBeUndefined();
expect(cache.get('456')).not.toBeUndefined();
});
});
describe('Observable behavior', () => {
test('caches a response and re-emits it', async () => {
const s$ = getSearchObservable$();
cache.set('123', wrapWithAbortController(s$));
const finalRes = await cache.get('123')!.response$.toPromise();
expect(finalRes).toStrictEqual(r[r.length - 1]);
});
test('cached$ should emit same as original search$', async () => {
const s$ = getSearchObservable$();
cache.set('123', wrapWithAbortController(s$));
const next = jest.fn();
const cached$ = cache.get('123');
cached$!.response$.subscribe({
next,
});
// wait for original search to complete
await s$!.toPromise();
// get final response from cached$
const finalRes = await cached$!.response$.toPromise();
expect(finalRes).toStrictEqual(r[r.length - 1]);
expect(next).toHaveBeenCalledTimes(4);
});
test('cached$ should emit only current value and keep emitting if subscribed while search$ is running', async () => {
const s$ = getSearchObservable$();
cache.set('123', wrapWithAbortController(s$));
const next = jest.fn();
let cached$: Observable<IKibanaSearchResponse<any>> | undefined;
s$.subscribe({
next: (res) => {
if (res.rawResponse.t === 3) {
cached$ = cache.get('123')!.response$;
cached$!.subscribe({
next,
});
}
},
});
// wait for original search to complete
await s$!.toPromise();
const finalRes = await cached$!.toPromise();
expect(finalRes).toStrictEqual(r[r.length - 1]);
expect(next).toHaveBeenCalledTimes(2);
});
test('cached$ should emit only last value if subscribed after search$ was complete 1', async () => {
const finalResult = r[r.length - 1];
const s$ = wrapWithAbortController(of(finalResult));
cache.set('123', s$);
// wait for original search to complete
await s$!.response$.toPromise();
const next = jest.fn();
const cached$ = cache.get('123');
cached$!.response$.subscribe({
next,
});
const finalRes = await cached$!.response$.toPromise();
expect(finalRes).toStrictEqual(r[r.length - 1]);
expect(next).toHaveBeenCalledTimes(1);
});
test('cached$ should emit only last value if subscribed after search$ was complete', async () => {
const s$ = getSearchObservable$();
cache.set('123', wrapWithAbortController(s$));
// wait for original search to complete
await s$!.toPromise();
const next = jest.fn();
const cached$ = cache.get('123');
cached$!.response$.subscribe({
next,
});
const finalRes = await cached$!.response$.toPromise();
expect(finalRes).toStrictEqual(r[r.length - 1]);
expect(next).toHaveBeenCalledTimes(1);
});
});
});

View file

@ -0,0 +1,136 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Observable, Subscription } from 'rxjs';
import { IKibanaSearchResponse, isErrorResponse } from '../../../../../src/plugins/data/public';
import { SearchAbortController } from './search_abort_controller';
interface ResponseCacheItem {
response$: Observable<IKibanaSearchResponse<any>>;
searchAbortController: SearchAbortController;
}
interface ResponseCacheItemInternal {
response$: Observable<IKibanaSearchResponse<any>>;
searchAbortController: SearchAbortController;
size: number;
subs: Subscription;
}
export class SearchResponseCache {
private responseCache: Map<string, ResponseCacheItemInternal>;
private cacheSize = 0;
constructor(private maxItems: number, private maxCacheSizeMB: number) {
this.responseCache = new Map();
}
private byteToMb(size: number) {
return size / (1024 * 1024);
}
private deleteItem(key: string, clearSubs = true) {
const item = this.responseCache.get(key);
if (item) {
if (clearSubs) {
item.subs.unsubscribe();
}
this.cacheSize -= item.size;
this.responseCache.delete(key);
}
}
private setItem(key: string, item: ResponseCacheItemInternal) {
// The deletion of the key will move it to the end of the Map's entries.
this.deleteItem(key, false);
this.cacheSize += item.size;
this.responseCache.set(key, item);
}
public clear() {
this.cacheSize = 0;
this.responseCache.forEach((item) => {
item.subs.unsubscribe();
});
this.responseCache.clear();
}
private shrink() {
while (
this.responseCache.size > this.maxItems ||
this.byteToMb(this.cacheSize) > this.maxCacheSizeMB
) {
const [key] = [...this.responseCache.keys()];
this.deleteItem(key);
}
}
public has(key: string) {
return this.responseCache.has(key);
}
/**
*
* @param key key to cache
* @param response$
* @returns A ReplaySubject that mimics the behavior of the original observable
* @throws error if key already exists
*/
public set(key: string, item: ResponseCacheItem) {
if (this.responseCache.has(key)) {
throw new Error('duplicate key');
}
const { response$, searchAbortController } = item;
const cacheItem: ResponseCacheItemInternal = {
response$,
searchAbortController,
subs: new Subscription(),
size: 0,
};
this.setItem(key, cacheItem);
cacheItem.subs.add(
response$.subscribe({
next: (r) => {
// TODO: avoid stringiying. Get the size some other way!
const newSize = new Blob([JSON.stringify(r)]).size;
if (this.byteToMb(newSize) < this.maxCacheSizeMB && !isErrorResponse(r)) {
this.setItem(key, {
...cacheItem,
size: newSize,
});
this.shrink();
} else {
// Single item is too large to be cached, or an error response returned.
// Evict and ignore.
this.deleteItem(key);
}
},
error: (e) => {
// Evict item on error
this.deleteItem(key);
},
})
);
this.shrink();
}
public get(key: string): ResponseCacheItem | undefined {
const item = this.responseCache.get(key);
if (item) {
// touch the item, and move it to the end of the map's entries
this.setItem(key, item);
return {
response$: item.response$,
searchAbortController: item.searchAbortController,
};
}
}
}

View file

@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import stringify from 'json-stable-stringify';
export async function createRequestHash(keys: Record<string, any>) {
const msgBuffer = new TextEncoder().encode(stringify(keys));
const hashBuffer = await crypto.subtle.digest('SHA-256', msgBuffer);
const hashArray = Array.from(new Uint8Array(hashBuffer));
return hashArray.map((b) => ('00' + b.toString(16)).slice(-2)).join('');
}

View file

@ -82,6 +82,8 @@ export function App({
dashboardFeatureFlag,
} = useKibana<LensAppServices>().services;
const startSession = useCallback(() => data.search.session.start(), [data]);
const [state, setState] = useState<LensAppState>(() => {
return {
query: data.query.queryString.getQuery(),
@ -96,7 +98,7 @@ export function App({
isSaveModalVisible: false,
indicateNoData: false,
isSaveable: false,
searchSessionId: data.search.session.start(),
searchSessionId: startSession(),
};
});
@ -178,7 +180,7 @@ export function App({
setState((s) => ({
...s,
filters: data.query.filterManager.getFilters(),
searchSessionId: data.search.session.start(),
searchSessionId: startSession(),
}));
trackUiEvent('app_filters_updated');
},
@ -188,7 +190,7 @@ export function App({
next: () => {
setState((s) => ({
...s,
searchSessionId: data.search.session.start(),
searchSessionId: startSession(),
}));
},
});
@ -199,7 +201,7 @@ export function App({
tap(() => {
setState((s) => ({
...s,
searchSessionId: data.search.session.start(),
searchSessionId: startSession(),
}));
}),
switchMap((done) =>
@ -234,6 +236,7 @@ export function App({
data.query,
history,
initialContext,
startSession,
]);
useEffect(() => {
@ -652,7 +655,7 @@ export function App({
// Time change will be picked up by the time subscription
setState((s) => ({
...s,
searchSessionId: data.search.session.start(),
searchSessionId: startSession(),
}));
trackUiEvent('app_query_change');
}

View file

@ -23,6 +23,7 @@ export default function ({ getService, loadTestFile }: PluginFunctionalProviderC
await esArchiver.unload('lens/basic');
});
loadTestFile(require.resolve('./search_sessions_cache'));
loadTestFile(require.resolve('./search_session_example'));
});
}

View file

@ -0,0 +1,65 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import expect from '@kbn/expect';
import { FtrProviderContext } from '../../functional/ftr_provider_context';
// eslint-disable-next-line import/no-default-export
export default function ({ getService, getPageObjects }: FtrProviderContext) {
const testSubjects = getService('testSubjects');
const PageObjects = getPageObjects(['common']);
const toasts = getService('toasts');
const retry = getService('retry');
async function getExecutedAt() {
const toast = await toasts.getToastElement(1);
const timeElem = await testSubjects.findDescendant('requestExecutedAt', toast);
const text = await timeElem.getVisibleText();
await toasts.dismissAllToasts();
await retry.waitFor('toasts gone', async () => {
return (await toasts.getToastCount()) === 0;
});
return text;
}
describe.skip('Search session client side cache', () => {
const appId = 'searchExamples';
before(async function () {
await PageObjects.common.navigateToApp(appId, { insertTimestamp: false });
});
it('should cache responses by search session id', async () => {
await testSubjects.click('searchExamplesCacheSearch');
const noSessionExecutedAt = await getExecutedAt();
// Expect searches executed in a session to share a response
await testSubjects.click('searchExamplesStartSession');
await testSubjects.click('searchExamplesCacheSearch');
const withSessionExecutedAt = await getExecutedAt();
await testSubjects.click('searchExamplesCacheSearch');
const withSessionExecutedAt2 = await getExecutedAt();
expect(withSessionExecutedAt2).to.equal(withSessionExecutedAt);
expect(withSessionExecutedAt).not.to.equal(noSessionExecutedAt);
// Expect new session to run search again
await testSubjects.click('searchExamplesStartSession');
await testSubjects.click('searchExamplesCacheSearch');
const secondSessionExecutedAt = await getExecutedAt();
expect(secondSessionExecutedAt).not.to.equal(withSessionExecutedAt);
// Clear session
await testSubjects.click('searchExamplesClearSession');
await testSubjects.click('searchExamplesCacheSearch');
const afterClearSession1 = await getExecutedAt();
await testSubjects.click('searchExamplesCacheSearch');
const afterClearSession2 = await getExecutedAt();
expect(secondSessionExecutedAt).not.to.equal(afterClearSession1);
expect(afterClearSession2).not.to.equal(afterClearSession1);
});
});
}