mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 01:38:56 -04:00
[Search] Session SO polling (#84225)
* Monitor ids * import fix * solve circular dep * eslint * mock circular dep * max retries test * mock circular dep * test * jest <(-:C * jestttttt * [data.search] Move search method inside session service and add tests * merge * Move background session service to data_enhanced plugin * Better logs Save IDs only in monitoring loop * Fix types * Space aware session service * ts * Fix session service saving * merge fix * stable stringify * INMEM_MAX_SESSIONS * INMEM_MAX_SESSIONS * Update x-pack/plugins/data_enhanced/server/search/session/session_service.ts Co-authored-by: Anton Dosov <dosantappdev@gmail.com> * Update x-pack/plugins/data_enhanced/server/search/session/session_service.ts Co-authored-by: Anton Dosov <dosantappdev@gmail.com> * Use setTimeout to schedule monitoring steps * settimeout Co-authored-by: Lukas Olson <olson.lukas@gmail.com> Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com> Co-authored-by: Anton Dosov <dosantappdev@gmail.com>
This commit is contained in:
parent
58fc711626
commit
0a04835ba7
10 changed files with 479 additions and 25 deletions
|
@ -19,10 +19,10 @@
|
|||
|
||||
import Chance from 'chance';
|
||||
|
||||
import { getUpgradeableConfigMock } from './get_upgradeable_config.test.mock';
|
||||
import { SavedObjectsErrorHelpers } from '../../saved_objects';
|
||||
import { savedObjectsClientMock } from '../../saved_objects/service/saved_objects_client.mock';
|
||||
import { loggingSystemMock } from '../../logging/logging_system.mock';
|
||||
import { getUpgradeableConfigMock } from './get_upgradeable_config.test.mock';
|
||||
|
||||
import { createOrUpgradeSavedConfig } from './create_or_upgrade_saved_config';
|
||||
|
||||
|
|
|
@ -106,13 +106,15 @@ export class SearchService implements Plugin<ISearchSetup, ISearchStart> {
|
|||
private readonly searchSourceService = new SearchSourceService();
|
||||
private defaultSearchStrategyName: string = ES_SEARCH_STRATEGY;
|
||||
private searchStrategies: StrategyMap = {};
|
||||
private sessionService: ISessionService;
|
||||
private coreStart?: CoreStart;
|
||||
private sessionService: ISessionService = new SessionService();
|
||||
|
||||
constructor(
|
||||
private initializerContext: PluginInitializerContext<ConfigSchema>,
|
||||
private readonly logger: Logger
|
||||
) {}
|
||||
) {
|
||||
this.sessionService = new SessionService();
|
||||
}
|
||||
|
||||
public setup(
|
||||
core: CoreSetup<{}, DataPluginStart>,
|
||||
|
|
|
@ -28,6 +28,7 @@ interface SetupDependencies {
|
|||
|
||||
export class EnhancedDataServerPlugin implements Plugin<void, void, SetupDependencies> {
|
||||
private readonly logger: Logger;
|
||||
private sessionService!: BackgroundSessionService;
|
||||
|
||||
constructor(private initializerContext: PluginInitializerContext) {
|
||||
this.logger = initializerContext.logger.get('data_enhanced');
|
||||
|
@ -53,10 +54,12 @@ export class EnhancedDataServerPlugin implements Plugin<void, void, SetupDepende
|
|||
eqlSearchStrategyProvider(this.logger)
|
||||
);
|
||||
|
||||
this.sessionService = new BackgroundSessionService(this.logger);
|
||||
|
||||
deps.data.__enhance({
|
||||
search: {
|
||||
defaultStrategy: ENHANCED_ES_SEARCH_STRATEGY,
|
||||
sessionService: new BackgroundSessionService(),
|
||||
sessionService: this.sessionService,
|
||||
},
|
||||
});
|
||||
|
||||
|
@ -64,9 +67,13 @@ export class EnhancedDataServerPlugin implements Plugin<void, void, SetupDepende
|
|||
registerSessionRoutes(router);
|
||||
}
|
||||
|
||||
public start(core: CoreStart) {}
|
||||
public start(core: CoreStart) {
|
||||
this.sessionService.start(core, this.initializerContext.config.create());
|
||||
}
|
||||
|
||||
public stop() {}
|
||||
public stop() {
|
||||
this.sessionService.stop();
|
||||
}
|
||||
}
|
||||
|
||||
export { EnhancedDataServerPlugin as Plugin };
|
||||
|
|
|
@ -14,6 +14,9 @@ export const backgroundSessionMapping: SavedObjectsType = {
|
|||
hidden: true,
|
||||
mappings: {
|
||||
properties: {
|
||||
sessionId: {
|
||||
type: 'keyword',
|
||||
},
|
||||
name: {
|
||||
type: 'keyword',
|
||||
},
|
||||
|
|
|
@ -4,19 +4,88 @@
|
|||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { of } from 'rxjs';
|
||||
import { BehaviorSubject, of } from 'rxjs';
|
||||
import type { SavedObject, SavedObjectsClientContract } from 'kibana/server';
|
||||
import type { SearchStrategyDependencies } from '../../../../../../src/plugins/data/server';
|
||||
import { savedObjectsClientMock } from '../../../../../../src/core/server/mocks';
|
||||
import { BackgroundSessionStatus } from '../../../common';
|
||||
import { BACKGROUND_SESSION_TYPE } from '../../saved_objects';
|
||||
import { BackgroundSessionDependencies, BackgroundSessionService } from './session_service';
|
||||
import {
|
||||
BackgroundSessionDependencies,
|
||||
BackgroundSessionService,
|
||||
INMEM_TRACKING_INTERVAL,
|
||||
MAX_UPDATE_RETRIES,
|
||||
SessionInfo,
|
||||
} from './session_service';
|
||||
import { createRequestHash } from './utils';
|
||||
import moment from 'moment';
|
||||
import { coreMock } from 'src/core/server/mocks';
|
||||
import { ConfigSchema } from '../../../config';
|
||||
|
||||
const flushPromises = () => new Promise((resolve) => setImmediate(resolve));
|
||||
|
||||
describe('BackgroundSessionService', () => {
|
||||
let savedObjectsClient: jest.Mocked<SavedObjectsClientContract>;
|
||||
let service: BackgroundSessionService;
|
||||
|
||||
const MOCK_SESSION_ID = 'session-id-mock';
|
||||
const MOCK_ASYNC_ID = '123456';
|
||||
const MOCK_KEY_HASH = '608de49a4600dbb5b173492759792e4a';
|
||||
|
||||
const createMockInternalSavedObjectClient = (
|
||||
findSpy?: jest.SpyInstance<any>,
|
||||
bulkUpdateSpy?: jest.SpyInstance<any>
|
||||
) => {
|
||||
Object.defineProperty(service, 'internalSavedObjectsClient', {
|
||||
get: () => {
|
||||
const find =
|
||||
findSpy ||
|
||||
(() => {
|
||||
return {
|
||||
saved_objects: [
|
||||
{
|
||||
attributes: {
|
||||
sessionId: MOCK_SESSION_ID,
|
||||
idMapping: {
|
||||
'another-key': 'another-async-id',
|
||||
},
|
||||
},
|
||||
id: MOCK_SESSION_ID,
|
||||
version: '1',
|
||||
},
|
||||
],
|
||||
};
|
||||
});
|
||||
|
||||
const bulkUpdate =
|
||||
bulkUpdateSpy ||
|
||||
(() => {
|
||||
return {
|
||||
saved_objects: [],
|
||||
};
|
||||
});
|
||||
return {
|
||||
find,
|
||||
bulkUpdate,
|
||||
};
|
||||
},
|
||||
});
|
||||
};
|
||||
|
||||
const createMockIdMapping = (
|
||||
mapValues: any[],
|
||||
insertTime?: moment.Moment,
|
||||
retryCount?: number
|
||||
): Map<string, SessionInfo> => {
|
||||
const fakeMap = new Map();
|
||||
fakeMap.set(MOCK_SESSION_ID, {
|
||||
ids: new Map(mapValues),
|
||||
insertTime: insertTime || moment(),
|
||||
retryCount: retryCount || 0,
|
||||
});
|
||||
return fakeMap;
|
||||
};
|
||||
|
||||
const sessionId = 'd7170a35-7e2c-48d6-8dec-9a056721b489';
|
||||
const mockSavedObject: SavedObject = {
|
||||
id: 'd7170a35-7e2c-48d6-8dec-9a056721b489',
|
||||
|
@ -30,9 +99,14 @@ describe('BackgroundSessionService', () => {
|
|||
references: [],
|
||||
};
|
||||
|
||||
beforeEach(() => {
|
||||
beforeEach(async () => {
|
||||
savedObjectsClient = savedObjectsClientMock.create();
|
||||
service = new BackgroundSessionService();
|
||||
const mockLogger: any = {
|
||||
debug: jest.fn(),
|
||||
warn: jest.fn(),
|
||||
error: jest.fn(),
|
||||
};
|
||||
service = new BackgroundSessionService(mockLogger);
|
||||
});
|
||||
|
||||
it('search throws if `name` is not provided', () => {
|
||||
|
@ -199,6 +273,13 @@ describe('BackgroundSessionService', () => {
|
|||
const created = new Date().toISOString();
|
||||
const expires = new Date().toISOString();
|
||||
|
||||
const mockIdMapping = createMockIdMapping([]);
|
||||
const setSpy = jest.fn();
|
||||
mockIdMapping.set = setSpy;
|
||||
Object.defineProperty(service, 'sessionSearchMap', {
|
||||
get: () => mockIdMapping,
|
||||
});
|
||||
|
||||
await service.trackId(
|
||||
searchRequest,
|
||||
searchId,
|
||||
|
@ -223,12 +304,17 @@ describe('BackgroundSessionService', () => {
|
|||
initialState: {},
|
||||
restoreState: {},
|
||||
status: BackgroundSessionStatus.IN_PROGRESS,
|
||||
idMapping: { [requestHash]: searchId },
|
||||
idMapping: {},
|
||||
appId,
|
||||
urlGeneratorId,
|
||||
sessionId,
|
||||
},
|
||||
{ id: sessionId }
|
||||
);
|
||||
|
||||
const [setSessionId, setParams] = setSpy.mock.calls[0];
|
||||
expect(setParams.ids.get(requestHash)).toBe(searchId);
|
||||
expect(setSessionId).toBe(sessionId);
|
||||
});
|
||||
|
||||
it('updates saved object when `isStored` is `true`', async () => {
|
||||
|
@ -309,4 +395,204 @@ describe('BackgroundSessionService', () => {
|
|||
expect(id).toBe(searchId);
|
||||
});
|
||||
});
|
||||
|
||||
describe('Monitor', () => {
|
||||
beforeEach(async () => {
|
||||
jest.useFakeTimers();
|
||||
const config$ = new BehaviorSubject<ConfigSchema>({
|
||||
search: {
|
||||
sendToBackground: {
|
||||
enabled: true,
|
||||
},
|
||||
},
|
||||
});
|
||||
await service.start(coreMock.createStart(), config$);
|
||||
await flushPromises();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
jest.useRealTimers();
|
||||
service.stop();
|
||||
});
|
||||
|
||||
it('schedules the next iteration', async () => {
|
||||
const findSpy = jest.fn().mockResolvedValue({ saved_objects: [] });
|
||||
createMockInternalSavedObjectClient(findSpy);
|
||||
|
||||
const mockIdMapping = createMockIdMapping([[MOCK_KEY_HASH, MOCK_ASYNC_ID]], moment());
|
||||
|
||||
Object.defineProperty(service, 'sessionSearchMap', {
|
||||
get: () => mockIdMapping,
|
||||
});
|
||||
|
||||
jest.advanceTimersByTime(INMEM_TRACKING_INTERVAL);
|
||||
expect(findSpy).toHaveBeenCalledTimes(1);
|
||||
await flushPromises();
|
||||
|
||||
jest.advanceTimersByTime(INMEM_TRACKING_INTERVAL);
|
||||
expect(findSpy).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it('should delete expired IDs', async () => {
|
||||
const findSpy = jest.fn().mockResolvedValueOnce({ saved_objects: [] });
|
||||
createMockInternalSavedObjectClient(findSpy);
|
||||
|
||||
const mockIdMapping = createMockIdMapping(
|
||||
[[MOCK_KEY_HASH, MOCK_ASYNC_ID]],
|
||||
moment().subtract(2, 'm')
|
||||
);
|
||||
|
||||
const deleteSpy = jest.spyOn(mockIdMapping, 'delete');
|
||||
Object.defineProperty(service, 'sessionSearchMap', {
|
||||
get: () => mockIdMapping,
|
||||
});
|
||||
|
||||
// Get setInterval to fire
|
||||
jest.advanceTimersByTime(INMEM_TRACKING_INTERVAL);
|
||||
|
||||
expect(findSpy).not.toHaveBeenCalled();
|
||||
expect(deleteSpy).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('should delete IDs that passed max retries', async () => {
|
||||
const findSpy = jest.fn().mockResolvedValueOnce({ saved_objects: [] });
|
||||
createMockInternalSavedObjectClient(findSpy);
|
||||
|
||||
const mockIdMapping = createMockIdMapping(
|
||||
[[MOCK_KEY_HASH, MOCK_ASYNC_ID]],
|
||||
moment(),
|
||||
MAX_UPDATE_RETRIES
|
||||
);
|
||||
|
||||
const deleteSpy = jest.spyOn(mockIdMapping, 'delete');
|
||||
Object.defineProperty(service, 'sessionSearchMap', {
|
||||
get: () => mockIdMapping,
|
||||
});
|
||||
|
||||
// Get setInterval to fire
|
||||
jest.advanceTimersByTime(INMEM_TRACKING_INTERVAL);
|
||||
|
||||
expect(findSpy).not.toHaveBeenCalled();
|
||||
expect(deleteSpy).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('should not fetch when no IDs are mapped', async () => {
|
||||
const findSpy = jest.fn().mockResolvedValueOnce({ saved_objects: [] });
|
||||
createMockInternalSavedObjectClient(findSpy);
|
||||
|
||||
jest.advanceTimersByTime(INMEM_TRACKING_INTERVAL);
|
||||
expect(findSpy).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should try to fetch saved objects if some ids are mapped', async () => {
|
||||
const mockIdMapping = createMockIdMapping([[MOCK_KEY_HASH, MOCK_ASYNC_ID]]);
|
||||
Object.defineProperty(service, 'sessionSearchMap', {
|
||||
get: () => mockIdMapping,
|
||||
});
|
||||
|
||||
const findSpy = jest.fn().mockResolvedValueOnce({ saved_objects: [] });
|
||||
const bulkUpdateSpy = jest.fn().mockResolvedValueOnce({ saved_objects: [] });
|
||||
createMockInternalSavedObjectClient(findSpy, bulkUpdateSpy);
|
||||
|
||||
jest.advanceTimersByTime(INMEM_TRACKING_INTERVAL);
|
||||
expect(findSpy).toHaveBeenCalledTimes(1);
|
||||
expect(bulkUpdateSpy).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should update saved objects if they are found, and delete session on success', async () => {
|
||||
const mockIdMapping = createMockIdMapping([[MOCK_KEY_HASH, MOCK_ASYNC_ID]], undefined, 1);
|
||||
const mockMapDeleteSpy = jest.fn();
|
||||
const mockSessionDeleteSpy = jest.fn();
|
||||
mockIdMapping.delete = mockMapDeleteSpy;
|
||||
mockIdMapping.get(MOCK_SESSION_ID)!.ids.delete = mockSessionDeleteSpy;
|
||||
Object.defineProperty(service, 'sessionSearchMap', {
|
||||
get: () => mockIdMapping,
|
||||
});
|
||||
|
||||
const findSpy = jest.fn().mockResolvedValueOnce({
|
||||
saved_objects: [
|
||||
{
|
||||
id: MOCK_SESSION_ID,
|
||||
attributes: {
|
||||
idMapping: {
|
||||
b: 'c',
|
||||
},
|
||||
},
|
||||
},
|
||||
],
|
||||
});
|
||||
const bulkUpdateSpy = jest.fn().mockResolvedValueOnce({
|
||||
saved_objects: [
|
||||
{
|
||||
id: MOCK_SESSION_ID,
|
||||
attributes: {
|
||||
idMapping: {
|
||||
b: 'c',
|
||||
[MOCK_KEY_HASH]: MOCK_ASYNC_ID,
|
||||
},
|
||||
},
|
||||
},
|
||||
],
|
||||
});
|
||||
createMockInternalSavedObjectClient(findSpy, bulkUpdateSpy);
|
||||
|
||||
jest.advanceTimersByTime(INMEM_TRACKING_INTERVAL);
|
||||
|
||||
// Release timers to call check after test actions are done.
|
||||
jest.useRealTimers();
|
||||
await new Promise((r) => setTimeout(r, 15));
|
||||
|
||||
expect(findSpy).toHaveBeenCalledTimes(1);
|
||||
expect(bulkUpdateSpy).toHaveBeenCalledTimes(1);
|
||||
expect(mockSessionDeleteSpy).toHaveBeenCalledTimes(2);
|
||||
expect(mockSessionDeleteSpy).toBeCalledWith('b');
|
||||
expect(mockSessionDeleteSpy).toBeCalledWith(MOCK_KEY_HASH);
|
||||
expect(mockMapDeleteSpy).toBeCalledTimes(1);
|
||||
});
|
||||
|
||||
it('should update saved objects if they are found, and increase retryCount on error', async () => {
|
||||
const mockIdMapping = createMockIdMapping([[MOCK_KEY_HASH, MOCK_ASYNC_ID]]);
|
||||
const mockMapDeleteSpy = jest.fn();
|
||||
const mockSessionDeleteSpy = jest.fn();
|
||||
mockIdMapping.delete = mockMapDeleteSpy;
|
||||
mockIdMapping.get(MOCK_SESSION_ID)!.ids.delete = mockSessionDeleteSpy;
|
||||
Object.defineProperty(service, 'sessionSearchMap', {
|
||||
get: () => mockIdMapping,
|
||||
});
|
||||
|
||||
const findSpy = jest.fn().mockResolvedValueOnce({
|
||||
saved_objects: [
|
||||
{
|
||||
id: MOCK_SESSION_ID,
|
||||
attributes: {
|
||||
idMapping: {
|
||||
b: 'c',
|
||||
},
|
||||
},
|
||||
},
|
||||
],
|
||||
});
|
||||
const bulkUpdateSpy = jest.fn().mockResolvedValueOnce({
|
||||
saved_objects: [
|
||||
{
|
||||
id: MOCK_SESSION_ID,
|
||||
error: 'not ok',
|
||||
},
|
||||
],
|
||||
});
|
||||
createMockInternalSavedObjectClient(findSpy, bulkUpdateSpy);
|
||||
|
||||
jest.advanceTimersByTime(INMEM_TRACKING_INTERVAL);
|
||||
|
||||
// Release timers to call check after test actions are done.
|
||||
jest.useRealTimers();
|
||||
await new Promise((r) => setTimeout(r, 15));
|
||||
|
||||
expect(findSpy).toHaveBeenCalledTimes(1);
|
||||
expect(bulkUpdateSpy).toHaveBeenCalledTimes(1);
|
||||
expect(mockSessionDeleteSpy).not.toHaveBeenCalled();
|
||||
expect(mockMapDeleteSpy).not.toHaveBeenCalled();
|
||||
expect(mockIdMapping.get(MOCK_SESSION_ID)!.retryCount).toBe(1);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
@ -4,9 +4,17 @@
|
|||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { CoreStart, KibanaRequest, SavedObjectsClientContract } from 'kibana/server';
|
||||
import moment, { Moment } from 'moment';
|
||||
import { from, Observable } from 'rxjs';
|
||||
import { switchMap } from 'rxjs/operators';
|
||||
import { first, switchMap } from 'rxjs/operators';
|
||||
import {
|
||||
CoreStart,
|
||||
KibanaRequest,
|
||||
SavedObjectsClient,
|
||||
SavedObjectsClientContract,
|
||||
Logger,
|
||||
SavedObject,
|
||||
} from '../../../../../../src/core/server';
|
||||
import {
|
||||
IKibanaSearchRequest,
|
||||
IKibanaSearchResponse,
|
||||
|
@ -25,21 +33,154 @@ import {
|
|||
} from '../../../common';
|
||||
import { BACKGROUND_SESSION_TYPE } from '../../saved_objects';
|
||||
import { createRequestHash } from './utils';
|
||||
import { ConfigSchema } from '../../../config';
|
||||
|
||||
const INMEM_MAX_SESSIONS = 10000;
|
||||
const DEFAULT_EXPIRATION = 7 * 24 * 60 * 60 * 1000;
|
||||
export const INMEM_TRACKING_INTERVAL = 10 * 1000;
|
||||
export const INMEM_TRACKING_TIMEOUT_SEC = 60;
|
||||
export const MAX_UPDATE_RETRIES = 3;
|
||||
|
||||
export interface BackgroundSessionDependencies {
|
||||
savedObjectsClient: SavedObjectsClientContract;
|
||||
}
|
||||
|
||||
export interface SessionInfo {
|
||||
insertTime: Moment;
|
||||
retryCount: number;
|
||||
ids: Map<string, string>;
|
||||
}
|
||||
|
||||
export class BackgroundSessionService implements ISessionService {
|
||||
/**
|
||||
* Map of sessionId to { [requestHash]: searchId }
|
||||
* @private
|
||||
*/
|
||||
private sessionSearchMap = new Map<string, Map<string, string>>();
|
||||
private sessionSearchMap = new Map<string, SessionInfo>();
|
||||
private internalSavedObjectsClient!: SavedObjectsClientContract;
|
||||
private monitorTimer!: NodeJS.Timeout;
|
||||
|
||||
constructor() {}
|
||||
constructor(private readonly logger: Logger) {}
|
||||
|
||||
public async start(core: CoreStart, config$: Observable<ConfigSchema>) {
|
||||
return this.setupMonitoring(core, config$);
|
||||
}
|
||||
|
||||
public stop() {
|
||||
this.sessionSearchMap.clear();
|
||||
clearTimeout(this.monitorTimer);
|
||||
}
|
||||
|
||||
private setupMonitoring = async (core: CoreStart, config$: Observable<ConfigSchema>) => {
|
||||
const config = await config$.pipe(first()).toPromise();
|
||||
if (config.search.sendToBackground.enabled) {
|
||||
this.logger.debug(`setupMonitoring | Enabling monitoring`);
|
||||
const internalRepo = core.savedObjects.createInternalRepository([BACKGROUND_SESSION_TYPE]);
|
||||
this.internalSavedObjectsClient = new SavedObjectsClient(internalRepo);
|
||||
this.monitorMappedIds();
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Gets all {@link SessionSavedObjectAttributes | Background Searches} that
|
||||
* currently being tracked by the service.
|
||||
*
|
||||
* @remarks
|
||||
* Uses `internalSavedObjectsClient` as this is called asynchronously, not within the
|
||||
* context of a user's session.
|
||||
*/
|
||||
private async getAllMappedSavedObjects() {
|
||||
const activeMappingIds = Array.from(this.sessionSearchMap.keys())
|
||||
.map((sessionId) => `"${sessionId}"`)
|
||||
.join(' | ');
|
||||
const res = await this.internalSavedObjectsClient.find<BackgroundSessionSavedObjectAttributes>({
|
||||
perPage: INMEM_MAX_SESSIONS, // If there are more sessions in memory, they will be synced when some items are cleared out.
|
||||
type: BACKGROUND_SESSION_TYPE,
|
||||
search: activeMappingIds,
|
||||
searchFields: ['sessionId'],
|
||||
namespaces: ['*'],
|
||||
});
|
||||
this.logger.debug(`getAllMappedSavedObjects | Got ${res.saved_objects.length} items`);
|
||||
return res.saved_objects;
|
||||
}
|
||||
|
||||
private clearSessions = () => {
|
||||
const curTime = moment();
|
||||
|
||||
this.sessionSearchMap.forEach((sessionInfo, sessionId) => {
|
||||
if (
|
||||
moment.duration(curTime.diff(sessionInfo.insertTime)).asSeconds() >
|
||||
INMEM_TRACKING_TIMEOUT_SEC
|
||||
) {
|
||||
this.logger.debug(`clearSessions | Deleting expired session ${sessionId}`);
|
||||
this.sessionSearchMap.delete(sessionId);
|
||||
} else if (sessionInfo.retryCount >= MAX_UPDATE_RETRIES) {
|
||||
this.logger.warn(`clearSessions | Deleting failed session ${sessionId}`);
|
||||
this.sessionSearchMap.delete(sessionId);
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
private async monitorMappedIds() {
|
||||
this.monitorTimer = setTimeout(async () => {
|
||||
try {
|
||||
this.clearSessions();
|
||||
|
||||
if (!this.sessionSearchMap.size) return;
|
||||
this.logger.debug(`monitorMappedIds | Map contains ${this.sessionSearchMap.size} items`);
|
||||
|
||||
const savedSessions = await this.getAllMappedSavedObjects();
|
||||
const updatedSessions = await this.updateAllSavedObjects(savedSessions);
|
||||
|
||||
updatedSessions.forEach((updatedSavedObject) => {
|
||||
const sessionInfo = this.sessionSearchMap.get(updatedSavedObject.id)!;
|
||||
if (updatedSavedObject.error) {
|
||||
// Retry next time
|
||||
sessionInfo.retryCount++;
|
||||
} else if (updatedSavedObject.attributes.idMapping) {
|
||||
// Delete the ids that we just saved, avoiding a potential new ids being lost.
|
||||
Object.keys(updatedSavedObject.attributes.idMapping).forEach((key) => {
|
||||
sessionInfo.ids.delete(key);
|
||||
});
|
||||
// If the session object is empty, delete it as well
|
||||
if (!sessionInfo.ids.entries.length) {
|
||||
this.sessionSearchMap.delete(updatedSavedObject.id);
|
||||
} else {
|
||||
sessionInfo.retryCount = 0;
|
||||
}
|
||||
}
|
||||
});
|
||||
} catch (e) {
|
||||
this.logger.error(`monitorMappedIds | Error while updating sessions. ${e}`);
|
||||
} finally {
|
||||
this.monitorMappedIds();
|
||||
}
|
||||
}, INMEM_TRACKING_INTERVAL);
|
||||
}
|
||||
|
||||
private async updateAllSavedObjects(
|
||||
activeMappingObjects: Array<SavedObject<BackgroundSessionSavedObjectAttributes>>
|
||||
) {
|
||||
if (!activeMappingObjects.length) return [];
|
||||
|
||||
this.logger.debug(`updateAllSavedObjects | Updating ${activeMappingObjects.length} items`);
|
||||
const updatedSessions = activeMappingObjects
|
||||
.filter((so) => !so.error)
|
||||
.map((sessionSavedObject) => {
|
||||
const sessionInfo = this.sessionSearchMap.get(sessionSavedObject.id);
|
||||
const idMapping = sessionInfo ? Object.fromEntries(sessionInfo.ids.entries()) : {};
|
||||
sessionSavedObject.attributes.idMapping = {
|
||||
...sessionSavedObject.attributes.idMapping,
|
||||
...idMapping,
|
||||
};
|
||||
return sessionSavedObject;
|
||||
});
|
||||
|
||||
const updateResults = await this.internalSavedObjectsClient.bulkUpdate<BackgroundSessionSavedObjectAttributes>(
|
||||
updatedSessions
|
||||
);
|
||||
return updateResults.saved_objects;
|
||||
}
|
||||
|
||||
public search<Request extends IKibanaSearchRequest, Response extends IKibanaSearchResponse>(
|
||||
strategy: ISearchStrategy<Request, Response>,
|
||||
|
@ -85,9 +226,8 @@ export class BackgroundSessionService implements ISessionService {
|
|||
if (!appId) throw new Error('AppId is required');
|
||||
if (!urlGeneratorId) throw new Error('UrlGeneratorId is required');
|
||||
|
||||
// Get the mapping of request hash/search ID for this session
|
||||
const searchMap = this.sessionSearchMap.get(sessionId) ?? new Map<string, string>();
|
||||
const idMapping = Object.fromEntries(searchMap.entries());
|
||||
this.logger.debug(`save | ${sessionId}`);
|
||||
|
||||
const attributes = {
|
||||
name,
|
||||
created,
|
||||
|
@ -95,9 +235,10 @@ export class BackgroundSessionService implements ISessionService {
|
|||
status,
|
||||
initialState,
|
||||
restoreState,
|
||||
idMapping,
|
||||
idMapping: {},
|
||||
urlGeneratorId,
|
||||
appId,
|
||||
sessionId,
|
||||
};
|
||||
const session = await savedObjectsClient.create<BackgroundSessionSavedObjectAttributes>(
|
||||
BACKGROUND_SESSION_TYPE,
|
||||
|
@ -105,14 +246,12 @@ export class BackgroundSessionService implements ISessionService {
|
|||
{ id: sessionId }
|
||||
);
|
||||
|
||||
// Clear out the entries for this session ID so they don't get saved next time
|
||||
this.sessionSearchMap.delete(sessionId);
|
||||
|
||||
return session;
|
||||
};
|
||||
|
||||
// TODO: Throw an error if this session doesn't belong to this user
|
||||
public get = (sessionId: string, { savedObjectsClient }: BackgroundSessionDependencies) => {
|
||||
this.logger.debug(`get | ${sessionId}`);
|
||||
return savedObjectsClient.get<BackgroundSessionSavedObjectAttributes>(
|
||||
BACKGROUND_SESSION_TYPE,
|
||||
sessionId
|
||||
|
@ -136,6 +275,7 @@ export class BackgroundSessionService implements ISessionService {
|
|||
attributes: Partial<BackgroundSessionSavedObjectAttributes>,
|
||||
{ savedObjectsClient }: BackgroundSessionDependencies
|
||||
) => {
|
||||
this.logger.debug(`update | ${sessionId}`);
|
||||
return savedObjectsClient.update<BackgroundSessionSavedObjectAttributes>(
|
||||
BACKGROUND_SESSION_TYPE,
|
||||
sessionId,
|
||||
|
@ -160,6 +300,7 @@ export class BackgroundSessionService implements ISessionService {
|
|||
deps: BackgroundSessionDependencies
|
||||
) => {
|
||||
if (!sessionId || !searchId) return;
|
||||
this.logger.debug(`trackId | ${sessionId} | ${searchId}`);
|
||||
const requestHash = createRequestHash(searchRequest.params);
|
||||
|
||||
// If there is already a saved object for this session, update it to include this request/ID.
|
||||
|
@ -168,8 +309,12 @@ export class BackgroundSessionService implements ISessionService {
|
|||
const attributes = { idMapping: { [requestHash]: searchId } };
|
||||
await this.update(sessionId, attributes, deps);
|
||||
} else {
|
||||
const map = this.sessionSearchMap.get(sessionId) ?? new Map<string, string>();
|
||||
map.set(requestHash, searchId);
|
||||
const map = this.sessionSearchMap.get(sessionId) ?? {
|
||||
insertTime: moment(),
|
||||
retryCount: 0,
|
||||
ids: new Map<string, string>(),
|
||||
};
|
||||
map.ids.set(requestHash, searchId);
|
||||
this.sessionSearchMap.set(sessionId, map);
|
||||
}
|
||||
};
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
*/
|
||||
|
||||
import { createHash } from 'crypto';
|
||||
import stringify from 'json-stable-stringify';
|
||||
|
||||
/**
|
||||
* Generate the hash for this request so that, in the future, this hash can be used to look up
|
||||
|
@ -13,5 +14,5 @@ import { createHash } from 'crypto';
|
|||
*/
|
||||
export function createRequestHash(keys: Record<any, any>) {
|
||||
const { preference, ...params } = keys;
|
||||
return createHash(`sha256`).update(JSON.stringify(params)).digest('hex');
|
||||
return createHash(`sha256`).update(stringify(params)).digest('hex');
|
||||
}
|
||||
|
|
|
@ -18,6 +18,9 @@ import {
|
|||
} from 'src/core/server/mocks';
|
||||
import { copySavedObjectsToSpacesFactory } from './copy_to_spaces';
|
||||
|
||||
// Mock out circular dependency
|
||||
jest.mock('../../../../../../src/core/server/saved_objects/es_query', () => {});
|
||||
|
||||
jest.mock('../../../../../../src/core/server', () => {
|
||||
return {
|
||||
...(jest.requireActual('../../../../../../src/core/server') as Record<string, unknown>),
|
||||
|
|
|
@ -18,6 +18,9 @@ import {
|
|||
} from 'src/core/server/mocks';
|
||||
import { resolveCopySavedObjectsToSpacesConflictsFactory } from './resolve_copy_conflicts';
|
||||
|
||||
// Mock out circular dependency
|
||||
jest.mock('../../../../../../src/core/server/saved_objects/es_query', () => {});
|
||||
|
||||
jest.mock('../../../../../../src/core/server', () => {
|
||||
return {
|
||||
...(jest.requireActual('../../../../../../src/core/server') as Record<string, unknown>),
|
||||
|
|
|
@ -27,6 +27,10 @@ import { usageStatsServiceMock } from '../../../usage_stats/usage_stats_service.
|
|||
import { initCopyToSpacesApi } from './copy_to_space';
|
||||
import { spacesConfig } from '../../../lib/__fixtures__';
|
||||
import { ObjectType } from '@kbn/config-schema';
|
||||
|
||||
// Mock out circular dependency
|
||||
jest.mock('../../../../../../../src/core/server/saved_objects/es_query', () => {});
|
||||
|
||||
jest.mock('../../../../../../../src/core/server', () => {
|
||||
return {
|
||||
...(jest.requireActual('../../../../../../../src/core/server') as Record<string, unknown>),
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue