[Search Sessions] Monitoring hardening part 1 (#96196)

* Decrease default pageSize to 100
Set default strategy
Don't create sessions when disabled
Clear monitoring task when disabled
Use concatMap to serialize session checkup

* ts

* ts

* ts

* Update x-pack/plugins/data_enhanced/server/search/session/session_service.ts

Co-authored-by: Lukas Olson <olson.lukas@gmail.com>

* Search sessions are disabled

* Clear task on server start

Co-authored-by: Lukas Olson <olson.lukas@gmail.com>
This commit is contained in:
Liza Katz 2021-04-07 15:36:55 +03:00 committed by GitHub
parent 2efea06392
commit 7584b728c6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 1044 additions and 919 deletions

View file

@ -18,7 +18,7 @@ export const configSchema = schema.object({
* pageSize controls how many search session objects we load at once while monitoring
* session completion
*/
pageSize: schema.number({ defaultValue: 10000 }),
pageSize: schema.number({ defaultValue: 100 }),
/**
* trackingInterval controls how often we track search session objects progress
*/

View file

@ -6,13 +6,7 @@
*/
import { CoreSetup, CoreStart, Logger, Plugin, PluginInitializerContext } from 'kibana/server';
import { TaskManagerSetupContract, TaskManagerStartContract } from '../../task_manager/server';
import {
PluginSetup as DataPluginSetup,
PluginStart as DataPluginStart,
usageProvider,
} from '../../../../src/plugins/data/server';
import { UsageCollectionSetup } from '../../../../src/plugins/usage_collection/server';
import { usageProvider } from '../../../../src/plugins/data/server';
import { ENHANCED_ES_SEARCH_STRATEGY, EQL_SEARCH_STRATEGY } from '../common';
import { registerSessionRoutes } from './routes';
import { searchSessionSavedObjectType } from './saved_objects';
@ -22,22 +16,13 @@ import {
eqlSearchStrategyProvider,
} from './search';
import { getUiSettings } from './ui_settings';
import type { DataEnhancedRequestHandlerContext } from './type';
import type {
DataEnhancedRequestHandlerContext,
DataEnhancedSetupDependencies as SetupDependencies,
DataEnhancedStartDependencies as StartDependencies,
} from './type';
import { ConfigSchema } from '../config';
import { registerUsageCollector } from './collectors';
import { SecurityPluginSetup } from '../../security/server';
interface SetupDependencies {
data: DataPluginSetup;
usageCollection?: UsageCollectionSetup;
taskManager: TaskManagerSetupContract;
security?: SecurityPluginSetup;
}
export interface StartDependencies {
data: DataPluginStart;
taskManager: TaskManagerStartContract;
}
export class EnhancedDataServerPlugin
implements Plugin<void, void, SetupDependencies, StartDependencies> {
@ -50,7 +35,7 @@ export class EnhancedDataServerPlugin
this.config = this.initializerContext.config.get<ConfigSchema>();
}
public setup(core: CoreSetup<DataPluginStart>, deps: SetupDependencies) {
public setup(core: CoreSetup<StartDependencies>, deps: SetupDependencies) {
const usage = deps.usageCollection ? usageProvider(core) : undefined;
core.uiSettings.register(getUiSettings());

View file

@ -14,7 +14,7 @@ import {
} from 'kibana/server';
import moment from 'moment';
import { EMPTY, from } from 'rxjs';
import { expand, mergeMap } from 'rxjs/operators';
import { expand, concatMap } from 'rxjs/operators';
import { nodeBuilder } from '../../../../../../src/plugins/data/common';
import {
ENHANCED_ES_SEARCH_STRATEGY,
@ -154,7 +154,7 @@ export async function checkRunningSessions(
try {
await getAllSavedSearchSessions$(deps, config)
.pipe(
mergeMap(async (runningSearchSessionsResponse) => {
concatMap(async (runningSearchSessionsResponse) => {
if (!runningSearchSessionsResponse.total) return;
logger.debug(`Found ${runningSearchSessionsResponse.total} running sessions`);

View file

@ -15,6 +15,7 @@ import { checkRunningSessions } from './check_running_sessions';
import { CoreSetup, SavedObjectsClient, Logger } from '../../../../../../src/core/server';
import { ConfigSchema } from '../../../config';
import { SEARCH_SESSION_TYPE } from '../../../common';
import { DataEnhancedStartDependencies } from '../../type';
export const SEARCH_SESSIONS_TASK_TYPE = 'search_sessions_monitor';
export const SEARCH_SESSIONS_TASK_ID = `data_enhanced_${SEARCH_SESSIONS_TASK_TYPE}`;
@ -25,12 +26,19 @@ interface SearchSessionTaskDeps {
config: ConfigSchema;
}
function searchSessionRunner(core: CoreSetup, { logger, config }: SearchSessionTaskDeps) {
function searchSessionRunner(
core: CoreSetup<DataEnhancedStartDependencies>,
{ logger, config }: SearchSessionTaskDeps
) {
return ({ taskInstance }: RunContext) => {
return {
async run() {
const sessionConfig = config.search.sessions;
const [coreStart] = await core.getStartServices();
if (!sessionConfig.enabled) {
logger.debug('Search sessions are disabled. Skipping task.');
return;
}
const internalRepo = coreStart.savedObjects.createInternalRepository([SEARCH_SESSION_TYPE]);
const internalSavedObjectsClient = new SavedObjectsClient(internalRepo);
await checkRunningSessions(
@ -50,7 +58,10 @@ function searchSessionRunner(core: CoreSetup, { logger, config }: SearchSessionT
};
}
export function registerSearchSessionsTask(core: CoreSetup, deps: SearchSessionTaskDeps) {
export function registerSearchSessionsTask(
core: CoreSetup<DataEnhancedStartDependencies>,
deps: SearchSessionTaskDeps
) {
deps.taskManager.registerTaskDefinitions({
[SEARCH_SESSIONS_TASK_TYPE]: {
title: 'Search Sessions Monitor',
@ -59,6 +70,18 @@ export function registerSearchSessionsTask(core: CoreSetup, deps: SearchSessionT
});
}
export async function unscheduleSearchSessionsTask(
taskManager: TaskManagerStartContract,
logger: Logger
) {
try {
await taskManager.removeIfExists(SEARCH_SESSIONS_TASK_ID);
logger.debug(`Search sessions cleared`);
} catch (e) {
logger.error(`Error clearing task, received ${e.message}`);
}
}
export async function scheduleSearchSessionsTasks(
taskManager: TaskManagerStartContract,
logger: Logger,
@ -79,6 +102,6 @@ export async function scheduleSearchSessionsTasks(
logger.debug(`Search sessions task, scheduled to run`);
} catch (e) {
logger.debug(`Error scheduling task, received ${e.message}`);
logger.error(`Error scheduling task, received ${e.message}`);
}
}

View file

@ -29,6 +29,7 @@ import {
TaskManagerStartContract,
} from '../../../../task_manager/server';
import {
ENHANCED_ES_SEARCH_STRATEGY,
SearchSessionRequestInfo,
SearchSessionSavedObjectAttributes,
SearchSessionStatus,
@ -36,8 +37,13 @@ import {
} from '../../../common';
import { createRequestHash } from './utils';
import { ConfigSchema } from '../../../config';
import { registerSearchSessionsTask, scheduleSearchSessionsTasks } from './monitoring_task';
import {
registerSearchSessionsTask,
scheduleSearchSessionsTasks,
unscheduleSearchSessionsTask,
} from './monitoring_task';
import { SearchSessionsConfig, SearchStatus } from './types';
import { DataEnhancedStartDependencies } from '../../type';
export interface SearchSessionDependencies {
savedObjectsClient: SavedObjectsClientContract;
@ -78,7 +84,7 @@ export class SearchSessionService
this.sessionConfig = this.config.search.sessions;
}
public setup(core: CoreSetup, deps: SetupDependencies) {
public setup(core: CoreSetup<DataEnhancedStartDependencies>, deps: SetupDependencies) {
registerSearchSessionsTask(core, {
config: this.config,
taskManager: deps.taskManager,
@ -99,6 +105,8 @@ export class SearchSessionService
this.logger,
this.sessionConfig.trackingInterval
);
} else {
unscheduleSearchSessionsTask(deps.taskManager, this.logger);
}
};
@ -217,6 +225,7 @@ export class SearchSessionService
restoreState = {},
}: Partial<SearchSessionSavedObjectAttributes>
) => {
if (!this.sessionConfig.enabled) throw new Error('Search sessions are disabled');
if (!name) throw new Error('Name is required');
if (!appId) throw new Error('AppId is required');
if (!urlGeneratorId) throw new Error('UrlGeneratorId is required');
@ -316,6 +325,7 @@ export class SearchSessionService
attributes: Partial<SearchSessionSavedObjectAttributes>
) => {
this.logger.debug(`update | ${sessionId}`);
if (!this.sessionConfig.enabled) throw new Error('Search sessions are disabled');
await this.get(deps, user, sessionId); // Verify correct user
return deps.savedObjectsClient.update<SearchSessionSavedObjectAttributes>(
SEARCH_SESSION_TYPE,
@ -353,6 +363,7 @@ export class SearchSessionService
user: AuthenticatedUser | null,
sessionId: string
) => {
if (!this.sessionConfig.enabled) throw new Error('Search sessions are disabled');
this.logger.debug(`delete | ${sessionId}`);
await this.get(deps, user, sessionId); // Verify correct user
return deps.savedObjectsClient.delete(SEARCH_SESSION_TYPE, sessionId);
@ -367,9 +378,9 @@ export class SearchSessionService
user: AuthenticatedUser | null,
searchRequest: IKibanaSearchRequest,
searchId: string,
{ sessionId, strategy }: ISearchOptions
{ sessionId, strategy = ENHANCED_ES_SEARCH_STRATEGY }: ISearchOptions
) => {
if (!sessionId || !searchId) return;
if (!this.sessionConfig.enabled || !sessionId || !searchId) return;
this.logger.debug(`trackId | ${sessionId} | ${searchId}`);
let idMapping: Record<string, SearchSessionRequestInfo> = {};
@ -378,7 +389,7 @@ export class SearchSessionService
const requestHash = createRequestHash(searchRequest.params);
const searchInfo = {
id: searchId,
strategy: strategy!,
strategy,
status: SearchStatus.IN_PROGRESS,
};
idMapping = { [requestHash]: searchInfo };
@ -411,7 +422,9 @@ export class SearchSessionService
searchRequest: IKibanaSearchRequest,
{ sessionId, isStored, isRestore }: ISearchOptions
) => {
if (!sessionId) {
if (!this.sessionConfig.enabled) {
throw new Error('Search sessions are disabled');
} else if (!sessionId) {
throw new Error('Session ID is required');
} else if (!isStored) {
throw new Error('Cannot get search ID from a session that is not stored');

View file

@ -7,6 +7,13 @@
import type { IRouter } from 'kibana/server';
import type { DataRequestHandlerContext } from '../../../../src/plugins/data/server';
import { TaskManagerSetupContract, TaskManagerStartContract } from '../../task_manager/server';
import {
PluginSetup as DataPluginSetup,
PluginStart as DataPluginStart,
} from '../../../../src/plugins/data/server';
import { SecurityPluginSetup } from '../../security/server';
import { UsageCollectionSetup } from '../../../../src/plugins/usage_collection/server';
/**
* @internal
@ -17,3 +24,15 @@ export type DataEnhancedRequestHandlerContext = DataRequestHandlerContext;
* @internal
*/
export type DataEnhancedPluginRouter = IRouter<DataRequestHandlerContext>;
export interface DataEnhancedSetupDependencies {
data: DataPluginSetup;
usageCollection?: UsageCollectionSetup;
taskManager: TaskManagerSetupContract;
security?: SecurityPluginSetup;
}
export interface DataEnhancedStartDependencies {
data: DataPluginStart;
taskManager: TaskManagerStartContract;
}