[Search Sessions] Optimize search session so updates (#142850)

This commit is contained in:
Anton Dosov 2022-10-11 20:42:47 +02:00 committed by GitHub
parent b5bacc3cbe
commit a6b1c7a242
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 109 additions and 307 deletions

View file

@ -19,7 +19,7 @@ How long {kib} stores search results from unsaved sessions,
after the last search in the session completes. The default is `5m`.
`data.search.sessions.maxUpdateRetries` {ess-icon}::
How many retries {kib} can perform while attempting to save a search session. The default is `3`.
How many retries {kib} can perform while attempting to save a search session. The default is `10`.
`data.search.sessions.defaultExpiration` {ess-icon}::
How long search session results are stored before they are deleted.

View file

@ -23,7 +23,7 @@ export const searchSessionsConfigSchema = schema.object({
/**
* maxUpdateRetries controls how many retries we perform while attempting to save a search session
*/
maxUpdateRetries: schema.number({ defaultValue: 3 }),
maxUpdateRetries: schema.number({ defaultValue: 10 }),
/**
* defaultExpiration controls how long search sessions are valid for, until they are expired.

View file

@ -87,11 +87,11 @@ export function searchUsageObserver(
return {
next(response: IEsSearchResponse) {
if (isRestore || !isCompleteResponse(response)) return;
logger.debug(`trackSearchStatus:next ${response.rawResponse.took}`);
logger.debug(`trackSearchStatus:success, took:${response.rawResponse.took}`);
usage?.trackSuccess(response.rawResponse.took);
},
error() {
logger.debug(`trackSearchStatus:error`);
error(e: Error) {
logger.debug(`trackSearchStatus:error, ${e}`);
usage?.trackError();
},
};

View file

@ -187,43 +187,12 @@ describe('SearchSessionService', () => {
).rejects.toMatchInlineSnapshot(`[Error: locatorId is required]`);
});
it('saving updates an existing saved object and persists it', async () => {
const mockUpdateSavedObject = {
...mockSavedObject,
attributes: {},
};
savedObjectsClient.get.mockResolvedValue(mockSavedObject);
savedObjectsClient.update.mockResolvedValue(mockUpdateSavedObject);
await service.save({ savedObjectsClient }, mockUser1, sessionId, {
name: 'banana',
appId: 'nanana',
locatorId: 'panama',
});
expect(savedObjectsClient.update).toHaveBeenCalled();
expect(savedObjectsClient.create).not.toHaveBeenCalled();
const [type, id, callAttributes] = savedObjectsClient.update.mock.calls[0];
expect(type).toBe(SEARCH_SESSION_TYPE);
expect(id).toBe(sessionId);
expect(callAttributes).not.toHaveProperty('idMapping');
expect(callAttributes).toHaveProperty('name', 'banana');
expect(callAttributes).toHaveProperty('appId', 'nanana');
expect(callAttributes).toHaveProperty('locatorId', 'panama');
expect(callAttributes).toHaveProperty('initialState', {});
expect(callAttributes).toHaveProperty('restoreState', {});
});
it('saving creates a new persisted saved object, if it did not exist', async () => {
it('saving creates a new persisted saved object', async () => {
const mockCreatedSavedObject = {
...mockSavedObject,
attributes: {},
};
savedObjectsClient.update.mockRejectedValue(
SavedObjectsErrorHelpers.createGenericNotFoundError(sessionId)
);
savedObjectsClient.create.mockResolvedValue(mockCreatedSavedObject);
await service.save({ savedObjectsClient }, mockUser1, sessionId, {
@ -232,7 +201,7 @@ describe('SearchSessionService', () => {
locatorId: 'panama',
});
expect(savedObjectsClient.update).toHaveBeenCalledTimes(1);
expect(savedObjectsClient.update).toHaveBeenCalledTimes(0);
expect(savedObjectsClient.create).toHaveBeenCalledTimes(1);
const [type, callAttributes, options] = savedObjectsClient.create.mock.calls[0];
@ -738,147 +707,19 @@ describe('SearchSessionService', () => {
});
});
it('retries updating the saved object if there was a ES conflict 409', async () => {
it('passes retryOnConflict param to es', async () => {
const searchRequest = { params: {} };
const searchId = 'FnpFYlBpeXdCUTMyZXhCLTc1TWFKX0EbdDFDTzJzTE1Sck9PVTBIcW1iU05CZzo4MDA0';
const mockUpdateSavedObject = {
...mockSavedObject,
attributes: {},
};
let counter = 0;
savedObjectsClient.update.mockImplementation(() => {
return new Promise((resolve, reject) => {
if (counter === 0) {
counter++;
reject(SavedObjectsErrorHelpers.createConflictError(SEARCH_SESSION_TYPE, searchId));
} else {
resolve(mockUpdateSavedObject);
}
});
});
await service.trackId({ savedObjectsClient }, mockUser1, searchRequest, searchId, {
sessionId,
strategy: MOCK_STRATEGY,
});
expect(savedObjectsClient.update).toHaveBeenCalledTimes(2);
expect(savedObjectsClient.create).not.toHaveBeenCalled();
});
it('retries updating the saved object if theres a ES conflict 409, but stops after MAX_RETRIES times', async () => {
const searchRequest = { params: {} };
const searchId = 'FnpFYlBpeXdCUTMyZXhCLTc1TWFKX0EbdDFDTzJzTE1Sck9PVTBIcW1iU05CZzo4MDA0';
savedObjectsClient.update.mockImplementation(() => {
return new Promise((resolve, reject) => {
reject(SavedObjectsErrorHelpers.createConflictError(SEARCH_SESSION_TYPE, searchId));
});
});
await service.trackId({ savedObjectsClient }, mockUser1, searchRequest, searchId, {
sessionId,
strategy: MOCK_STRATEGY,
});
// Track ID doesn't throw errors even in cases of failure!
expect(savedObjectsClient.update).toHaveBeenCalledTimes(MAX_UPDATE_RETRIES);
expect(savedObjectsClient.create).not.toHaveBeenCalled();
});
it('creates the saved object in non persisted state, if search session doesnt exists', async () => {
const searchRequest = { params: {} };
const requestHash = createRequestHash(searchRequest.params);
const searchId = 'FnpFYlBpeXdCUTMyZXhCLTc1TWFKX0EbdDFDTzJzTE1Sck9PVTBIcW1iU05CZzo4MDA0';
const mockCreatedSavedObject = {
...mockSavedObject,
attributes: {},
};
savedObjectsClient.update.mockRejectedValue(
SavedObjectsErrorHelpers.createGenericNotFoundError(sessionId)
);
savedObjectsClient.create.mockResolvedValue(mockCreatedSavedObject);
await service.trackId({ savedObjectsClient }, mockUser1, searchRequest, searchId, {
sessionId,
strategy: MOCK_STRATEGY,
});
expect(savedObjectsClient.update).toHaveBeenCalled();
expect(savedObjectsClient.create).toHaveBeenCalled();
const [type, callAttributes, options] = savedObjectsClient.create.mock.calls[0];
expect(type).toBe(SEARCH_SESSION_TYPE);
expect(options).toStrictEqual({ id: sessionId });
expect(callAttributes).toHaveProperty('idMapping', {
[requestHash]: {
id: searchId,
strategy: MOCK_STRATEGY,
},
});
expect(callAttributes).toHaveProperty('expires');
expect(callAttributes).toHaveProperty('created');
expect(callAttributes).toHaveProperty('sessionId', sessionId);
});
it('retries updating if update returned 404 and then update returned conflict 409 (first create race condition)', async () => {
const searchRequest = { params: {} };
const searchId = 'FnpFYlBpeXdCUTMyZXhCLTc1TWFKX0EbdDFDTzJzTE1Sck9PVTBIcW1iU05CZzo4MDA0';
const mockUpdateSavedObject = {
...mockSavedObject,
attributes: {},
};
let counter = 0;
savedObjectsClient.update.mockImplementation(() => {
return new Promise((resolve, reject) => {
if (counter === 0) {
counter++;
reject(SavedObjectsErrorHelpers.createGenericNotFoundError(sessionId));
} else {
resolve(mockUpdateSavedObject);
}
});
});
savedObjectsClient.create.mockRejectedValue(
SavedObjectsErrorHelpers.createConflictError(SEARCH_SESSION_TYPE, searchId)
);
await service.trackId({ savedObjectsClient }, mockUser1, searchRequest, searchId, {
sessionId,
strategy: MOCK_STRATEGY,
});
expect(savedObjectsClient.update).toHaveBeenCalledTimes(2);
expect(savedObjectsClient.create).toHaveBeenCalledTimes(1);
});
it('retries everything at most MAX_RETRIES times', async () => {
const searchRequest = { params: {} };
const searchId = 'FnpFYlBpeXdCUTMyZXhCLTc1TWFKX0EbdDFDTzJzTE1Sck9PVTBIcW1iU05CZzo4MDA0';
savedObjectsClient.update.mockRejectedValue(
SavedObjectsErrorHelpers.createGenericNotFoundError(sessionId)
);
savedObjectsClient.create.mockRejectedValue(
SavedObjectsErrorHelpers.createConflictError(SEARCH_SESSION_TYPE, searchId)
);
await service.trackId({ savedObjectsClient }, mockUser1, searchRequest, searchId, {
sessionId,
strategy: MOCK_STRATEGY,
});
expect(savedObjectsClient.update).toHaveBeenCalledTimes(MAX_UPDATE_RETRIES);
expect(savedObjectsClient.create).toHaveBeenCalledTimes(MAX_UPDATE_RETRIES);
const [, , , opts] = savedObjectsClient.update.mock.calls[0];
expect(opts).toHaveProperty('retryOnConflict', MAX_UPDATE_RETRIES);
});
it('batches updates for the same session', async () => {

View file

@ -7,7 +7,6 @@
*/
import { notFound } from '@hapi/boom';
import { debounce } from 'lodash';
import { fromKueryExpression, nodeBuilder } from '@kbn/es-query';
import {
CoreSetup,
@ -16,11 +15,12 @@ import {
Logger,
SavedObject,
SavedObjectsClientContract,
SavedObjectsErrorHelpers,
SavedObjectsFindOptions,
ElasticsearchClient,
} from '@kbn/core/server';
import type { AuthenticatedUser, SecurityPluginSetup } from '@kbn/security-plugin/server';
import { defer } from '@kbn/kibana-utils-plugin/common';
import { debounce } from 'lodash';
import {
ENHANCED_ES_SEARCH_STRATEGY,
IKibanaSearchRequest,
@ -51,24 +51,24 @@ interface SetupDependencies {
// eslint-disable-next-line @typescript-eslint/no-empty-interface
interface StartDependencies {}
const DEBOUNCE_UPDATE_OR_CREATE_WAIT = 1000;
const DEBOUNCE_UPDATE_OR_CREATE_MAX_WAIT = 5000;
interface UpdateOrCreateQueueEntry {
/**
* Used to batch requests that add searches into the session saved object
*/
const DEBOUNCE_TRACK_ID_WAIT = 1000;
const DEBOUNCE_TRACK_ID_MAX_WAIT = 5000;
interface TrackIdQueueEntry {
deps: SearchSessionDependencies;
user: AuthenticatedUser | null;
sessionId: string;
attributes: Partial<SearchSessionSavedObjectAttributes>;
resolve: () => void;
reject: (reason?: unknown) => void;
searchInfo: SearchSessionRequestInfo;
requestHash: string;
}
function sleep(ms: number) {
return new Promise((r) => setTimeout(r, ms));
}
export class SearchSessionService implements ISearchSessionService {
private sessionConfig: SearchSessionsConfigSchema;
private readonly updateOrCreateBatchQueue: UpdateOrCreateQueueEntry[] = [];
private security?: SecurityPluginSetup;
private setupCompleted = false;
@ -93,109 +93,6 @@ export class SearchSessionService implements ISearchSessionService {
public stop() {}
private processUpdateOrCreateBatchQueue = debounce(
() => {
const queue = [...this.updateOrCreateBatchQueue];
if (queue.length === 0) return;
this.updateOrCreateBatchQueue.length = 0;
const batchedSessionAttributes = queue.reduce((res, next) => {
if (!res[next.sessionId]) {
res[next.sessionId] = next.attributes;
} else {
res[next.sessionId] = {
...res[next.sessionId],
...next.attributes,
idMapping: {
...res[next.sessionId].idMapping,
...next.attributes.idMapping,
},
};
}
return res;
}, {} as { [sessionId: string]: Partial<SearchSessionSavedObjectAttributes> });
Object.keys(batchedSessionAttributes).forEach((sessionId) => {
const thisSession = queue.filter((s) => s.sessionId === sessionId);
this.updateOrCreate(
thisSession[0].deps,
thisSession[0].user,
sessionId,
batchedSessionAttributes[sessionId]
)
.then(() => {
thisSession.forEach((s) => s.resolve());
})
.catch((e) => {
thisSession.forEach((s) => s.reject(e));
});
});
},
DEBOUNCE_UPDATE_OR_CREATE_WAIT,
{ maxWait: DEBOUNCE_UPDATE_OR_CREATE_MAX_WAIT }
);
private scheduleUpdateOrCreate = (
deps: SearchSessionDependencies,
user: AuthenticatedUser | null,
sessionId: string,
attributes: Partial<SearchSessionSavedObjectAttributes>
): Promise<void> => {
return new Promise((resolve, reject) => {
this.updateOrCreateBatchQueue.push({ deps, user, sessionId, attributes, resolve, reject });
// TODO: this would be better if we'd debounce per sessionId
this.processUpdateOrCreateBatchQueue();
});
};
private updateOrCreate = async (
deps: SearchSessionDependencies,
user: AuthenticatedUser | null,
sessionId: string,
attributes: Partial<SearchSessionSavedObjectAttributes>,
retry: number = 1
): Promise<SavedObject<SearchSessionSavedObjectAttributes> | undefined> => {
const retryOnConflict = async (e: any) => {
this.logger.debug(`Conflict error | ${sessionId}`);
// Randomize sleep to spread updates out in case of conflicts
await sleep(100 + Math.random() * 50);
return await this.updateOrCreate(deps, user, sessionId, attributes, retry + 1);
};
this.logger.debug(`updateOrCreate | ${sessionId} | ${retry}`);
try {
return (await this.update(
deps,
user,
sessionId,
attributes
)) as SavedObject<SearchSessionSavedObjectAttributes>;
} catch (e) {
if (SavedObjectsErrorHelpers.isNotFoundError(e)) {
try {
this.logger.debug(`Object not found | ${sessionId}`);
return await this.create(deps, user, sessionId, attributes);
} catch (createError) {
if (
SavedObjectsErrorHelpers.isConflictError(createError) &&
retry < this.sessionConfig.maxUpdateRetries
) {
return await retryOnConflict(createError);
} else {
this.logger.error(createError);
}
}
} else if (
SavedObjectsErrorHelpers.isConflictError(e) &&
retry < this.sessionConfig.maxUpdateRetries
) {
return await retryOnConflict(e);
} else {
this.logger.error(e);
}
}
return undefined;
};
public save = async (
deps: SearchSessionDependencies,
user: AuthenticatedUser | null,
@ -213,7 +110,7 @@ export class SearchSessionService implements ISearchSessionService {
if (!appId) throw new Error('AppId is required');
if (!locatorId) throw new Error('locatorId is required');
return this.updateOrCreate(deps, user, sessionId, {
return this.create(deps, user, sessionId, {
name,
appId,
locatorId,
@ -228,7 +125,7 @@ export class SearchSessionService implements ISearchSessionService {
sessionId: string,
attributes: Partial<SearchSessionSavedObjectAttributes>
) => {
this.logger.debug(`create | ${sessionId}`);
this.logger.debug(`SearchSessionService: create | ${sessionId}`);
const realmType = user?.authentication_realm.type;
const realmName = user?.authentication_realm.name;
@ -310,10 +207,13 @@ export class SearchSessionService implements ISearchSessionService {
return {
...findResponse,
statuses: sessionStatuses.reduce((res, status, index) => {
res[findResponse.saved_objects[index].id] = { status };
return res;
}, {} as Record<string, SearchSessionStatusResponse>),
statuses: sessionStatuses.reduce<Record<string, SearchSessionStatusResponse>>(
(res, status, index) => {
res[findResponse.saved_objects[index].id] = { status };
return res;
},
{}
),
};
};
@ -323,7 +223,7 @@ export class SearchSessionService implements ISearchSessionService {
sessionId: string,
attributes: Partial<SearchSessionSavedObjectAttributes>
) => {
this.logger.debug(`update | ${sessionId}`);
this.logger.debug(`SearchSessionService: update | ${sessionId}`);
if (!this.sessionConfig.enabled) throw new Error('Search sessions are disabled');
await this.get(deps, user, sessionId); // Verify correct user
return deps.savedObjectsClient.update<SearchSessionSavedObjectAttributes>(
@ -331,7 +231,8 @@ export class SearchSessionService implements ISearchSessionService {
sessionId,
{
...attributes,
}
},
{ retryOnConflict: this.sessionConfig.maxUpdateRetries }
);
};
@ -341,7 +242,7 @@ export class SearchSessionService implements ISearchSessionService {
sessionId: string,
expires: Date
) {
this.logger.debug(`extend | ${sessionId}`);
this.logger.debug(`SearchSessionService: extend | ${sessionId}`);
return this.update(deps, user, sessionId, { expires: expires.toISOString() });
}
@ -350,7 +251,7 @@ export class SearchSessionService implements ISearchSessionService {
user: AuthenticatedUser | null,
sessionId: string
) => {
this.logger.debug(`cancel | ${sessionId}`);
this.logger.debug(`SearchSessionService: cancel | ${sessionId}`);
return this.update(deps, user, sessionId, {
isCanceled: true,
});
@ -362,13 +263,25 @@ export class SearchSessionService implements ISearchSessionService {
sessionId: string
) => {
if (!this.sessionConfig.enabled) throw new Error('Search sessions are disabled');
this.logger.debug(`delete | ${sessionId}`);
this.logger.debug(`SearchSessionService: delete | ${sessionId}`);
await this.get(deps, user, sessionId); // Verify correct user
return deps.savedObjectsClient.delete(SEARCH_SESSION_TYPE, sessionId);
};
/**
* Used to batch requests that add searches into the session saved object
* Requests are grouped and executed per sessionId
* @private
*/
private readonly trackIdBatchQueueMap = new Map<
string /* sessionId */,
{ queue: TrackIdQueueEntry[]; scheduleProcessQueue: () => void }
>();
/**
* Tracks the given search request/search ID in the saved session.
* Instead of updating search-session saved object immediately, it debounces and batches updates internally,
* to reduce number of saved object updates and reduce a chance of running over update retries limit
* @internal
*/
public trackId = async (
@ -380,20 +293,63 @@ export class SearchSessionService implements ISearchSessionService {
) => {
const { sessionId, strategy = ENHANCED_ES_SEARCH_STRATEGY } = options;
if (!this.sessionConfig.enabled || !sessionId || !searchId) return;
this.logger.debug(`trackId | ${sessionId} | ${searchId}`);
if (!searchRequest.params) return;
let idMapping: Record<string, SearchSessionRequestInfo> = {};
const requestHash = createRequestHash(searchRequest.params);
if (searchRequest.params) {
const requestHash = createRequestHash(searchRequest.params);
const searchInfo: SearchSessionRequestInfo = {
id: searchId,
strategy,
};
idMapping = { [requestHash]: searchInfo };
this.logger.debug(
`SearchSessionService: trackId | sessionId: "${sessionId}" | searchId:"${searchId}" | requestHash: "${requestHash}"`
);
const searchInfo: SearchSessionRequestInfo = {
id: searchId,
strategy,
};
if (!this.trackIdBatchQueueMap.has(sessionId)) {
this.trackIdBatchQueueMap.set(sessionId, {
queue: [],
scheduleProcessQueue: debounce(
() => {
const queue = this.trackIdBatchQueueMap.get(sessionId)?.queue ?? [];
if (queue.length === 0) return;
this.trackIdBatchQueueMap.delete(sessionId);
const batchedIdMapping = queue.reduce<SearchSessionSavedObjectAttributes['idMapping']>(
(res, next) => {
res[next.requestHash] = next.searchInfo;
return res;
},
{}
);
this.update(queue[0].deps, queue[0].user, sessionId, { idMapping: batchedIdMapping })
.then(() => {
queue.forEach((q) => q.resolve());
})
.catch((e) => {
queue.forEach((q) => q.reject(e));
});
},
DEBOUNCE_TRACK_ID_WAIT,
{ maxWait: DEBOUNCE_TRACK_ID_MAX_WAIT }
),
});
}
await this.scheduleUpdateOrCreate(deps, user, sessionId, { idMapping });
const deferred = defer<void>();
const { queue, scheduleProcessQueue } = this.trackIdBatchQueueMap.get(sessionId)!;
queue.push({
deps,
sessionId,
searchInfo,
requestHash,
resolve: deferred.resolve,
reject: deferred.reject,
user,
});
scheduleProcessQueue();
return deferred.promise;
};
public async getSearchIdMapping(
@ -415,7 +371,7 @@ export class SearchSessionService implements ISearchSessionService {
user: AuthenticatedUser | null,
sessionId: string
): Promise<SearchSessionStatusResponse> {
this.logger.debug(`status | ${sessionId}`);
this.logger.debug(`SearchSessionService: status | ${sessionId}`);
const session = await this.get(deps, user, sessionId);
const sessionStatus = await getSessionStatus(
@ -451,10 +407,15 @@ export class SearchSessionService implements ISearchSessionService {
const session = await this.get(deps, user, sessionId);
const requestHash = createRequestHash(searchRequest.params);
if (!session.attributes.idMapping.hasOwnProperty(requestHash)) {
this.logger.error(`getId | ${sessionId} | ${requestHash} not found`);
this.logger.error(`SearchSessionService: getId | ${sessionId} | ${requestHash} not found`);
this.logger.debug(
`SearchSessionService: getId not found search with params: ${JSON.stringify(
searchRequest.params
)}`
);
throw new NoSearchIdInSessionError();
}
this.logger.debug(`getId | ${sessionId} | ${requestHash}`);
this.logger.debug(`SearchSessionService: getId | ${sessionId} | ${requestHash}`);
return session.attributes.idMapping[requestHash].id;
};