mirror of
https://github.com/elastic/kibana.git
synced 2025-04-23 17:28:26 -04:00
[Search Sessions] Split tasks (#99967)
* cancel the previous session * split to 3 tasks * fixes * cancellation * updated tests * split out and improve jest tests * cleanup previous session properly * don't fail delete and cancel if item was already cleaned up * test * test * ignore resource_not_found_exception when deleting an already cleared \ expired async search * jest * update jest * api int * fix jest * testssss * Code review @dosant * types * remove any * Fix merge * type * test * jest Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
parent
aee0585bc5
commit
01a486000e
20 changed files with 1446 additions and 651 deletions
|
@ -44,10 +44,20 @@ export const searchSessionsConfigSchema = schema.object({
|
|||
*/
|
||||
pageSize: schema.number({ defaultValue: 100 }),
|
||||
/**
|
||||
* trackingInterval controls how often we track search session objects progress
|
||||
* trackingInterval controls how often we track persisted search session objects progress
|
||||
*/
|
||||
trackingInterval: schema.duration({ defaultValue: '10s' }),
|
||||
|
||||
/**
|
||||
* cleanupInterval controls how often we track non-persisted search session objects for cleanup
|
||||
*/
|
||||
cleanupInterval: schema.duration({ defaultValue: '60s' }),
|
||||
|
||||
/**
|
||||
* expireInterval controls how often we track persisted search session objects for expiration
|
||||
*/
|
||||
expireInterval: schema.duration({ defaultValue: '60m' }),
|
||||
|
||||
/**
|
||||
* monitoringTaskTimeout controls for how long task manager waits for search session monitoring task to complete before considering it timed out,
|
||||
* If tasks timeouts it receives cancel signal and next task starts in "trackingInterval" time
|
||||
|
|
|
@ -98,6 +98,14 @@ describe('Session service', () => {
|
|||
expect(nowProvider.reset).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("Can clear other apps' session", async () => {
|
||||
sessionService.start();
|
||||
expect(sessionService.getSessionId()).not.toBeUndefined();
|
||||
currentAppId$.next('change');
|
||||
sessionService.clear();
|
||||
expect(sessionService.getSessionId()).toBeUndefined();
|
||||
});
|
||||
|
||||
it("Can start a new session in case there is other apps' stale session", async () => {
|
||||
const s1 = sessionService.start();
|
||||
expect(sessionService.getSessionId()).not.toBeUndefined();
|
||||
|
|
|
@ -5,10 +5,7 @@
|
|||
* 2.0.
|
||||
*/
|
||||
|
||||
import {
|
||||
checkRunningSessions as checkRunningSessions$,
|
||||
CheckRunningSessionsDeps,
|
||||
} from './check_running_sessions';
|
||||
import { checkNonPersistedSessions as checkNonPersistedSessions$ } from './check_non_persiseted_sessions';
|
||||
import {
|
||||
SearchSessionStatus,
|
||||
SearchSessionSavedObjectAttributes,
|
||||
|
@ -16,22 +13,20 @@ import {
|
|||
EQL_SEARCH_STRATEGY,
|
||||
} from '../../../../../../src/plugins/data/common';
|
||||
import { savedObjectsClientMock } from '../../../../../../src/core/server/mocks';
|
||||
import { SearchSessionsConfig, SearchStatus } from './types';
|
||||
import { SearchSessionsConfig, CheckSearchSessionsDeps, SearchStatus } from './types';
|
||||
import moment from 'moment';
|
||||
import {
|
||||
SavedObjectsBulkUpdateObject,
|
||||
SavedObjectsDeleteOptions,
|
||||
SavedObjectsClientContract,
|
||||
} from '../../../../../../src/core/server';
|
||||
import { Subject } from 'rxjs';
|
||||
import { takeUntil } from 'rxjs/operators';
|
||||
|
||||
jest.useFakeTimers();
|
||||
|
||||
const checkRunningSessions = (deps: CheckRunningSessionsDeps, config: SearchSessionsConfig) =>
|
||||
checkRunningSessions$(deps, config).toPromise();
|
||||
const checkNonPersistedSessions = (deps: CheckSearchSessionsDeps, config: SearchSessionsConfig) =>
|
||||
checkNonPersistedSessions$(deps, config).toPromise();
|
||||
|
||||
describe('getSearchStatus', () => {
|
||||
describe('checkNonPersistedSessions', () => {
|
||||
let mockClient: any;
|
||||
let savedObjectsClient: jest.Mocked<SavedObjectsClientContract>;
|
||||
const config: SearchSessionsConfig = {
|
||||
|
@ -42,7 +37,9 @@ describe('getSearchStatus', () => {
|
|||
maxUpdateRetries: 3,
|
||||
defaultExpiration: moment.duration(7, 'd'),
|
||||
trackingInterval: moment.duration(10, 's'),
|
||||
expireInterval: moment.duration(10, 'm'),
|
||||
monitoringTaskTimeout: moment.duration(5, 'm'),
|
||||
cleanupInterval: moment.duration(10, 's'),
|
||||
management: {} as any,
|
||||
};
|
||||
const mockLogger: any = {
|
||||
|
@ -51,16 +48,6 @@ describe('getSearchStatus', () => {
|
|||
error: jest.fn(),
|
||||
};
|
||||
|
||||
const emptySO = {
|
||||
attributes: {
|
||||
persisted: false,
|
||||
status: SearchSessionStatus.IN_PROGRESS,
|
||||
created: moment().subtract(moment.duration(3, 'm')),
|
||||
touched: moment().subtract(moment.duration(10, 's')),
|
||||
idMapping: {},
|
||||
},
|
||||
};
|
||||
|
||||
beforeEach(() => {
|
||||
savedObjectsClient = savedObjectsClientMock.create();
|
||||
mockClient = {
|
||||
|
@ -81,7 +68,7 @@ describe('getSearchStatus', () => {
|
|||
total: 0,
|
||||
} as any);
|
||||
|
||||
await checkRunningSessions(
|
||||
await checkNonPersistedSessions(
|
||||
{
|
||||
savedObjectsClient,
|
||||
client: mockClient,
|
||||
|
@ -94,240 +81,7 @@ describe('getSearchStatus', () => {
|
|||
expect(savedObjectsClient.delete).not.toBeCalled();
|
||||
});
|
||||
|
||||
describe('pagination', () => {
|
||||
test('fetches one page if not objects exist', async () => {
|
||||
savedObjectsClient.find.mockResolvedValueOnce({
|
||||
saved_objects: [],
|
||||
total: 0,
|
||||
} as any);
|
||||
|
||||
await checkRunningSessions(
|
||||
{
|
||||
savedObjectsClient,
|
||||
client: mockClient,
|
||||
logger: mockLogger,
|
||||
},
|
||||
config
|
||||
);
|
||||
|
||||
expect(savedObjectsClient.find).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
test('fetches one page if less than page size object are returned', async () => {
|
||||
savedObjectsClient.find.mockResolvedValueOnce({
|
||||
saved_objects: [emptySO, emptySO],
|
||||
total: 5,
|
||||
} as any);
|
||||
|
||||
await checkRunningSessions(
|
||||
{
|
||||
savedObjectsClient,
|
||||
client: mockClient,
|
||||
logger: mockLogger,
|
||||
},
|
||||
config
|
||||
);
|
||||
|
||||
expect(savedObjectsClient.find).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
test('fetches two pages if exactly page size objects are returned', async () => {
|
||||
let i = 0;
|
||||
savedObjectsClient.find.mockImplementation(() => {
|
||||
return new Promise((resolve) => {
|
||||
resolve({
|
||||
saved_objects: i++ === 0 ? [emptySO, emptySO, emptySO, emptySO, emptySO] : [],
|
||||
total: 5,
|
||||
page: i,
|
||||
} as any);
|
||||
});
|
||||
});
|
||||
|
||||
await checkRunningSessions(
|
||||
{
|
||||
savedObjectsClient,
|
||||
client: mockClient,
|
||||
logger: mockLogger,
|
||||
},
|
||||
config
|
||||
);
|
||||
|
||||
expect(savedObjectsClient.find).toHaveBeenCalledTimes(2);
|
||||
|
||||
// validate that page number increases
|
||||
const { page: page1 } = savedObjectsClient.find.mock.calls[0][0];
|
||||
const { page: page2 } = savedObjectsClient.find.mock.calls[1][0];
|
||||
expect(page1).toBe(1);
|
||||
expect(page2).toBe(2);
|
||||
});
|
||||
|
||||
test('fetches two pages if page size +1 objects are returned', async () => {
|
||||
let i = 0;
|
||||
savedObjectsClient.find.mockImplementation(() => {
|
||||
return new Promise((resolve) => {
|
||||
resolve({
|
||||
saved_objects: i++ === 0 ? [emptySO, emptySO, emptySO, emptySO, emptySO] : [emptySO],
|
||||
total: 5,
|
||||
page: i,
|
||||
} as any);
|
||||
});
|
||||
});
|
||||
|
||||
await checkRunningSessions(
|
||||
{
|
||||
savedObjectsClient,
|
||||
client: mockClient,
|
||||
logger: mockLogger,
|
||||
},
|
||||
config
|
||||
);
|
||||
|
||||
expect(savedObjectsClient.find).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
test('fetching is abortable', async () => {
|
||||
let i = 0;
|
||||
const abort$ = new Subject();
|
||||
savedObjectsClient.find.mockImplementation(() => {
|
||||
return new Promise((resolve) => {
|
||||
if (++i === 2) {
|
||||
abort$.next();
|
||||
}
|
||||
resolve({
|
||||
saved_objects: i <= 5 ? [emptySO, emptySO, emptySO, emptySO, emptySO] : [],
|
||||
total: 25,
|
||||
page: i,
|
||||
} as any);
|
||||
});
|
||||
});
|
||||
|
||||
await checkRunningSessions$(
|
||||
{
|
||||
savedObjectsClient,
|
||||
client: mockClient,
|
||||
logger: mockLogger,
|
||||
},
|
||||
config
|
||||
)
|
||||
.pipe(takeUntil(abort$))
|
||||
.toPromise();
|
||||
|
||||
jest.runAllTimers();
|
||||
|
||||
// if not for `abort$` then this would be called 6 times!
|
||||
expect(savedObjectsClient.find).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
test('sorting is by "touched"', async () => {
|
||||
savedObjectsClient.find.mockResolvedValueOnce({
|
||||
saved_objects: [],
|
||||
total: 0,
|
||||
} as any);
|
||||
|
||||
await checkRunningSessions(
|
||||
{
|
||||
savedObjectsClient,
|
||||
client: mockClient,
|
||||
logger: mockLogger,
|
||||
},
|
||||
config
|
||||
);
|
||||
|
||||
expect(savedObjectsClient.find).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ sortField: 'touched', sortOrder: 'asc' })
|
||||
);
|
||||
});
|
||||
|
||||
test('sessions fetched in the beginning are processed even if sessions in the end fail', async () => {
|
||||
let i = 0;
|
||||
savedObjectsClient.find.mockImplementation(() => {
|
||||
return new Promise((resolve, reject) => {
|
||||
if (++i === 2) {
|
||||
reject(new Error('Fake find error...'));
|
||||
}
|
||||
resolve({
|
||||
saved_objects:
|
||||
i <= 5
|
||||
? [
|
||||
i === 1
|
||||
? {
|
||||
id: '123',
|
||||
attributes: {
|
||||
persisted: false,
|
||||
status: SearchSessionStatus.IN_PROGRESS,
|
||||
created: moment().subtract(moment.duration(3, 'm')),
|
||||
touched: moment().subtract(moment.duration(2, 'm')),
|
||||
idMapping: {
|
||||
'map-key': {
|
||||
strategy: ENHANCED_ES_SEARCH_STRATEGY,
|
||||
id: 'async-id',
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
: emptySO,
|
||||
emptySO,
|
||||
emptySO,
|
||||
emptySO,
|
||||
emptySO,
|
||||
]
|
||||
: [],
|
||||
total: 25,
|
||||
page: i,
|
||||
} as any);
|
||||
});
|
||||
});
|
||||
|
||||
await checkRunningSessions$(
|
||||
{
|
||||
savedObjectsClient,
|
||||
client: mockClient,
|
||||
logger: mockLogger,
|
||||
},
|
||||
config
|
||||
).toPromise();
|
||||
|
||||
jest.runAllTimers();
|
||||
|
||||
expect(savedObjectsClient.find).toHaveBeenCalledTimes(2);
|
||||
|
||||
// by checking that delete was called we validate that sessions from session that were successfully fetched were processed
|
||||
expect(mockClient.asyncSearch.delete).toBeCalled();
|
||||
const { id } = mockClient.asyncSearch.delete.mock.calls[0][0];
|
||||
expect(id).toBe('async-id');
|
||||
});
|
||||
});
|
||||
|
||||
describe('delete', () => {
|
||||
test('doesnt delete a persisted session', async () => {
|
||||
savedObjectsClient.find.mockResolvedValue({
|
||||
saved_objects: [
|
||||
{
|
||||
id: '123',
|
||||
attributes: {
|
||||
persisted: true,
|
||||
status: SearchSessionStatus.IN_PROGRESS,
|
||||
created: moment().subtract(moment.duration(30, 'm')),
|
||||
touched: moment().subtract(moment.duration(10, 'm')),
|
||||
idMapping: {},
|
||||
},
|
||||
},
|
||||
],
|
||||
total: 1,
|
||||
} as any);
|
||||
await checkRunningSessions(
|
||||
{
|
||||
savedObjectsClient,
|
||||
client: mockClient,
|
||||
logger: mockLogger,
|
||||
},
|
||||
config
|
||||
);
|
||||
|
||||
expect(savedObjectsClient.bulkUpdate).not.toBeCalled();
|
||||
expect(savedObjectsClient.delete).not.toBeCalled();
|
||||
});
|
||||
|
||||
test('doesnt delete a non persisted, recently touched session', async () => {
|
||||
savedObjectsClient.find.mockResolvedValue({
|
||||
saved_objects: [
|
||||
|
@ -336,6 +90,7 @@ describe('getSearchStatus', () => {
|
|||
attributes: {
|
||||
persisted: false,
|
||||
status: SearchSessionStatus.IN_PROGRESS,
|
||||
expires: moment().add(moment.duration(3, 'm')),
|
||||
created: moment().subtract(moment.duration(3, 'm')),
|
||||
touched: moment().subtract(moment.duration(10, 's')),
|
||||
idMapping: {},
|
||||
|
@ -344,7 +99,7 @@ describe('getSearchStatus', () => {
|
|||
],
|
||||
total: 1,
|
||||
} as any);
|
||||
await checkRunningSessions(
|
||||
await checkNonPersistedSessions(
|
||||
{
|
||||
savedObjectsClient,
|
||||
client: mockClient,
|
||||
|
@ -367,6 +122,7 @@ describe('getSearchStatus', () => {
|
|||
status: SearchSessionStatus.COMPLETE,
|
||||
created: moment().subtract(moment.duration(3, 'm')),
|
||||
touched: moment().subtract(moment.duration(1, 'm')),
|
||||
expires: moment().add(moment.duration(3, 'm')),
|
||||
idMapping: {
|
||||
'search-hash': {
|
||||
id: 'search-id',
|
||||
|
@ -379,7 +135,7 @@ describe('getSearchStatus', () => {
|
|||
],
|
||||
total: 1,
|
||||
} as any);
|
||||
await checkRunningSessions(
|
||||
await checkNonPersistedSessions(
|
||||
{
|
||||
savedObjectsClient,
|
||||
client: mockClient,
|
||||
|
@ -401,6 +157,7 @@ describe('getSearchStatus', () => {
|
|||
attributes: {
|
||||
persisted: false,
|
||||
status: SearchSessionStatus.IN_PROGRESS,
|
||||
expires: moment().add(moment.duration(3, 'm')),
|
||||
created: moment().subtract(moment.duration(3, 'm')),
|
||||
touched: moment().subtract(moment.duration(2, 'm')),
|
||||
idMapping: {
|
||||
|
@ -415,7 +172,7 @@ describe('getSearchStatus', () => {
|
|||
total: 1,
|
||||
} as any);
|
||||
|
||||
await checkRunningSessions(
|
||||
await checkNonPersistedSessions(
|
||||
{
|
||||
savedObjectsClient,
|
||||
client: mockClient,
|
||||
|
@ -441,6 +198,7 @@ describe('getSearchStatus', () => {
|
|||
status: SearchSessionStatus.IN_PROGRESS,
|
||||
created: moment().subtract(moment.duration(3, 'm')),
|
||||
touched: moment().subtract(moment.duration(2, 'm')),
|
||||
expires: moment().add(moment.duration(3, 'm')),
|
||||
idMapping: {
|
||||
'map-key': {
|
||||
strategy: ENHANCED_ES_SEARCH_STRATEGY,
|
||||
|
@ -453,7 +211,7 @@ describe('getSearchStatus', () => {
|
|||
total: 1,
|
||||
} as any);
|
||||
|
||||
await checkRunningSessions(
|
||||
await checkNonPersistedSessions(
|
||||
{
|
||||
savedObjectsClient,
|
||||
client: mockClient,
|
||||
|
@ -481,6 +239,7 @@ describe('getSearchStatus', () => {
|
|||
attributes: {
|
||||
persisted: false,
|
||||
status: SearchSessionStatus.COMPLETE,
|
||||
expires: moment().add(moment.duration(3, 'm')),
|
||||
created: moment().subtract(moment.duration(30, 'm')),
|
||||
touched: moment().subtract(moment.duration(6, 'm')),
|
||||
idMapping: {
|
||||
|
@ -501,7 +260,7 @@ describe('getSearchStatus', () => {
|
|||
total: 1,
|
||||
} as any);
|
||||
|
||||
await checkRunningSessions(
|
||||
await checkNonPersistedSessions(
|
||||
{
|
||||
savedObjectsClient,
|
||||
client: mockClient,
|
||||
|
@ -530,6 +289,7 @@ describe('getSearchStatus', () => {
|
|||
attributes: {
|
||||
persisted: false,
|
||||
status: SearchSessionStatus.COMPLETE,
|
||||
expires: moment().add(moment.duration(3, 'm')),
|
||||
created: moment().subtract(moment.duration(30, 'm')),
|
||||
touched: moment().subtract(moment.duration(6, 'm')),
|
||||
idMapping: {
|
||||
|
@ -545,7 +305,7 @@ describe('getSearchStatus', () => {
|
|||
total: 1,
|
||||
} as any);
|
||||
|
||||
await checkRunningSessions(
|
||||
await checkNonPersistedSessions(
|
||||
{
|
||||
savedObjectsClient,
|
||||
client: mockClient,
|
||||
|
@ -573,6 +333,7 @@ describe('getSearchStatus', () => {
|
|||
status: SearchSessionStatus.IN_PROGRESS,
|
||||
created: moment().subtract(moment.duration(3, 'm')),
|
||||
touched: moment().subtract(moment.duration(10, 's')),
|
||||
expires: moment().add(moment.duration(3, 'm')),
|
||||
idMapping: {
|
||||
'search-hash': {
|
||||
id: 'search-id',
|
||||
|
@ -594,7 +355,7 @@ describe('getSearchStatus', () => {
|
|||
},
|
||||
});
|
||||
|
||||
await checkRunningSessions(
|
||||
await checkNonPersistedSessions(
|
||||
{
|
||||
savedObjectsClient,
|
||||
client: mockClient,
|
||||
|
@ -614,6 +375,7 @@ describe('getSearchStatus', () => {
|
|||
id: '123',
|
||||
attributes: {
|
||||
status: SearchSessionStatus.ERROR,
|
||||
expires: moment().add(moment.duration(3, 'm')),
|
||||
idMapping: {
|
||||
'search-hash': {
|
||||
id: 'search-id',
|
||||
|
@ -633,7 +395,7 @@ describe('getSearchStatus', () => {
|
|||
total: 1,
|
||||
} as any);
|
||||
|
||||
await checkRunningSessions(
|
||||
await checkNonPersistedSessions(
|
||||
{
|
||||
savedObjectsClient,
|
||||
client: mockClient,
|
||||
|
@ -653,6 +415,7 @@ describe('getSearchStatus', () => {
|
|||
namespaces: ['awesome'],
|
||||
attributes: {
|
||||
status: SearchSessionStatus.IN_PROGRESS,
|
||||
expires: moment().add(moment.duration(3, 'm')),
|
||||
touched: '123',
|
||||
idMapping: {
|
||||
'search-hash': {
|
||||
|
@ -676,7 +439,7 @@ describe('getSearchStatus', () => {
|
|||
},
|
||||
});
|
||||
|
||||
await checkRunningSessions(
|
||||
await checkNonPersistedSessions(
|
||||
{
|
||||
savedObjectsClient,
|
||||
client: mockClient,
|
||||
|
@ -696,6 +459,7 @@ describe('getSearchStatus', () => {
|
|||
const so = {
|
||||
attributes: {
|
||||
status: SearchSessionStatus.IN_PROGRESS,
|
||||
expires: moment().add(moment.duration(3, 'm')),
|
||||
touched: '123',
|
||||
idMapping: {
|
||||
'search-hash': {
|
||||
|
@ -719,7 +483,7 @@ describe('getSearchStatus', () => {
|
|||
},
|
||||
});
|
||||
|
||||
await checkRunningSessions(
|
||||
await checkNonPersistedSessions(
|
||||
{
|
||||
savedObjectsClient,
|
||||
client: mockClient,
|
||||
|
@ -744,6 +508,7 @@ describe('getSearchStatus', () => {
|
|||
savedObjectsClient.bulkUpdate = jest.fn();
|
||||
const so = {
|
||||
attributes: {
|
||||
expires: moment().add(moment.duration(3, 'm')),
|
||||
idMapping: {
|
||||
'search-hash': {
|
||||
id: 'search-id',
|
||||
|
@ -766,7 +531,7 @@ describe('getSearchStatus', () => {
|
|||
},
|
||||
});
|
||||
|
||||
await checkRunningSessions(
|
||||
await checkNonPersistedSessions(
|
||||
{
|
||||
savedObjectsClient,
|
||||
client: mockClient,
|
|
@ -0,0 +1,129 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import { SavedObjectsFindResult } from 'kibana/server';
|
||||
import moment from 'moment';
|
||||
import { EMPTY } from 'rxjs';
|
||||
import { catchError, concatMap } from 'rxjs/operators';
|
||||
import {
|
||||
nodeBuilder,
|
||||
ENHANCED_ES_SEARCH_STRATEGY,
|
||||
SEARCH_SESSION_TYPE,
|
||||
SearchSessionSavedObjectAttributes,
|
||||
SearchSessionStatus,
|
||||
KueryNode,
|
||||
} from '../../../../../../src/plugins/data/common';
|
||||
import { checkSearchSessionsByPage, getSearchSessionsPage$ } from './get_search_session_page';
|
||||
import { SearchSessionsConfig, CheckSearchSessionsDeps } from './types';
|
||||
import { bulkUpdateSessions, getAllSessionsStatusUpdates } from './update_session_status';
|
||||
|
||||
export const SEARCH_SESSIONS_CLEANUP_TASK_TYPE = 'search_sessions_cleanup';
|
||||
export const SEARCH_SESSIONS_CLEANUP_TASK_ID = `data_enhanced_${SEARCH_SESSIONS_CLEANUP_TASK_TYPE}`;
|
||||
|
||||
function isSessionStale(
|
||||
session: SavedObjectsFindResult<SearchSessionSavedObjectAttributes>,
|
||||
config: SearchSessionsConfig
|
||||
) {
|
||||
const curTime = moment();
|
||||
// Delete cancelled sessions immediately
|
||||
if (session.attributes.status === SearchSessionStatus.CANCELLED) return true;
|
||||
// Delete if a running session wasn't polled for in the last notTouchedInProgressTimeout OR
|
||||
// if a completed \ errored \ canceled session wasn't saved for within notTouchedTimeout
|
||||
return (
|
||||
(session.attributes.status === SearchSessionStatus.IN_PROGRESS &&
|
||||
curTime.diff(moment(session.attributes.touched), 'ms') >
|
||||
config.notTouchedInProgressTimeout.asMilliseconds()) ||
|
||||
(session.attributes.status !== SearchSessionStatus.IN_PROGRESS &&
|
||||
curTime.diff(moment(session.attributes.touched), 'ms') >
|
||||
config.notTouchedTimeout.asMilliseconds())
|
||||
);
|
||||
}
|
||||
|
||||
function checkNonPersistedSessionsPage(
|
||||
deps: CheckSearchSessionsDeps,
|
||||
config: SearchSessionsConfig,
|
||||
filter: KueryNode,
|
||||
page: number
|
||||
) {
|
||||
const { logger, client, savedObjectsClient } = deps;
|
||||
logger.debug(`${SEARCH_SESSIONS_CLEANUP_TASK_TYPE} Fetching sessions from page ${page}`);
|
||||
return getSearchSessionsPage$(deps, filter, config.pageSize, page).pipe(
|
||||
concatMap(async (nonPersistedSearchSessions) => {
|
||||
if (!nonPersistedSearchSessions.total) return nonPersistedSearchSessions;
|
||||
|
||||
logger.debug(
|
||||
`${SEARCH_SESSIONS_CLEANUP_TASK_TYPE} Found ${nonPersistedSearchSessions.total} sessions, processing ${nonPersistedSearchSessions.saved_objects.length}`
|
||||
);
|
||||
|
||||
const updatedSessions = await getAllSessionsStatusUpdates(deps, nonPersistedSearchSessions);
|
||||
const deletedSessionIds: string[] = [];
|
||||
|
||||
await Promise.all(
|
||||
nonPersistedSearchSessions.saved_objects.map(async (session) => {
|
||||
if (isSessionStale(session, config)) {
|
||||
// delete saved object to free up memory
|
||||
// TODO: there's a potential rare edge case of deleting an object and then receiving a new trackId for that same session!
|
||||
// Maybe we want to change state to deleted and cleanup later?
|
||||
logger.debug(`Deleting stale session | ${session.id}`);
|
||||
try {
|
||||
deletedSessionIds.push(session.id);
|
||||
await savedObjectsClient.delete(SEARCH_SESSION_TYPE, session.id, {
|
||||
namespace: session.namespaces?.[0],
|
||||
});
|
||||
} catch (e) {
|
||||
logger.error(
|
||||
`${SEARCH_SESSIONS_CLEANUP_TASK_TYPE} Error while deleting session ${session.id}: ${e.message}`
|
||||
);
|
||||
}
|
||||
|
||||
// Send a delete request for each async search to ES
|
||||
Object.keys(session.attributes.idMapping).map(async (searchKey: string) => {
|
||||
const searchInfo = session.attributes.idMapping[searchKey];
|
||||
if (searchInfo.strategy === ENHANCED_ES_SEARCH_STRATEGY) {
|
||||
try {
|
||||
await client.asyncSearch.delete({ id: searchInfo.id });
|
||||
} catch (e) {
|
||||
if (e.message !== 'resource_not_found_exception') {
|
||||
logger.error(
|
||||
`${SEARCH_SESSIONS_CLEANUP_TASK_TYPE} Error while deleting async_search ${searchInfo.id}: ${e.message}`
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
})
|
||||
);
|
||||
|
||||
const nonDeletedSessions = updatedSessions.filter((updateSession) => {
|
||||
return deletedSessionIds.indexOf(updateSession.id) === -1;
|
||||
});
|
||||
|
||||
await bulkUpdateSessions(deps, nonDeletedSessions);
|
||||
|
||||
return nonPersistedSearchSessions;
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
export function checkNonPersistedSessions(
|
||||
deps: CheckSearchSessionsDeps,
|
||||
config: SearchSessionsConfig
|
||||
) {
|
||||
const { logger } = deps;
|
||||
|
||||
const filters = nodeBuilder.is(`${SEARCH_SESSION_TYPE}.attributes.persisted`, 'false');
|
||||
|
||||
return checkSearchSessionsByPage(checkNonPersistedSessionsPage, deps, config, filters).pipe(
|
||||
catchError((e) => {
|
||||
logger.error(
|
||||
`${SEARCH_SESSIONS_CLEANUP_TASK_TYPE} Error while processing sessions: ${e?.message}`
|
||||
);
|
||||
return EMPTY;
|
||||
})
|
||||
);
|
||||
}
|
|
@ -0,0 +1,76 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import { checkPersistedSessionsProgress } from './check_persisted_sessions';
|
||||
import { savedObjectsClientMock } from '../../../../../../src/core/server/mocks';
|
||||
import { SearchSessionsConfig } from './types';
|
||||
import moment from 'moment';
|
||||
import { SavedObjectsClientContract } from '../../../../../../src/core/server';
|
||||
|
||||
describe('checkPersistedSessionsProgress', () => {
|
||||
let mockClient: any;
|
||||
let savedObjectsClient: jest.Mocked<SavedObjectsClientContract>;
|
||||
const config: SearchSessionsConfig = {
|
||||
enabled: true,
|
||||
pageSize: 5,
|
||||
notTouchedInProgressTimeout: moment.duration(1, 'm'),
|
||||
notTouchedTimeout: moment.duration(5, 'm'),
|
||||
maxUpdateRetries: 3,
|
||||
defaultExpiration: moment.duration(7, 'd'),
|
||||
trackingInterval: moment.duration(10, 's'),
|
||||
cleanupInterval: moment.duration(10, 's'),
|
||||
expireInterval: moment.duration(10, 'm'),
|
||||
monitoringTaskTimeout: moment.duration(5, 'm'),
|
||||
management: {} as any,
|
||||
};
|
||||
const mockLogger: any = {
|
||||
debug: jest.fn(),
|
||||
warn: jest.fn(),
|
||||
error: jest.fn(),
|
||||
};
|
||||
|
||||
beforeEach(() => {
|
||||
savedObjectsClient = savedObjectsClientMock.create();
|
||||
mockClient = {
|
||||
asyncSearch: {
|
||||
status: jest.fn(),
|
||||
delete: jest.fn(),
|
||||
},
|
||||
eql: {
|
||||
status: jest.fn(),
|
||||
delete: jest.fn(),
|
||||
},
|
||||
};
|
||||
});
|
||||
|
||||
test('fetches only running persisted sessions', async () => {
|
||||
savedObjectsClient.find.mockResolvedValue({
|
||||
saved_objects: [],
|
||||
total: 0,
|
||||
} as any);
|
||||
|
||||
await checkPersistedSessionsProgress(
|
||||
{
|
||||
savedObjectsClient,
|
||||
client: mockClient,
|
||||
logger: mockLogger,
|
||||
},
|
||||
config
|
||||
);
|
||||
|
||||
const [findInput] = savedObjectsClient.find.mock.calls[0];
|
||||
|
||||
expect(findInput.filter.arguments[0].arguments[0].value).toBe(
|
||||
'search-session.attributes.persisted'
|
||||
);
|
||||
expect(findInput.filter.arguments[0].arguments[1].value).toBe('true');
|
||||
expect(findInput.filter.arguments[1].arguments[0].value).toBe(
|
||||
'search-session.attributes.status'
|
||||
);
|
||||
expect(findInput.filter.arguments[1].arguments[1].value).toBe('in_progress');
|
||||
});
|
||||
});
|
|
@ -0,0 +1,72 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import { EMPTY, Observable } from 'rxjs';
|
||||
import { catchError, concatMap } from 'rxjs/operators';
|
||||
import {
|
||||
nodeBuilder,
|
||||
SEARCH_SESSION_TYPE,
|
||||
SearchSessionStatus,
|
||||
KueryNode,
|
||||
} from '../../../../../../src/plugins/data/common';
|
||||
import { checkSearchSessionsByPage, getSearchSessionsPage$ } from './get_search_session_page';
|
||||
import { SearchSessionsConfig, CheckSearchSessionsDeps, SearchSessionsResponse } from './types';
|
||||
import { bulkUpdateSessions, getAllSessionsStatusUpdates } from './update_session_status';
|
||||
|
||||
export const SEARCH_SESSIONS_TASK_TYPE = 'search_sessions_monitor';
|
||||
export const SEARCH_SESSIONS_TASK_ID = `data_enhanced_${SEARCH_SESSIONS_TASK_TYPE}`;
|
||||
|
||||
function checkPersistedSessionsPage(
|
||||
deps: CheckSearchSessionsDeps,
|
||||
config: SearchSessionsConfig,
|
||||
filter: KueryNode,
|
||||
page: number
|
||||
): Observable<SearchSessionsResponse> {
|
||||
const { logger } = deps;
|
||||
logger.debug(`${SEARCH_SESSIONS_TASK_TYPE} Fetching sessions from page ${page}`);
|
||||
return getSearchSessionsPage$(deps, filter, config.pageSize, page).pipe(
|
||||
concatMap(async (persistedSearchSessions) => {
|
||||
if (!persistedSearchSessions.total) return persistedSearchSessions;
|
||||
|
||||
logger.debug(
|
||||
`${SEARCH_SESSIONS_TASK_TYPE} Found ${persistedSearchSessions.total} sessions, processing ${persistedSearchSessions.saved_objects.length}`
|
||||
);
|
||||
|
||||
const updatedSessions = await getAllSessionsStatusUpdates(deps, persistedSearchSessions);
|
||||
await bulkUpdateSessions(deps, updatedSessions);
|
||||
|
||||
return persistedSearchSessions;
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
export function checkPersistedSessionsProgress(
|
||||
deps: CheckSearchSessionsDeps,
|
||||
config: SearchSessionsConfig
|
||||
) {
|
||||
const { logger } = deps;
|
||||
|
||||
const persistedSessionsFilter = nodeBuilder.and([
|
||||
nodeBuilder.is(`${SEARCH_SESSION_TYPE}.attributes.persisted`, 'true'),
|
||||
nodeBuilder.is(
|
||||
`${SEARCH_SESSION_TYPE}.attributes.status`,
|
||||
SearchSessionStatus.IN_PROGRESS.toString()
|
||||
),
|
||||
]);
|
||||
|
||||
return checkSearchSessionsByPage(
|
||||
checkPersistedSessionsPage,
|
||||
deps,
|
||||
config,
|
||||
persistedSessionsFilter
|
||||
).pipe(
|
||||
catchError((e) => {
|
||||
logger.error(`${SEARCH_SESSIONS_TASK_TYPE} Error while processing sessions: ${e?.message}`);
|
||||
return EMPTY;
|
||||
})
|
||||
);
|
||||
}
|
|
@ -1,257 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import {
|
||||
ElasticsearchClient,
|
||||
Logger,
|
||||
SavedObjectsClientContract,
|
||||
SavedObjectsFindResult,
|
||||
SavedObjectsUpdateResponse,
|
||||
} from 'kibana/server';
|
||||
import moment from 'moment';
|
||||
import { EMPTY, from, Observable } from 'rxjs';
|
||||
import { catchError, concatMap } from 'rxjs/operators';
|
||||
import {
|
||||
nodeBuilder,
|
||||
ENHANCED_ES_SEARCH_STRATEGY,
|
||||
SEARCH_SESSION_TYPE,
|
||||
SearchSessionRequestInfo,
|
||||
SearchSessionSavedObjectAttributes,
|
||||
SearchSessionStatus,
|
||||
} from '../../../../../../src/plugins/data/common';
|
||||
import { getSearchStatus } from './get_search_status';
|
||||
import { getSessionStatus } from './get_session_status';
|
||||
import { SearchSessionsConfig, SearchStatus } from './types';
|
||||
|
||||
export interface CheckRunningSessionsDeps {
|
||||
savedObjectsClient: SavedObjectsClientContract;
|
||||
client: ElasticsearchClient;
|
||||
logger: Logger;
|
||||
}
|
||||
|
||||
function isSessionStale(
|
||||
session: SavedObjectsFindResult<SearchSessionSavedObjectAttributes>,
|
||||
config: SearchSessionsConfig,
|
||||
logger: Logger
|
||||
) {
|
||||
const curTime = moment();
|
||||
// Delete if a running session wasn't polled for in the last notTouchedInProgressTimeout OR
|
||||
// if a completed \ errored \ canceled session wasn't saved for within notTouchedTimeout
|
||||
return (
|
||||
(session.attributes.status === SearchSessionStatus.IN_PROGRESS &&
|
||||
curTime.diff(moment(session.attributes.touched), 'ms') >
|
||||
config.notTouchedInProgressTimeout.asMilliseconds()) ||
|
||||
(session.attributes.status !== SearchSessionStatus.IN_PROGRESS &&
|
||||
curTime.diff(moment(session.attributes.touched), 'ms') >
|
||||
config.notTouchedTimeout.asMilliseconds())
|
||||
);
|
||||
}
|
||||
|
||||
/**
 * Refreshes the status of every search tracked by a session saved object and,
 * if anything changed, derives and applies the session-level status.
 *
 * Mutates `session.attributes` in place (idMapping entries, status, touched,
 * completed). Returns `true` when the caller needs to persist the object.
 *
 * @param session - the session saved object to refresh (mutated in place)
 * @param client - ES client used to poll individual search statuses
 * @param logger - logger for per-search status transitions and errors
 * @returns whether any attribute of the session was modified
 */
async function updateSessionStatus(
  session: SavedObjectsFindResult<SearchSessionSavedObjectAttributes>,
  client: ElasticsearchClient,
  logger: Logger
) {
  let sessionUpdated = false;

  // Check statuses of all running searches
  await Promise.all(
    Object.keys(session.attributes.idMapping).map(async (searchKey: string) => {
      // Helper that records a status change for this search and flags the
      // session as dirty so it gets bulk-updated by the caller.
      const updateSearchRequest = (
        currentStatus: Pick<SearchSessionRequestInfo, 'status' | 'error'>
      ) => {
        sessionUpdated = true;
        session.attributes.idMapping[searchKey] = {
          ...session.attributes.idMapping[searchKey],
          ...currentStatus,
        };
      };

      const searchInfo = session.attributes.idMapping[searchKey];
      // Only searches still marked IN_PROGRESS need to be polled; terminal
      // states (complete/error) are never revisited.
      if (searchInfo.status === SearchStatus.IN_PROGRESS) {
        try {
          const currentStatus = await getSearchStatus(client, searchInfo.id);

          if (currentStatus.status !== searchInfo.status) {
            logger.debug(`search ${searchInfo.id} | status changed to ${currentStatus.status}`);
            updateSearchRequest(currentStatus);
          }
        } catch (e) {
          // Polling failed: mark this search as errored rather than leaving it
          // IN_PROGRESS forever. Falls back to the ES error reason when the
          // error object carries no message.
          logger.error(e);
          updateSearchRequest({
            status: SearchStatus.ERROR,
            error: e.message || e.meta.error?.caused_by?.reason,
          });
        }
      }
    })
  );

  // And only then derive the session's status
  const sessionStatus = getSessionStatus(session.attributes);
  if (sessionStatus !== session.attributes.status) {
    const now = new Date().toISOString();
    session.attributes.status = sessionStatus;
    session.attributes.touched = now;
    if (sessionStatus === SearchSessionStatus.COMPLETE) {
      // Record when the session finished; cleared again below if it ever
      // transitions back out of COMPLETE.
      session.attributes.completed = now;
    } else if (session.attributes.completed) {
      session.attributes.completed = null;
    }
    sessionUpdated = true;
  }

  return sessionUpdated;
}
|
||||
|
||||
function getSavedSearchSessionsPage$(
|
||||
{ savedObjectsClient, logger }: CheckRunningSessionsDeps,
|
||||
config: SearchSessionsConfig,
|
||||
page: number
|
||||
) {
|
||||
logger.debug(`Fetching saved search sessions page ${page}`);
|
||||
return from(
|
||||
savedObjectsClient.find<SearchSessionSavedObjectAttributes>({
|
||||
page,
|
||||
perPage: config.pageSize,
|
||||
type: SEARCH_SESSION_TYPE,
|
||||
namespaces: ['*'],
|
||||
// process older sessions first
|
||||
sortField: 'touched',
|
||||
sortOrder: 'asc',
|
||||
filter: nodeBuilder.or([
|
||||
nodeBuilder.and([
|
||||
nodeBuilder.is(
|
||||
`${SEARCH_SESSION_TYPE}.attributes.status`,
|
||||
SearchSessionStatus.IN_PROGRESS.toString()
|
||||
),
|
||||
nodeBuilder.is(`${SEARCH_SESSION_TYPE}.attributes.persisted`, 'true'),
|
||||
]),
|
||||
nodeBuilder.is(`${SEARCH_SESSION_TYPE}.attributes.persisted`, 'false'),
|
||||
]),
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
function checkRunningSessionsPage(
|
||||
deps: CheckRunningSessionsDeps,
|
||||
config: SearchSessionsConfig,
|
||||
page: number
|
||||
) {
|
||||
const { logger, client, savedObjectsClient } = deps;
|
||||
return getSavedSearchSessionsPage$(deps, config, page).pipe(
|
||||
concatMap(async (runningSearchSessionsResponse) => {
|
||||
if (!runningSearchSessionsResponse.total) return;
|
||||
|
||||
logger.debug(
|
||||
`Found ${runningSearchSessionsResponse.total} running sessions, processing ${runningSearchSessionsResponse.saved_objects.length} sessions from page ${page}`
|
||||
);
|
||||
|
||||
const updatedSessions = new Array<
|
||||
SavedObjectsFindResult<SearchSessionSavedObjectAttributes>
|
||||
>();
|
||||
|
||||
await Promise.all(
|
||||
runningSearchSessionsResponse.saved_objects.map(async (session) => {
|
||||
const updated = await updateSessionStatus(session, client, logger);
|
||||
let deleted = false;
|
||||
|
||||
if (!session.attributes.persisted) {
|
||||
if (isSessionStale(session, config, logger)) {
|
||||
// delete saved object to free up memory
|
||||
// TODO: there's a potential rare edge case of deleting an object and then receiving a new trackId for that same session!
|
||||
// Maybe we want to change state to deleted and cleanup later?
|
||||
logger.debug(`Deleting stale session | ${session.id}`);
|
||||
try {
|
||||
await savedObjectsClient.delete(SEARCH_SESSION_TYPE, session.id, {
|
||||
namespace: session.namespaces?.[0],
|
||||
});
|
||||
deleted = true;
|
||||
} catch (e) {
|
||||
logger.error(
|
||||
`Error while deleting stale search session ${session.id}: ${e.message}`
|
||||
);
|
||||
}
|
||||
|
||||
// Send a delete request for each async search to ES
|
||||
Object.keys(session.attributes.idMapping).map(async (searchKey: string) => {
|
||||
const searchInfo = session.attributes.idMapping[searchKey];
|
||||
if (searchInfo.strategy === ENHANCED_ES_SEARCH_STRATEGY) {
|
||||
try {
|
||||
await client.asyncSearch.delete({ id: searchInfo.id });
|
||||
} catch (e) {
|
||||
logger.error(
|
||||
`Error while deleting async_search ${searchInfo.id}: ${e.message}`
|
||||
);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if (updated && !deleted) {
|
||||
updatedSessions.push(session);
|
||||
}
|
||||
})
|
||||
);
|
||||
|
||||
// Do a bulk update
|
||||
if (updatedSessions.length) {
|
||||
// If there's an error, we'll try again in the next iteration, so there's no need to check the output.
|
||||
const updatedResponse = await savedObjectsClient.bulkUpdate<SearchSessionSavedObjectAttributes>(
|
||||
updatedSessions.map((session) => ({
|
||||
...session,
|
||||
namespace: session.namespaces?.[0],
|
||||
}))
|
||||
);
|
||||
|
||||
const success: Array<SavedObjectsUpdateResponse<SearchSessionSavedObjectAttributes>> = [];
|
||||
const fail: Array<SavedObjectsUpdateResponse<SearchSessionSavedObjectAttributes>> = [];
|
||||
|
||||
updatedResponse.saved_objects.forEach((savedObjectResponse) => {
|
||||
if ('error' in savedObjectResponse) {
|
||||
fail.push(savedObjectResponse);
|
||||
logger.error(
|
||||
`Error while updating search session ${savedObjectResponse?.id}: ${savedObjectResponse.error?.message}`
|
||||
);
|
||||
} else {
|
||||
success.push(savedObjectResponse);
|
||||
}
|
||||
});
|
||||
|
||||
logger.debug(`Updating search sessions: success: ${success.length}, fail: ${fail.length}`);
|
||||
}
|
||||
|
||||
return runningSearchSessionsResponse;
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
export function checkRunningSessions(deps: CheckRunningSessionsDeps, config: SearchSessionsConfig) {
|
||||
const { logger } = deps;
|
||||
|
||||
const checkRunningSessionsByPage = (nextPage = 1): Observable<void> =>
|
||||
checkRunningSessionsPage(deps, config, nextPage).pipe(
|
||||
concatMap((result) => {
|
||||
if (!result || !result.saved_objects || result.saved_objects.length < config.pageSize) {
|
||||
return EMPTY;
|
||||
} else {
|
||||
// TODO: while processing previous page session list might have been changed and we might skip a session,
|
||||
// because it would appear now on a different "page".
|
||||
// This isn't critical, as we would pick it up on a next task iteration, but maybe we could improve this somehow
|
||||
return checkRunningSessionsByPage(result.page + 1);
|
||||
}
|
||||
})
|
||||
);
|
||||
|
||||
return checkRunningSessionsByPage().pipe(
|
||||
catchError((e) => {
|
||||
logger.error(`Error while processing search sessions: ${e?.message}`);
|
||||
return EMPTY;
|
||||
})
|
||||
);
|
||||
}
|
|
@ -0,0 +1,74 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import { EMPTY, Observable } from 'rxjs';
|
||||
import { catchError, concatMap } from 'rxjs/operators';
|
||||
import {
|
||||
nodeBuilder,
|
||||
SEARCH_SESSION_TYPE,
|
||||
SearchSessionStatus,
|
||||
KueryNode,
|
||||
} from '../../../../../../src/plugins/data/common';
|
||||
import { checkSearchSessionsByPage, getSearchSessionsPage$ } from './get_search_session_page';
|
||||
import { SearchSessionsConfig, CheckSearchSessionsDeps, SearchSessionsResponse } from './types';
|
||||
import { bulkUpdateSessions, getAllSessionsStatusUpdates } from './update_session_status';
|
||||
|
||||
export const SEARCH_SESSIONS_EXPIRE_TASK_TYPE = 'search_sessions_expire';
|
||||
export const SEARCH_SESSIONS_EXPIRE_TASK_ID = `data_enhanced_${SEARCH_SESSIONS_EXPIRE_TASK_TYPE}`;
|
||||
|
||||
/**
 * Processes one page of sessions for the expiration task: fetches sessions
 * matching `filter`, recomputes their statuses, and bulk-persists any changes.
 *
 * @param deps - saved objects client, ES client, and logger
 * @param config - search sessions config (pageSize is used here)
 * @param filter - KQL node restricting which sessions to fetch
 * @param page - 1-based page number to fetch
 * @returns observable of the find response, so the pager can detect the last page
 */
function checkSessionExpirationPage(
  deps: CheckSearchSessionsDeps,
  config: SearchSessionsConfig,
  filter: KueryNode,
  page: number
): Observable<SearchSessionsResponse> {
  const { logger } = deps;
  logger.debug(`${SEARCH_SESSIONS_EXPIRE_TASK_TYPE} Fetching sessions from page ${page}`);
  return getSearchSessionsPage$(deps, filter, config.pageSize, page).pipe(
    concatMap(async (searchSessions) => {
      // Nothing matched the filter — short-circuit before doing any work.
      if (!searchSessions.total) return searchSessions;

      logger.debug(
        `${SEARCH_SESSIONS_EXPIRE_TASK_TYPE} Found ${searchSessions.total} sessions, processing ${searchSessions.saved_objects.length}`
      );

      // Recompute statuses for this page, then persist all changes in one bulk call.
      const updatedSessions = await getAllSessionsStatusUpdates(deps, searchSessions);
      await bulkUpdateSessions(deps, updatedSessions);

      return searchSessions;
    })
  );
}
|
||||
|
||||
/**
 * Task body for the expiration task: pages through persisted COMPLETE search
 * sessions and lets `checkSessionExpirationPage` transition any that have
 * passed their expiration. Errors are logged and swallowed so the recurring
 * task never fails hard.
 */
export function checkPersistedCompletedSessionExpiration(
  deps: CheckSearchSessionsDeps,
  config: SearchSessionsConfig
) {
  const { logger } = deps;

  // Only persisted + COMPLETE sessions are candidates for expiration.
  const persistedSessionsFilter = nodeBuilder.and([
    nodeBuilder.is(`${SEARCH_SESSION_TYPE}.attributes.persisted`, 'true'),
    nodeBuilder.is(
      `${SEARCH_SESSION_TYPE}.attributes.status`,
      SearchSessionStatus.COMPLETE.toString()
    ),
  ]);

  return checkSearchSessionsByPage(
    checkSessionExpirationPage,
    deps,
    config,
    persistedSessionsFilter
  ).pipe(
    catchError((e) => {
      logger.error(
        `${SEARCH_SESSIONS_EXPIRE_TASK_TYPE} Error while processing sessions: ${e?.message}`
      );
      return EMPTY;
    })
  );
}
|
|
@ -0,0 +1,282 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import { checkSearchSessionsByPage, getSearchSessionsPage$ } from './get_search_session_page';
|
||||
import {
|
||||
SearchSessionStatus,
|
||||
ENHANCED_ES_SEARCH_STRATEGY,
|
||||
} from '../../../../../../src/plugins/data/common';
|
||||
import { savedObjectsClientMock } from '../../../../../../src/core/server/mocks';
|
||||
import { SearchSessionsConfig, SearchStatus } from './types';
|
||||
import moment from 'moment';
|
||||
import { SavedObjectsClientContract } from '../../../../../../src/core/server';
|
||||
import { of, Subject, throwError } from 'rxjs';
|
||||
import { takeUntil } from 'rxjs/operators';
|
||||
jest.useFakeTimers();
|
||||
|
||||
describe('checkSearchSessionsByPage', () => {
|
||||
const mockClient = {} as any;
|
||||
let savedObjectsClient: jest.Mocked<SavedObjectsClientContract>;
|
||||
const config: SearchSessionsConfig = {
|
||||
enabled: true,
|
||||
pageSize: 5,
|
||||
management: {} as any,
|
||||
} as any;
|
||||
const mockLogger: any = {
|
||||
debug: jest.fn(),
|
||||
warn: jest.fn(),
|
||||
error: jest.fn(),
|
||||
};
|
||||
|
||||
const emptySO = {
|
||||
attributes: {
|
||||
persisted: false,
|
||||
status: SearchSessionStatus.IN_PROGRESS,
|
||||
created: moment().subtract(moment.duration(3, 'm')),
|
||||
touched: moment().subtract(moment.duration(10, 's')),
|
||||
idMapping: {},
|
||||
},
|
||||
};
|
||||
|
||||
beforeEach(() => {
|
||||
savedObjectsClient = savedObjectsClientMock.create();
|
||||
});
|
||||
|
||||
describe('getSearchSessionsPage$', () => {
|
||||
test('sorting is by "touched"', async () => {
|
||||
savedObjectsClient.find.mockResolvedValueOnce({
|
||||
saved_objects: [],
|
||||
total: 0,
|
||||
} as any);
|
||||
|
||||
await getSearchSessionsPage$(
|
||||
{
|
||||
savedObjectsClient,
|
||||
} as any,
|
||||
{
|
||||
type: 'literal',
|
||||
},
|
||||
1,
|
||||
1
|
||||
);
|
||||
|
||||
expect(savedObjectsClient.find).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ sortField: 'touched', sortOrder: 'asc' })
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe('pagination', () => {
|
||||
test('fetches one page if got empty response', async () => {
|
||||
const checkFn = jest.fn().mockReturnValue(of(undefined));
|
||||
|
||||
await checkSearchSessionsByPage(
|
||||
checkFn,
|
||||
{
|
||||
savedObjectsClient,
|
||||
client: mockClient,
|
||||
logger: mockLogger,
|
||||
},
|
||||
config,
|
||||
[]
|
||||
).toPromise();
|
||||
|
||||
expect(checkFn).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
test('fetches one page if got response with no saved objects', async () => {
|
||||
const checkFn = jest.fn().mockReturnValue(
|
||||
of({
|
||||
total: 0,
|
||||
})
|
||||
);
|
||||
|
||||
await checkSearchSessionsByPage(
|
||||
checkFn,
|
||||
{
|
||||
savedObjectsClient,
|
||||
client: mockClient,
|
||||
logger: mockLogger,
|
||||
},
|
||||
config,
|
||||
[]
|
||||
).toPromise();
|
||||
|
||||
expect(checkFn).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
test('fetches one page if less than page size object are returned', async () => {
|
||||
const checkFn = jest.fn().mockReturnValue(
|
||||
of({
|
||||
saved_objects: [emptySO, emptySO],
|
||||
total: 5,
|
||||
})
|
||||
);
|
||||
|
||||
await checkSearchSessionsByPage(
|
||||
checkFn,
|
||||
{
|
||||
savedObjectsClient,
|
||||
client: mockClient,
|
||||
logger: mockLogger,
|
||||
},
|
||||
config,
|
||||
[]
|
||||
).toPromise();
|
||||
|
||||
expect(checkFn).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
test('fetches two pages if exactly page size objects are returned', async () => {
|
||||
let i = 0;
|
||||
|
||||
const checkFn = jest.fn().mockImplementation(() =>
|
||||
of({
|
||||
saved_objects: i++ === 0 ? [emptySO, emptySO, emptySO, emptySO, emptySO] : [],
|
||||
total: 5,
|
||||
page: i,
|
||||
})
|
||||
);
|
||||
|
||||
await checkSearchSessionsByPage(
|
||||
checkFn,
|
||||
{
|
||||
savedObjectsClient,
|
||||
client: mockClient,
|
||||
logger: mockLogger,
|
||||
},
|
||||
config,
|
||||
[]
|
||||
).toPromise();
|
||||
|
||||
expect(checkFn).toHaveBeenCalledTimes(2);
|
||||
|
||||
// validate that page number increases
|
||||
const page1 = checkFn.mock.calls[0][3];
|
||||
const page2 = checkFn.mock.calls[1][3];
|
||||
expect(page1).toBe(1);
|
||||
expect(page2).toBe(2);
|
||||
});
|
||||
|
||||
test('fetches two pages if page size +1 objects are returned', async () => {
|
||||
let i = 0;
|
||||
|
||||
const checkFn = jest.fn().mockImplementation(() =>
|
||||
of({
|
||||
saved_objects: i++ === 0 ? [emptySO, emptySO, emptySO, emptySO, emptySO] : [emptySO],
|
||||
total: i === 0 ? 5 : 1,
|
||||
page: i,
|
||||
})
|
||||
);
|
||||
|
||||
await checkSearchSessionsByPage(
|
||||
checkFn,
|
||||
{
|
||||
savedObjectsClient,
|
||||
client: mockClient,
|
||||
logger: mockLogger,
|
||||
},
|
||||
config,
|
||||
[]
|
||||
).toPromise();
|
||||
|
||||
expect(checkFn).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
test('sessions fetched in the beginning are processed even if sessions in the end fail', async () => {
|
||||
let i = 0;
|
||||
|
||||
const checkFn = jest.fn().mockImplementation(() => {
|
||||
if (++i === 2) {
|
||||
return throwError('Fake find error...');
|
||||
}
|
||||
return of({
|
||||
saved_objects:
|
||||
i <= 5
|
||||
? [
|
||||
i === 1
|
||||
? {
|
||||
id: '123',
|
||||
attributes: {
|
||||
persisted: false,
|
||||
status: SearchSessionStatus.IN_PROGRESS,
|
||||
created: moment().subtract(moment.duration(3, 'm')),
|
||||
touched: moment().subtract(moment.duration(2, 'm')),
|
||||
idMapping: {
|
||||
'map-key': {
|
||||
strategy: ENHANCED_ES_SEARCH_STRATEGY,
|
||||
id: 'async-id',
|
||||
status: SearchStatus.IN_PROGRESS,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
: emptySO,
|
||||
emptySO,
|
||||
emptySO,
|
||||
emptySO,
|
||||
emptySO,
|
||||
]
|
||||
: [],
|
||||
total: 25,
|
||||
page: i,
|
||||
});
|
||||
});
|
||||
|
||||
await checkSearchSessionsByPage(
|
||||
checkFn,
|
||||
{
|
||||
savedObjectsClient,
|
||||
client: mockClient,
|
||||
logger: mockLogger,
|
||||
},
|
||||
config,
|
||||
[]
|
||||
)
|
||||
.toPromise()
|
||||
.catch(() => {});
|
||||
|
||||
expect(checkFn).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
test('fetching is abortable', async () => {
|
||||
let i = 0;
|
||||
const abort$ = new Subject();
|
||||
|
||||
const checkFn = jest.fn().mockImplementation(() => {
|
||||
if (++i === 2) {
|
||||
abort$.next();
|
||||
}
|
||||
|
||||
return of({
|
||||
saved_objects: i <= 5 ? [emptySO, emptySO, emptySO, emptySO, emptySO] : [],
|
||||
total: 25,
|
||||
page: i,
|
||||
});
|
||||
});
|
||||
|
||||
await checkSearchSessionsByPage(
|
||||
checkFn,
|
||||
{
|
||||
savedObjectsClient,
|
||||
client: mockClient,
|
||||
logger: mockLogger,
|
||||
},
|
||||
config,
|
||||
[]
|
||||
)
|
||||
.pipe(takeUntil(abort$))
|
||||
.toPromise()
|
||||
.catch(() => {});
|
||||
|
||||
jest.runAllTimers();
|
||||
|
||||
// if not for `abort$` then this would be called 6 times!
|
||||
expect(checkFn).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
});
|
||||
});
|
|
@ -0,0 +1,61 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import { SavedObjectsClientContract, Logger } from 'kibana/server';
|
||||
import { from, Observable, EMPTY } from 'rxjs';
|
||||
import { concatMap } from 'rxjs/operators';
|
||||
import {
|
||||
SearchSessionSavedObjectAttributes,
|
||||
SEARCH_SESSION_TYPE,
|
||||
KueryNode,
|
||||
} from '../../../../../../src/plugins/data/common';
|
||||
import { CheckSearchSessionsDeps, CheckSearchSessionsFn, SearchSessionsConfig } from './types';
|
||||
|
||||
export interface GetSessionsDeps {
|
||||
savedObjectsClient: SavedObjectsClientContract;
|
||||
logger: Logger;
|
||||
}
|
||||
|
||||
export function getSearchSessionsPage$(
|
||||
{ savedObjectsClient }: GetSessionsDeps,
|
||||
filter: KueryNode,
|
||||
pageSize: number,
|
||||
page: number
|
||||
) {
|
||||
return from(
|
||||
savedObjectsClient.find<SearchSessionSavedObjectAttributes>({
|
||||
page,
|
||||
perPage: pageSize,
|
||||
type: SEARCH_SESSION_TYPE,
|
||||
namespaces: ['*'],
|
||||
// process older sessions first
|
||||
sortField: 'touched',
|
||||
sortOrder: 'asc',
|
||||
filter,
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
/**
 * Generic pager shared by all search session tasks: invokes `checkFn` for
 * page `nextPage` and recurses to the following page until a page comes back
 * missing, empty, or shorter than `config.pageSize`.
 *
 * @param checkFn - per-page worker (progress check, cleanup, or expiration)
 * @param deps - clients and logger passed through to `checkFn`
 * @param config - provides `pageSize`, the pagination cut-off
 * @param filters - KQL filter passed through to `checkFn`
 *   NOTE(review): typed `any` because callers pass both a KueryNode and `[]`
 *   (in tests); could likely be tightened — confirm against all call sites.
 * @param nextPage - 1-based page to start from (defaults to the first page)
 */
export const checkSearchSessionsByPage = (
  checkFn: CheckSearchSessionsFn,
  deps: CheckSearchSessionsDeps,
  config: SearchSessionsConfig,
  filters: any,
  nextPage = 1
): Observable<void> =>
  checkFn(deps, config, filters, nextPage).pipe(
    concatMap((result) => {
      // A missing, empty, or short page means there is no page after it.
      if (!result || !result.saved_objects || result.saved_objects.length < config.pageSize) {
        return EMPTY;
      } else {
        // TODO: while processing previous page session list might have been changed and we might skip a session,
        // because it would appear now on a different "page".
        // This isn't critical, as we would pick it up on a next task iteration, but maybe we could improve this somehow
        return checkSearchSessionsByPage(checkFn, deps, config, filters, result.page + 1);
      }
    })
  );
|
|
@ -6,4 +6,3 @@
|
|||
*/
|
||||
|
||||
export * from './session_service';
|
||||
export { registerSearchSessionsTask, scheduleSearchSessionsTasks } from './monitoring_task';
|
||||
|
|
|
@ -1,119 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import { Duration } from 'moment';
|
||||
import { filter, takeUntil } from 'rxjs/operators';
|
||||
import { BehaviorSubject } from 'rxjs';
|
||||
import {
|
||||
TaskManagerSetupContract,
|
||||
TaskManagerStartContract,
|
||||
RunContext,
|
||||
TaskRunCreatorFunction,
|
||||
} from '../../../../task_manager/server';
|
||||
import { checkRunningSessions } from './check_running_sessions';
|
||||
import { CoreSetup, SavedObjectsClient, Logger } from '../../../../../../src/core/server';
|
||||
import { ConfigSchema } from '../../../config';
|
||||
import { SEARCH_SESSION_TYPE } from '../../../../../../src/plugins/data/common';
|
||||
import { DataEnhancedStartDependencies } from '../../type';
|
||||
|
||||
export const SEARCH_SESSIONS_TASK_TYPE = 'search_sessions_monitor';
|
||||
export const SEARCH_SESSIONS_TASK_ID = `data_enhanced_${SEARCH_SESSIONS_TASK_TYPE}`;
|
||||
|
||||
interface SearchSessionTaskDeps {
|
||||
taskManager: TaskManagerSetupContract;
|
||||
logger: Logger;
|
||||
config: ConfigSchema;
|
||||
}
|
||||
|
||||
function searchSessionRunner(
|
||||
core: CoreSetup<DataEnhancedStartDependencies>,
|
||||
{ logger, config }: SearchSessionTaskDeps
|
||||
): TaskRunCreatorFunction {
|
||||
return ({ taskInstance }: RunContext) => {
|
||||
const aborted$ = new BehaviorSubject<boolean>(false);
|
||||
return {
|
||||
async run() {
|
||||
const sessionConfig = config.search.sessions;
|
||||
const [coreStart] = await core.getStartServices();
|
||||
if (!sessionConfig.enabled) {
|
||||
logger.debug('Search sessions are disabled. Skipping task.');
|
||||
return;
|
||||
}
|
||||
if (aborted$.getValue()) return;
|
||||
|
||||
const internalRepo = coreStart.savedObjects.createInternalRepository([SEARCH_SESSION_TYPE]);
|
||||
const internalSavedObjectsClient = new SavedObjectsClient(internalRepo);
|
||||
await checkRunningSessions(
|
||||
{
|
||||
savedObjectsClient: internalSavedObjectsClient,
|
||||
client: coreStart.elasticsearch.client.asInternalUser,
|
||||
logger,
|
||||
},
|
||||
sessionConfig
|
||||
)
|
||||
.pipe(takeUntil(aborted$.pipe(filter((aborted) => aborted))))
|
||||
.toPromise();
|
||||
|
||||
return {
|
||||
state: {},
|
||||
};
|
||||
},
|
||||
cancel: async () => {
|
||||
aborted$.next(true);
|
||||
},
|
||||
};
|
||||
};
|
||||
}
|
||||
|
||||
export function registerSearchSessionsTask(
|
||||
core: CoreSetup<DataEnhancedStartDependencies>,
|
||||
deps: SearchSessionTaskDeps
|
||||
) {
|
||||
deps.taskManager.registerTaskDefinitions({
|
||||
[SEARCH_SESSIONS_TASK_TYPE]: {
|
||||
title: 'Search Sessions Monitor',
|
||||
createTaskRunner: searchSessionRunner(core, deps),
|
||||
timeout: `${deps.config.search.sessions.monitoringTaskTimeout.asSeconds()}s`,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
export async function unscheduleSearchSessionsTask(
|
||||
taskManager: TaskManagerStartContract,
|
||||
logger: Logger
|
||||
) {
|
||||
try {
|
||||
await taskManager.removeIfExists(SEARCH_SESSIONS_TASK_ID);
|
||||
logger.debug(`Search sessions cleared`);
|
||||
} catch (e) {
|
||||
logger.error(`Error clearing task, received ${e.message}`);
|
||||
}
|
||||
}
|
||||
|
||||
export async function scheduleSearchSessionsTasks(
|
||||
taskManager: TaskManagerStartContract,
|
||||
logger: Logger,
|
||||
trackingInterval: Duration
|
||||
) {
|
||||
await taskManager.removeIfExists(SEARCH_SESSIONS_TASK_ID);
|
||||
|
||||
try {
|
||||
await taskManager.ensureScheduled({
|
||||
id: SEARCH_SESSIONS_TASK_ID,
|
||||
taskType: SEARCH_SESSIONS_TASK_TYPE,
|
||||
schedule: {
|
||||
interval: `${trackingInterval.asSeconds()}s`,
|
||||
},
|
||||
state: {},
|
||||
params: {},
|
||||
});
|
||||
|
||||
logger.debug(`Search sessions task, scheduled to run`);
|
||||
} catch (e) {
|
||||
logger.error(`Error scheduling task, received ${e.message}`);
|
||||
}
|
||||
}
|
|
@ -79,7 +79,9 @@ describe('SearchSessionService', () => {
|
|||
maxUpdateRetries: MAX_UPDATE_RETRIES,
|
||||
defaultExpiration: moment.duration(7, 'd'),
|
||||
monitoringTaskTimeout: moment.duration(5, 'm'),
|
||||
cleanupInterval: moment.duration(10, 's'),
|
||||
trackingInterval: moment.duration(10, 's'),
|
||||
expireInterval: moment.duration(10, 'm'),
|
||||
management: {} as any,
|
||||
},
|
||||
},
|
||||
|
@ -157,7 +159,9 @@ describe('SearchSessionService', () => {
|
|||
maxUpdateRetries: MAX_UPDATE_RETRIES,
|
||||
defaultExpiration: moment.duration(7, 'd'),
|
||||
trackingInterval: moment.duration(10, 's'),
|
||||
expireInterval: moment.duration(10, 'm'),
|
||||
monitoringTaskTimeout: moment.duration(5, 'm'),
|
||||
cleanupInterval: moment.duration(10, 's'),
|
||||
management: {} as any,
|
||||
},
|
||||
},
|
||||
|
|
|
@ -43,11 +43,26 @@ import { createRequestHash } from './utils';
|
|||
import { ConfigSchema } from '../../../config';
|
||||
import {
|
||||
registerSearchSessionsTask,
|
||||
scheduleSearchSessionsTasks,
|
||||
scheduleSearchSessionsTask,
|
||||
unscheduleSearchSessionsTask,
|
||||
} from './monitoring_task';
|
||||
} from './setup_task';
|
||||
import { SearchSessionsConfig, SearchStatus } from './types';
|
||||
import { DataEnhancedStartDependencies } from '../../type';
|
||||
import {
|
||||
checkPersistedSessionsProgress,
|
||||
SEARCH_SESSIONS_TASK_ID,
|
||||
SEARCH_SESSIONS_TASK_TYPE,
|
||||
} from './check_persisted_sessions';
|
||||
import {
|
||||
SEARCH_SESSIONS_CLEANUP_TASK_TYPE,
|
||||
checkNonPersistedSessions,
|
||||
SEARCH_SESSIONS_CLEANUP_TASK_ID,
|
||||
} from './check_non_persiseted_sessions';
|
||||
import {
|
||||
SEARCH_SESSIONS_EXPIRE_TASK_TYPE,
|
||||
SEARCH_SESSIONS_EXPIRE_TASK_ID,
|
||||
checkPersistedCompletedSessionExpiration,
|
||||
} from './expire_persisted_sessions';
|
||||
|
||||
export interface SearchSessionDependencies {
|
||||
savedObjectsClient: SavedObjectsClientContract;
|
||||
|
@ -89,11 +104,35 @@ export class SearchSessionService
|
|||
}
|
||||
|
||||
public setup(core: CoreSetup<DataEnhancedStartDependencies>, deps: SetupDependencies) {
|
||||
registerSearchSessionsTask(core, {
|
||||
const taskDeps = {
|
||||
config: this.config,
|
||||
taskManager: deps.taskManager,
|
||||
logger: this.logger,
|
||||
});
|
||||
};
|
||||
|
||||
registerSearchSessionsTask(
|
||||
core,
|
||||
taskDeps,
|
||||
SEARCH_SESSIONS_TASK_TYPE,
|
||||
'persisted session progress',
|
||||
checkPersistedSessionsProgress
|
||||
);
|
||||
|
||||
registerSearchSessionsTask(
|
||||
core,
|
||||
taskDeps,
|
||||
SEARCH_SESSIONS_CLEANUP_TASK_TYPE,
|
||||
'non persisted session cleanup',
|
||||
checkNonPersistedSessions
|
||||
);
|
||||
|
||||
registerSearchSessionsTask(
|
||||
core,
|
||||
taskDeps,
|
||||
SEARCH_SESSIONS_EXPIRE_TASK_TYPE,
|
||||
'complete session expiration',
|
||||
checkPersistedCompletedSessionExpiration
|
||||
);
|
||||
}
|
||||
|
||||
public async start(core: CoreStart, deps: StartDependencies) {
|
||||
|
@ -103,14 +142,37 @@ export class SearchSessionService
|
|||
public stop() {}
|
||||
|
||||
private setupMonitoring = async (core: CoreStart, deps: StartDependencies) => {
|
||||
const taskDeps = {
|
||||
config: this.config,
|
||||
taskManager: deps.taskManager,
|
||||
logger: this.logger,
|
||||
};
|
||||
|
||||
if (this.sessionConfig.enabled) {
|
||||
scheduleSearchSessionsTasks(
|
||||
deps.taskManager,
|
||||
this.logger,
|
||||
scheduleSearchSessionsTask(
|
||||
taskDeps,
|
||||
SEARCH_SESSIONS_TASK_ID,
|
||||
SEARCH_SESSIONS_TASK_TYPE,
|
||||
this.sessionConfig.trackingInterval
|
||||
);
|
||||
|
||||
scheduleSearchSessionsTask(
|
||||
taskDeps,
|
||||
SEARCH_SESSIONS_CLEANUP_TASK_ID,
|
||||
SEARCH_SESSIONS_CLEANUP_TASK_TYPE,
|
||||
this.sessionConfig.cleanupInterval
|
||||
);
|
||||
|
||||
scheduleSearchSessionsTask(
|
||||
taskDeps,
|
||||
SEARCH_SESSIONS_EXPIRE_TASK_ID,
|
||||
SEARCH_SESSIONS_EXPIRE_TASK_TYPE,
|
||||
this.sessionConfig.expireInterval
|
||||
);
|
||||
} else {
|
||||
unscheduleSearchSessionsTask(deps.taskManager, this.logger);
|
||||
unscheduleSearchSessionsTask(taskDeps, SEARCH_SESSIONS_TASK_ID);
|
||||
unscheduleSearchSessionsTask(taskDeps, SEARCH_SESSIONS_CLEANUP_TASK_ID);
|
||||
unscheduleSearchSessionsTask(taskDeps, SEARCH_SESSIONS_EXPIRE_TASK_ID);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
121
x-pack/plugins/data_enhanced/server/search/session/setup_task.ts
Normal file
121
x-pack/plugins/data_enhanced/server/search/session/setup_task.ts
Normal file
|
@ -0,0 +1,121 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import { Duration } from 'moment';
|
||||
import { filter, takeUntil } from 'rxjs/operators';
|
||||
import { BehaviorSubject } from 'rxjs';
|
||||
import { RunContext, TaskRunCreatorFunction } from '../../../../task_manager/server';
|
||||
import { CoreSetup, SavedObjectsClient } from '../../../../../../src/core/server';
|
||||
import { SEARCH_SESSION_TYPE } from '../../../../../../src/plugins/data/common';
|
||||
import { DataEnhancedStartDependencies } from '../../type';
|
||||
import {
|
||||
SearchSessionTaskSetupDeps,
|
||||
SearchSessionTaskStartDeps,
|
||||
SearchSessionTaskFn,
|
||||
} from './types';
|
||||
|
||||
export function searchSessionTaskRunner(
|
||||
core: CoreSetup<DataEnhancedStartDependencies>,
|
||||
deps: SearchSessionTaskSetupDeps,
|
||||
title: string,
|
||||
checkFn: SearchSessionTaskFn
|
||||
): TaskRunCreatorFunction {
|
||||
const { logger, config } = deps;
|
||||
return ({ taskInstance }: RunContext) => {
|
||||
const aborted$ = new BehaviorSubject<boolean>(false);
|
||||
return {
|
||||
async run() {
|
||||
try {
|
||||
const sessionConfig = config.search.sessions;
|
||||
const [coreStart] = await core.getStartServices();
|
||||
if (!sessionConfig.enabled) {
|
||||
logger.debug(`Search sessions are disabled. Skipping task ${title}.`);
|
||||
return;
|
||||
}
|
||||
if (aborted$.getValue()) return;
|
||||
|
||||
const internalRepo = coreStart.savedObjects.createInternalRepository([
|
||||
SEARCH_SESSION_TYPE,
|
||||
]);
|
||||
const internalSavedObjectsClient = new SavedObjectsClient(internalRepo);
|
||||
await checkFn(
|
||||
{
|
||||
logger,
|
||||
client: coreStart.elasticsearch.client.asInternalUser,
|
||||
savedObjectsClient: internalSavedObjectsClient,
|
||||
},
|
||||
sessionConfig
|
||||
)
|
||||
.pipe(takeUntil(aborted$.pipe(filter((aborted) => aborted))))
|
||||
.toPromise();
|
||||
|
||||
return {
|
||||
state: {},
|
||||
};
|
||||
} catch (e) {
|
||||
logger.error(`An error occurred. Skipping task ${title}.`);
|
||||
}
|
||||
},
|
||||
cancel: async () => {
|
||||
aborted$.next(true);
|
||||
},
|
||||
};
|
||||
};
|
||||
}
|
||||
|
||||
/**
 * Registers one search sessions task definition with Task Manager.
 *
 * @param core - passed through to the task runner factory
 * @param deps - task manager setup contract, logger, and config
 * @param taskType - unique task type id to register under
 * @param title - human-readable title (also used in runner log messages)
 * @param checkFn - the task body executed on each run
 */
export function registerSearchSessionsTask(
  core: CoreSetup<DataEnhancedStartDependencies>,
  deps: SearchSessionTaskSetupDeps,
  taskType: string,
  title: string,
  checkFn: SearchSessionTaskFn
) {
  deps.taskManager.registerTaskDefinitions({
    [taskType]: {
      title,
      createTaskRunner: searchSessionTaskRunner(core, deps, title, checkFn),
      // Shared timeout for all session tasks; after it elapses the runner is
      // cancelled and the next run starts on schedule.
      timeout: `${deps.config.search.sessions.monitoringTaskTimeout.asSeconds()}s`,
    },
  });
}
|
||||
|
||||
/**
 * Removes a scheduled search sessions task if it exists. Failures are logged,
 * never thrown, so disabling sessions can't break plugin start.
 */
export async function unscheduleSearchSessionsTask(
  { taskManager, logger }: SearchSessionTaskStartDeps,
  taskId: string
) {
  try {
    await taskManager.removeIfExists(taskId);
    logger.debug(`${taskId} cleared`);
  } catch (e) {
    logger.error(`${taskId} Error clearing task ${e.message}`);
  }
}
|
||||
|
||||
/**
 * (Re)schedules a recurring search sessions task: removes any existing
 * instance first so a changed `interval` takes effect, then schedules a fresh
 * one. Scheduling failures are logged, never thrown.
 *
 * @param deps - task manager start contract and logger
 * @param taskId - stable task instance id
 * @param taskType - task type previously registered at setup
 * @param interval - how often the task should run
 */
export async function scheduleSearchSessionsTask(
  { taskManager, logger }: SearchSessionTaskStartDeps,
  taskId: string,
  taskType: string,
  interval: Duration
) {
  // Drop the old instance unconditionally so interval changes are applied.
  await taskManager.removeIfExists(taskId);

  try {
    await taskManager.ensureScheduled({
      id: taskId,
      taskType,
      schedule: {
        interval: `${interval.asSeconds()}s`,
      },
      state: {},
      params: {},
    });

    logger.debug(`${taskId} scheduled to run`);
  } catch (e) {
    logger.error(`${taskId} Error scheduling task ${e.message}`);
  }
}
|
|
@ -5,6 +5,18 @@
|
|||
* 2.0.
|
||||
*/
|
||||
|
||||
import {
|
||||
ElasticsearchClient,
|
||||
Logger,
|
||||
SavedObjectsClientContract,
|
||||
SavedObjectsFindResponse,
|
||||
} from 'kibana/server';
|
||||
import { Observable } from 'rxjs';
|
||||
import { KueryNode, SearchSessionSavedObjectAttributes } from 'src/plugins/data/common';
|
||||
import {
|
||||
TaskManagerSetupContract,
|
||||
TaskManagerStartContract,
|
||||
} from '../../../../../../x-pack/plugins/task_manager/server';
|
||||
import { ConfigSchema } from '../../../config';
|
||||
|
||||
export enum SearchStatus {
|
||||
|
@ -14,3 +26,38 @@ export enum SearchStatus {
|
|||
}
|
||||
|
||||
/** The search sessions slice of the plugin's config schema. */
export type SearchSessionsConfig = ConfigSchema['search']['sessions'];

/** Dependencies needed by a single check iteration of a search sessions task. */
export interface CheckSearchSessionsDeps {
  savedObjectsClient: SavedObjectsClientContract;
  client: ElasticsearchClient;
  logger: Logger;
}

/** Dependencies available at plugin setup time, used to register the tasks. */
export interface SearchSessionTaskSetupDeps {
  taskManager: TaskManagerSetupContract;
  logger: Logger;
  config: ConfigSchema;
}

/** Dependencies available at plugin start time, used to (un)schedule the tasks. */
export interface SearchSessionTaskStartDeps {
  taskManager: TaskManagerStartContract;
  logger: Logger;
  config: ConfigSchema;
}

/** The body of a monitoring task: performs one pass of session checking. */
export type SearchSessionTaskFn = (
  deps: CheckSearchSessionsDeps,
  config: SearchSessionsConfig
) => Observable<void>;

/** One result page of a saved-objects find over search session objects. */
export type SearchSessionsResponse = SavedObjectsFindResponse<
  SearchSessionSavedObjectAttributes,
  unknown
>;

/** Fetches and processes a single page of search sessions matching `filter`. */
export type CheckSearchSessionsFn = (
  deps: CheckSearchSessionsDeps,
  config: SearchSessionsConfig,
  filter: KueryNode,
  page: number
) => Observable<SearchSessionsResponse>;
|
||||
|
|
|
@ -0,0 +1,323 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import { bulkUpdateSessions, updateSessionStatus } from './update_session_status';
|
||||
import {
|
||||
SearchSessionStatus,
|
||||
SearchSessionSavedObjectAttributes,
|
||||
} from '../../../../../../src/plugins/data/common';
|
||||
import { savedObjectsClientMock } from '../../../../../../src/core/server/mocks';
|
||||
import { SearchStatus } from './types';
|
||||
import moment from 'moment';
|
||||
import {
|
||||
SavedObjectsBulkUpdateObject,
|
||||
SavedObjectsClientContract,
|
||||
SavedObjectsFindResult,
|
||||
} from '../../../../../../src/core/server';
|
||||
|
||||
describe('bulkUpdateSessions', () => {
|
||||
let mockClient: any;
|
||||
let savedObjectsClient: jest.Mocked<SavedObjectsClientContract>;
|
||||
const mockLogger: any = {
|
||||
debug: jest.fn(),
|
||||
warn: jest.fn(),
|
||||
error: jest.fn(),
|
||||
};
|
||||
|
||||
beforeEach(() => {
|
||||
savedObjectsClient = savedObjectsClientMock.create();
|
||||
mockClient = {
|
||||
asyncSearch: {
|
||||
status: jest.fn(),
|
||||
delete: jest.fn(),
|
||||
},
|
||||
eql: {
|
||||
status: jest.fn(),
|
||||
delete: jest.fn(),
|
||||
},
|
||||
};
|
||||
});
|
||||
|
||||
describe('updateSessionStatus', () => {
|
||||
    test('updates expired session', async () => {
      // Session expired 5 days ago but is still marked IN_PROGRESS.
      const so: SavedObjectsFindResult<SearchSessionSavedObjectAttributes> = {
        id: '123',
        attributes: {
          persisted: false,
          status: SearchSessionStatus.IN_PROGRESS,
          expires: moment().subtract(moment.duration(5, 'd')),
          idMapping: {
            'search-hash': {
              id: 'search-id',
              strategy: 'cool',
              status: SearchStatus.IN_PROGRESS,
            },
          },
        },
      } as any;

      const updated = await updateSessionStatus(
        {
          savedObjectsClient,
          client: mockClient,
          logger: mockLogger,
        },
        so
      );

      // Expired sessions must be flagged for persistence with EXPIRED status.
      expect(updated).toBeTruthy();
      expect(so.attributes.status).toBe(SearchSessionStatus.EXPIRED);
    });
|
||||
|
||||
    test('does nothing if the search is still running', async () => {
      // Fresh, unexpired session whose only search is still IN_PROGRESS.
      const so = {
        id: '123',
        attributes: {
          persisted: false,
          status: SearchSessionStatus.IN_PROGRESS,
          created: moment().subtract(moment.duration(3, 'm')),
          touched: moment().subtract(moment.duration(10, 's')),
          expires: moment().add(moment.duration(5, 'd')),
          idMapping: {
            'search-hash': {
              id: 'search-id',
              strategy: 'cool',
              status: SearchStatus.IN_PROGRESS,
            },
          },
        },
      } as any;

      // ES reports the async search as still running.
      mockClient.asyncSearch.status.mockResolvedValue({
        body: {
          is_partial: true,
          is_running: true,
        },
      });

      const updated = await updateSessionStatus(
        {
          savedObjectsClient,
          client: mockClient,
          logger: mockLogger,
        },
        so
      );

      // Nothing changed, so the session must not be flagged for persistence.
      expect(updated).toBeFalsy();
      expect(so.attributes.status).toBe(SearchSessionStatus.IN_PROGRESS);
    });
|
||||
|
||||
    test("doesn't re-check completed or errored searches", async () => {
      // Session where every search already reached a terminal state
      // (COMPLETE / ERROR) — none should be polled against ES again.
      const so = {
        id: '123',
        attributes: {
          expires: moment().add(moment.duration(5, 'd')),
          status: SearchSessionStatus.ERROR,
          idMapping: {
            'search-hash': {
              id: 'search-id',
              strategy: 'cool',
              status: SearchStatus.COMPLETE,
            },
            'another-search-hash': {
              id: 'search-id',
              strategy: 'cool',
              status: SearchStatus.ERROR,
            },
          },
        },
      } as any;

      const updated = await updateSessionStatus(
        {
          savedObjectsClient,
          client: mockClient,
          logger: mockLogger,
        },
        so
      );

      // No mutation and, crucially, no status round-trip to Elasticsearch.
      expect(updated).toBeFalsy();
      expect(mockClient.asyncSearch.status).not.toBeCalled();
    });
|
||||
|
||||
test('updates to complete if the search is done', async () => {
|
||||
savedObjectsClient.bulkUpdate = jest.fn();
|
||||
const so = {
|
||||
attributes: {
|
||||
status: SearchSessionStatus.IN_PROGRESS,
|
||||
touched: '123',
|
||||
expires: moment().add(moment.duration(5, 'd')),
|
||||
idMapping: {
|
||||
'search-hash': {
|
||||
id: 'search-id',
|
||||
strategy: 'cool',
|
||||
status: SearchStatus.IN_PROGRESS,
|
||||
},
|
||||
},
|
||||
},
|
||||
} as any;
|
||||
mockClient.asyncSearch.status.mockResolvedValue({
|
||||
body: {
|
||||
is_partial: false,
|
||||
is_running: false,
|
||||
completion_status: 200,
|
||||
},
|
||||
});
|
||||
|
||||
const updated = await updateSessionStatus(
|
||||
{
|
||||
savedObjectsClient,
|
||||
client: mockClient,
|
||||
logger: mockLogger,
|
||||
},
|
||||
so
|
||||
);
|
||||
|
||||
expect(updated).toBeTruthy();
|
||||
|
||||
expect(mockClient.asyncSearch.status).toBeCalledWith({ id: 'search-id' });
|
||||
expect(so.attributes.status).toBe(SearchSessionStatus.COMPLETE);
|
||||
expect(so.attributes.status).toBe(SearchSessionStatus.COMPLETE);
|
||||
expect(so.attributes.touched).not.toBe('123');
|
||||
expect(so.attributes.completed).not.toBeUndefined();
|
||||
expect(so.attributes.idMapping['search-hash'].status).toBe(SearchStatus.COMPLETE);
|
||||
expect(so.attributes.idMapping['search-hash'].error).toBeUndefined();
|
||||
});
|
||||
|
||||
    test('updates to error if the search is errored', async () => {
      savedObjectsClient.bulkUpdate = jest.fn();
      const so = {
        attributes: {
          expires: moment().add(moment.duration(5, 'd')),
          idMapping: {
            'search-hash': {
              id: 'search-id',
              strategy: 'cool',
              status: SearchStatus.IN_PROGRESS,
            },
          },
        },
      } as any;

      // ES reports the async search finished with a non-200 completion status.
      mockClient.asyncSearch.status.mockResolvedValue({
        body: {
          is_partial: false,
          is_running: false,
          completion_status: 500,
        },
      });

      const updated = await updateSessionStatus(
        {
          savedObjectsClient,
          client: mockClient,
          logger: mockLogger,
        },
        so
      );

      expect(updated).toBeTruthy();
      expect(so.attributes.status).toBe(SearchSessionStatus.ERROR);
      // NOTE(review): `touched` is never seeded to '123' in this fixture, so
      // this assertion passes vacuously — consider seeding it as in the
      // 'updates to complete' test above.
      expect(so.attributes.touched).not.toBe('123');
      expect(so.attributes.idMapping['search-hash'].status).toBe(SearchStatus.ERROR);
      expect(so.attributes.idMapping['search-hash'].error).toBe(
        'Search completed with a 500 status'
      );
    });
|
||||
});
|
||||
|
||||
describe('bulkUpdateSessions', () => {
|
||||
    test('does nothing if there are no open sessions', async () => {
      await bulkUpdateSessions(
        {
          savedObjectsClient,
          client: mockClient,
          logger: mockLogger,
        },
        []
      );

      // An empty batch must not touch saved objects at all.
      expect(savedObjectsClient.bulkUpdate).not.toBeCalled();
      expect(savedObjectsClient.delete).not.toBeCalled();
    });
|
||||
|
||||
    test('updates in space', async () => {
      // Session saved object living in the 'awesome' space.
      const so = {
        namespaces: ['awesome'],
        attributes: {
          expires: moment().add(moment.duration(5, 'd')),
          status: SearchSessionStatus.IN_PROGRESS,
          touched: '123',
          idMapping: {
            'search-hash': {
              id: 'search-id',
              strategy: 'cool',
              status: SearchStatus.IN_PROGRESS,
            },
          },
        },
      } as any;

      savedObjectsClient.bulkUpdate = jest.fn().mockResolvedValue({
        saved_objects: [so],
      });

      await bulkUpdateSessions(
        {
          savedObjectsClient,
          client: mockClient,
          logger: mockLogger,
        },
        [so]
      );

      // The update must be issued in the object's own (first) namespace.
      const [updateInput] = savedObjectsClient.bulkUpdate.mock.calls[0];
      const updatedAttributes = updateInput[0] as SavedObjectsBulkUpdateObject;
      expect(updatedAttributes.namespace).toBe('awesome');
    });
|
||||
|
||||
    test('logs failures', async () => {
      const so = {
        namespaces: ['awesome'],
        attributes: {
          expires: moment().add(moment.duration(5, 'd')),
          status: SearchSessionStatus.IN_PROGRESS,
          touched: '123',
          idMapping: {
            'search-hash': {
              id: 'search-id',
              strategy: 'cool',
              status: SearchStatus.IN_PROGRESS,
            },
          },
        },
      } as any;

      // bulkUpdate resolves, but the per-object response carries an error.
      savedObjectsClient.bulkUpdate = jest.fn().mockResolvedValue({
        saved_objects: [
          {
            error: 'nope',
          },
        ],
      });

      await bulkUpdateSessions(
        {
          savedObjectsClient,
          client: mockClient,
          logger: mockLogger,
        },
        [so]
      );

      // The failure is logged (once) instead of being thrown.
      expect(savedObjectsClient.bulkUpdate).toBeCalledTimes(1);
      expect(mockLogger.error).toBeCalledTimes(1);
    });
|
||||
});
|
||||
});
|
|
@ -0,0 +1,128 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import { SavedObjectsFindResult, SavedObjectsUpdateResponse } from 'kibana/server';
|
||||
import {
|
||||
SearchSessionRequestInfo,
|
||||
SearchSessionSavedObjectAttributes,
|
||||
SearchSessionStatus,
|
||||
} from '../../../../../../src/plugins/data/common';
|
||||
import { getSearchStatus } from './get_search_status';
|
||||
import { getSessionStatus } from './get_session_status';
|
||||
import { CheckSearchSessionsDeps, SearchSessionsResponse, SearchStatus } from './types';
|
||||
import { isSearchSessionExpired } from './utils';
|
||||
|
||||
export async function updateSessionStatus(
|
||||
{ logger, client }: CheckSearchSessionsDeps,
|
||||
session: SavedObjectsFindResult<SearchSessionSavedObjectAttributes>
|
||||
) {
|
||||
let sessionUpdated = false;
|
||||
const isExpired = isSearchSessionExpired(session);
|
||||
|
||||
if (!isExpired) {
|
||||
// Check statuses of all running searches
|
||||
await Promise.all(
|
||||
Object.keys(session.attributes.idMapping).map(async (searchKey: string) => {
|
||||
const updateSearchRequest = (
|
||||
currentStatus: Pick<SearchSessionRequestInfo, 'status' | 'error'>
|
||||
) => {
|
||||
sessionUpdated = true;
|
||||
session.attributes.idMapping[searchKey] = {
|
||||
...session.attributes.idMapping[searchKey],
|
||||
...currentStatus,
|
||||
};
|
||||
};
|
||||
|
||||
const searchInfo = session.attributes.idMapping[searchKey];
|
||||
if (searchInfo.status === SearchStatus.IN_PROGRESS) {
|
||||
try {
|
||||
const currentStatus = await getSearchStatus(client, searchInfo.id);
|
||||
|
||||
if (currentStatus.status !== searchInfo.status) {
|
||||
logger.debug(`search ${searchInfo.id} | status changed to ${currentStatus.status}`);
|
||||
updateSearchRequest(currentStatus);
|
||||
}
|
||||
} catch (e) {
|
||||
logger.error(e);
|
||||
updateSearchRequest({
|
||||
status: SearchStatus.ERROR,
|
||||
error: e.message || e.meta.error?.caused_by?.reason,
|
||||
});
|
||||
}
|
||||
}
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
// And only then derive the session's status
|
||||
const sessionStatus = isExpired
|
||||
? SearchSessionStatus.EXPIRED
|
||||
: getSessionStatus(session.attributes);
|
||||
if (sessionStatus !== session.attributes.status) {
|
||||
const now = new Date().toISOString();
|
||||
session.attributes.status = sessionStatus;
|
||||
session.attributes.touched = now;
|
||||
if (sessionStatus === SearchSessionStatus.COMPLETE) {
|
||||
session.attributes.completed = now;
|
||||
} else if (session.attributes.completed) {
|
||||
session.attributes.completed = null;
|
||||
}
|
||||
sessionUpdated = true;
|
||||
}
|
||||
|
||||
return sessionUpdated;
|
||||
}
|
||||
|
||||
export async function getAllSessionsStatusUpdates(
|
||||
deps: CheckSearchSessionsDeps,
|
||||
searchSessions: SearchSessionsResponse
|
||||
) {
|
||||
const updatedSessions = new Array<SavedObjectsFindResult<SearchSessionSavedObjectAttributes>>();
|
||||
|
||||
await Promise.all(
|
||||
searchSessions.saved_objects.map(async (session) => {
|
||||
const updated = await updateSessionStatus(deps, session);
|
||||
|
||||
if (updated) {
|
||||
updatedSessions.push(session);
|
||||
}
|
||||
})
|
||||
);
|
||||
|
||||
return updatedSessions;
|
||||
}
|
||||
|
||||
/**
 * Persists a batch of mutated search session saved objects with a single
 * `bulkUpdate` call, writing each object into its own (first) namespace.
 *
 * Per-object failures are logged but not thrown: the monitoring task runs
 * periodically, so sessions whose update failed are retried next cycle.
 */
export async function bulkUpdateSessions(
  { logger, savedObjectsClient }: CheckSearchSessionsDeps,
  updatedSessions: Array<SavedObjectsFindResult<SearchSessionSavedObjectAttributes>>
) {
  if (updatedSessions.length) {
    // Failed updates get picked up again on the next iteration, so the
    // results below are only inspected for logging purposes.
    const updatedResponse = await savedObjectsClient.bulkUpdate<SearchSessionSavedObjectAttributes>(
      updatedSessions.map((session) => ({
        ...session,
        namespace: session.namespaces?.[0],
      }))
    );

    const success: Array<SavedObjectsUpdateResponse<SearchSessionSavedObjectAttributes>> = [];
    const fail: Array<SavedObjectsUpdateResponse<SearchSessionSavedObjectAttributes>> = [];

    updatedResponse.saved_objects.forEach((savedObjectResponse) => {
      if ('error' in savedObjectResponse) {
        fail.push(savedObjectResponse);
        logger.error(
          `Error while updating search session ${savedObjectResponse?.id}: ${savedObjectResponse.error?.message}`
        );
      } else {
        success.push(savedObjectResponse);
      }
    });

    logger.debug(`Updating search sessions: success: ${success.length}, fail: ${fail.length}`);
  }
}
|
|
@ -7,6 +7,9 @@
|
|||
|
||||
import { createHash } from 'crypto';
|
||||
import stringify from 'json-stable-stringify';
|
||||
import { SavedObjectsFindResult } from 'kibana/server';
|
||||
import moment from 'moment';
|
||||
import { SearchSessionSavedObjectAttributes } from 'src/plugins/data/common';
|
||||
|
||||
/**
|
||||
* Generate the hash for this request so that, in the future, this hash can be used to look up
|
||||
|
@ -17,3 +20,9 @@ export function createRequestHash(keys: Record<any, any>) {
|
|||
const { preference, ...params } = keys;
|
||||
return createHash(`sha256`).update(stringify(params)).digest('hex');
|
||||
}
|
||||
|
||||
export function isSearchSessionExpired(
|
||||
session: SavedObjectsFindResult<SearchSessionSavedObjectAttributes>
|
||||
) {
|
||||
return moment(session.attributes.expires).isBefore(moment());
|
||||
}
|
||||
|
|
|
@ -33,6 +33,7 @@ export async function getApiIntegrationConfig({ readConfigFile }: FtrConfigProvi
|
|||
'--xpack.data_enhanced.search.sessions.enabled=true', // enable WIP send to background UI
|
||||
'--xpack.data_enhanced.search.sessions.notTouchedTimeout=15s', // shorten notTouchedTimeout for quicker testing
|
||||
'--xpack.data_enhanced.search.sessions.trackingInterval=5s', // shorten trackingInterval for quicker testing
|
||||
'--xpack.data_enhanced.search.sessions.cleanupInterval=5s', // shorten cleanupInterval for quicker testing
|
||||
],
|
||||
},
|
||||
esTestCluster: {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue