Add session cleanup audit logging (#122419)

* Add session cleanup audit logging

* Update snapshots

* Added suggestions from code review

* Clean up sessions in batches

* Added suggestions form code review
This commit is contained in:
Thom Heymann 2022-01-12 23:00:57 +00:00 committed by GitHub
parent a39bca4ba7
commit 39cef8bca9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 806 additions and 359 deletions

View file

@ -53,8 +53,11 @@ Refer to the corresponding {es} logs for potential write errors.
| `user_logout`
| `unknown` | User is logging out.
| `session_cleanup`
| `unknown` | Removing invalid or expired session.
| `access_agreement_acknowledged`
| N/A | User has acknowledged the access agreement.
| n/a | User has acknowledged the access agreement.
3+a|
===== Category: database

View file

@ -15,6 +15,7 @@ import {
httpRequestEvent,
SavedObjectAction,
savedObjectEvent,
sessionCleanupEvent,
SpaceAuditAction,
spaceAuditEvent,
userLoginEvent,
@ -352,6 +353,37 @@ describe('#userLogoutEvent', () => {
});
});
describe('#sessionCleanupEvent', () => {
test('creates event with `unknown` outcome', () => {
expect(
sessionCleanupEvent({
usernameHash: 'abcdef',
sessionId: 'sid',
provider: { name: 'basic1', type: 'basic' },
})
).toMatchInlineSnapshot(`
Object {
"event": Object {
"action": "session_cleanup",
"category": Array [
"authentication",
],
"outcome": "unknown",
},
"kibana": Object {
"authentication_provider": "basic1",
"authentication_type": "basic",
"session_id": "sid",
},
"message": "Removing invalid or expired session for user [hash=abcdef]",
"user": Object {
"hash": "abcdef",
},
}
`);
});
});
describe('#httpRequestEvent', () => {
test('creates event with `unknown` outcome', () => {
expect(

View file

@ -156,6 +156,35 @@ export function userLogoutEvent({ username, provider }: UserLogoutParams): Audit
};
}
export interface SessionCleanupParams {
sessionId: string;
usernameHash?: string;
provider: AuthenticationProvider;
}
export function sessionCleanupEvent({
usernameHash,
sessionId,
provider,
}: SessionCleanupParams): AuditEvent {
return {
message: `Removing invalid or expired session for user [hash=${usernameHash}]`,
event: {
action: 'session_cleanup',
category: ['authentication'],
outcome: 'unknown',
},
user: {
hash: usernameHash,
},
kibana: {
session_id: sessionId,
authentication_provider: provider.name,
authentication_type: provider.type,
},
};
}
export interface AccessAgreementAcknowledgedParams {
username: string;
provider: AuthenticationProvider;

View file

@ -67,6 +67,9 @@ describe('#setup', () => {
).toMatchInlineSnapshot(`
Object {
"asScoped": [Function],
"withoutRequest": Object {
"log": [Function],
},
}
`);
audit.stop();
@ -254,6 +257,82 @@ describe('#asScoped', () => {
});
});
describe('#withoutRequest', () => {
it('logs event without additional meta data', async () => {
const audit = new AuditService(logger);
const auditSetup = audit.setup({
license,
config,
logging,
http,
getCurrentUser,
getSpaceId,
getSID,
recordAuditLoggingUsage,
});
await auditSetup.withoutRequest.log({ message: 'MESSAGE', event: { action: 'ACTION' } });
expect(logger.info).toHaveBeenCalledWith('MESSAGE', {
event: { action: 'ACTION' },
});
audit.stop();
});
it('does not log to audit logger if event matches ignore filter', async () => {
const audit = new AuditService(logger);
const auditSetup = audit.setup({
license,
config: {
enabled: true,
appender: {
type: 'console',
layout: {
type: 'json',
},
},
ignore_filters: [{ actions: ['ACTION'] }],
},
logging,
http,
getCurrentUser,
getSpaceId,
getSID,
recordAuditLoggingUsage,
});
await auditSetup.withoutRequest.log({ message: 'MESSAGE', event: { action: 'ACTION' } });
expect(logger.info).not.toHaveBeenCalled();
audit.stop();
});
it('does not log to audit logger if no event was generated', async () => {
const audit = new AuditService(logger);
const auditSetup = audit.setup({
license,
config: {
enabled: true,
appender: {
type: 'console',
layout: {
type: 'json',
},
},
ignore_filters: [{ actions: ['ACTION'] }],
},
logging,
http,
getCurrentUser,
getSpaceId,
getSID,
recordAuditLoggingUsage,
});
await auditSetup.withoutRequest.log(undefined);
expect(logger.info).not.toHaveBeenCalled();
audit.stop();
});
});
describe('#createLoggingConfig', () => {
test('sets log level to `info` when audit logging is enabled and appender is defined', async () => {
const features$ = of({

View file

@ -26,11 +26,58 @@ export const ECS_VERSION = '1.6.0';
export const RECORD_USAGE_INTERVAL = 60 * 60 * 1000; // 1 hour
export interface AuditLogger {
/**
* Logs an {@link AuditEvent} and automatically adds meta data about the
* current user, space and correlation id.
*
* Guidelines around what events should be logged and how they should be
* structured can be found in: `/x-pack/plugins/security/README.md`
*
* @example
* ```typescript
* const auditLogger = securitySetup.audit.asScoped(request);
* auditLogger.log({
* message: 'User is updating dashboard [id=123]',
* event: {
* action: 'saved_object_update',
* outcome: 'unknown'
* },
* kibana: {
* saved_object: { type: 'dashboard', id: '123' }
* },
* });
* ```
*/
log: (event: AuditEvent | undefined) => void;
}
export interface AuditServiceSetup {
/**
* Creates an {@link AuditLogger} scoped to the current request.
*
* This audit logger logs events with all required user and session info and should be used for
* all user-initiated actions.
*
* @example
* ```typescript
* const auditLogger = securitySetup.audit.asScoped(request);
* auditLogger.log(event);
* ```
*/
asScoped: (request: KibanaRequest) => AuditLogger;
/**
* {@link AuditLogger} for background tasks only.
*
* This audit logger logs events without any user or session info and should never be used to log
* user-initiated actions.
*
* @example
* ```typescript
* securitySetup.audit.withoutRequest.log(event);
* ```
*/
withoutRequest: AuditLogger;
}
interface AuditServiceSetupParams {
@ -88,46 +135,25 @@ export class AuditService {
});
}
/**
* Creates an {@link AuditLogger} scoped to the current request.
*
* @example
* ```typescript
* const auditLogger = securitySetup.audit.asScoped(request);
* auditLogger.log(event);
* ```
*/
const asScoped = (request: KibanaRequest): AuditLogger => {
/**
* Logs an {@link AuditEvent} and automatically adds meta data about the
* current user, space and correlation id.
*
* Guidelines around what events should be logged and how they should be
* structured can be found in: `/x-pack/plugins/security/README.md`
*
* @example
* ```typescript
* const auditLogger = securitySetup.audit.asScoped(request);
* auditLogger.log({
* message: 'User is updating dashboard [id=123]',
* event: {
* action: 'saved_object_update',
* outcome: 'unknown'
* },
* kibana: {
* saved_object: { type: 'dashboard', id: '123' }
* },
* });
* ```
*/
const log: AuditLogger['log'] = async (event) => {
const log = (event: AuditEvent | undefined) => {
if (!event) {
return;
}
if (filterEvent(event, config.ignore_filters)) {
const { message, ...eventMeta } = event;
this.logger.info(message, eventMeta);
}
};
const asScoped = (request: KibanaRequest): AuditLogger => ({
log: async (event) => {
if (!event) {
return;
}
const spaceId = getSpaceId(request);
const user = getCurrentUser(request);
const sessionId = await getSID(request);
const meta: AuditEvent = {
log({
...event,
user:
(user && {
@ -141,14 +167,9 @@ export class AuditService {
...event.kibana,
},
trace: { id: request.id },
};
if (filterEvent(meta, config.ignore_filters)) {
const { message, ...eventMeta } = meta;
this.logger.info(message, eventMeta);
}
};
return { log };
};
});
},
});
http.registerOnPostAuth((request, response, t) => {
if (request.auth.isAuthenticated) {
@ -157,7 +178,10 @@ export class AuditService {
return t.next();
});
return { asScoped };
return {
asScoped,
withoutRequest: { log },
};
}
stop() {

View file

@ -14,6 +14,9 @@ export const auditServiceMock = {
asScoped: jest.fn().mockReturnValue({
log: jest.fn(),
}),
withoutRequest: {
log: jest.fn(),
},
} as jest.Mocked<ReturnType<AuditService['setup']>>;
},
};

View file

@ -11,6 +11,7 @@ export type { AuditEvent } from './audit_events';
export {
userLoginEvent,
userLogoutEvent,
sessionCleanupEvent,
accessAgreementAcknowledgedEvent,
httpRequestEvent,
savedObjectEvent,

View file

@ -67,6 +67,9 @@ describe('Security Plugin', () => {
Object {
"audit": Object {
"asScoped": [Function],
"withoutRequest": Object {
"log": [Function],
},
},
"authc": Object {
"getCurrentUser": [Function],

View file

@ -310,9 +310,7 @@ export class SecurityPlugin
});
return Object.freeze<SecurityPluginSetup>({
audit: {
asScoped: this.auditSetup.asScoped,
},
audit: this.auditSetup,
authc: { getCurrentUser: (request) => this.getAuthentication().getCurrentUser(request) },
authz: {
actions: this.authorizationSetup.actions,
@ -347,6 +345,7 @@ export class SecurityPlugin
const clusterClient = core.elasticsearch.client;
const { watchOnlineStatus$ } = this.elasticsearchService.start();
const { session } = this.sessionManagementService.start({
auditLogger: this.auditSetup!.withoutRequest,
elasticsearchClient: clusterClient.asInternalUser,
kibanaIndexName: this.getKibanaIndexName(),
online$: watchOnlineStatus$(),

View file

@ -6,11 +6,19 @@
*/
import { errors } from '@elastic/elasticsearch';
import type {
BulkResponse,
ClosePointInTimeResponse,
OpenPointInTimeResponse,
SearchResponse,
} from '@elastic/elasticsearch/lib/api/types';
import type { DeeplyMockedKeys } from '@kbn/utility-types/jest';
import type { ElasticsearchClient } from 'src/core/server';
import { elasticsearchServiceMock, loggingSystemMock } from 'src/core/server/mocks';
import type { AuditLogger } from '../audit';
import { auditServiceMock } from '../audit/index.mock';
import { ConfigSchema, createConfig } from '../config';
import { securityMock } from '../mocks';
import { getSessionIndexTemplate, SessionIndex } from './session_index';
@ -19,11 +27,13 @@ import { sessionIndexMock } from './session_index.mock';
describe('Session index', () => {
let mockElasticsearchClient: DeeplyMockedKeys<ElasticsearchClient>;
let sessionIndex: SessionIndex;
let auditLogger: AuditLogger;
const indexName = '.kibana_some_tenant_security_session_1';
const indexTemplateName = '.kibana_some_tenant_security_session_index_template_1';
beforeEach(() => {
mockElasticsearchClient = elasticsearchServiceMock.createElasticsearchClient();
const sessionIndexOptions = {
auditLogger = auditServiceMock.create().withoutRequest;
sessionIndex = new SessionIndex({
logger: loggingSystemMock.createLogger(),
kibanaIndexName: '.kibana_some_tenant',
config: createConfig(
@ -32,9 +42,8 @@ describe('Session index', () => {
{ isTLSEnabled: false }
),
elasticsearchClient: mockElasticsearchClient,
};
sessionIndex = new SessionIndex(sessionIndexOptions);
auditLogger,
});
});
describe('#initialize', () => {
@ -219,74 +228,130 @@ describe('Session index', () => {
describe('#cleanUp', () => {
const now = 123456;
const sessionValue = {
_id: 'SESSION_ID',
_source: { usernameHash: 'USERNAME_HASH', provider: { name: 'basic1', type: 'basic' } },
sort: [0],
};
beforeEach(() => {
mockElasticsearchClient.deleteByQuery.mockResolvedValue(
securityMock.createApiResponse({ body: {} as any })
mockElasticsearchClient.openPointInTime.mockResolvedValue(
securityMock.createApiResponse({
body: { id: 'PIT_ID' } as OpenPointInTimeResponse,
})
);
mockElasticsearchClient.closePointInTime.mockResolvedValue(
securityMock.createApiResponse({
body: { succeeded: true, num_freed: 1 } as ClosePointInTimeResponse,
})
);
mockElasticsearchClient.search.mockResolvedValue(
securityMock.createApiResponse({
body: {
hits: { hits: [sessionValue] },
} as SearchResponse,
})
);
mockElasticsearchClient.bulk.mockResolvedValue(
securityMock.createApiResponse({
body: { items: [{}] } as BulkResponse,
})
);
jest.spyOn(Date, 'now').mockImplementation(() => now);
});
it('throws if call to Elasticsearch fails', async () => {
it('throws if search call to Elasticsearch fails', async () => {
const failureReason = new errors.ResponseError(
securityMock.createApiResponse(securityMock.createApiResponse({ body: { type: 'Uh oh.' } }))
);
mockElasticsearchClient.deleteByQuery.mockRejectedValue(failureReason);
mockElasticsearchClient.search.mockRejectedValue(failureReason);
await expect(sessionIndex.cleanUp()).rejects.toBe(failureReason);
expect(mockElasticsearchClient.openPointInTime).toHaveBeenCalledTimes(1);
expect(mockElasticsearchClient.search).toHaveBeenCalledTimes(1);
expect(mockElasticsearchClient.bulk).not.toHaveBeenCalled();
expect(mockElasticsearchClient.closePointInTime).toHaveBeenCalledTimes(1);
});
it('throws if bulk delete call to Elasticsearch fails', async () => {
const failureReason = new errors.ResponseError(
securityMock.createApiResponse(securityMock.createApiResponse({ body: { type: 'Uh oh.' } }))
);
mockElasticsearchClient.bulk.mockRejectedValue(failureReason);
await expect(sessionIndex.cleanUp()).rejects.toBe(failureReason);
expect(mockElasticsearchClient.openPointInTime).toHaveBeenCalledTimes(1);
expect(mockElasticsearchClient.search).toHaveBeenCalledTimes(1);
expect(mockElasticsearchClient.bulk).toHaveBeenCalledTimes(1);
expect(mockElasticsearchClient.closePointInTime).toHaveBeenCalledTimes(1);
});
it('when neither `lifespan` nor `idleTimeout` is configured', async () => {
await sessionIndex.cleanUp();
expect(mockElasticsearchClient.deleteByQuery).toHaveBeenCalledTimes(1);
expect(mockElasticsearchClient.deleteByQuery).toHaveBeenCalledWith(
{
index: indexName,
refresh: true,
body: {
query: {
bool: {
should: [
// All expired sessions based on the lifespan, no matter which provider they belong to.
{ range: { lifespanExpiration: { lte: now } } },
// All sessions that belong to the providers that aren't configured.
{
expect(mockElasticsearchClient.openPointInTime).toHaveBeenCalledTimes(1);
expect(mockElasticsearchClient.search).toHaveBeenCalledTimes(1);
expect(mockElasticsearchClient.search).toHaveBeenCalledWith({
_source_includes: 'usernameHash,provider',
sort: '_shard_doc',
track_total_hits: false,
search_after: undefined,
size: 10_000,
pit: {
id: 'PIT_ID',
keep_alive: '5m',
},
query: {
bool: {
should: [
// All expired sessions based on the lifespan, no matter which provider they belong to.
{ range: { lifespanExpiration: { lte: now } } },
// All sessions that belong to the providers that aren't configured.
{
bool: {
must_not: {
bool: {
must_not: {
bool: {
should: [
{
bool: {
must: [
{ term: { 'provider.type': 'basic' } },
{ term: { 'provider.name': 'basic' } },
],
},
},
],
minimum_should_match: 1,
should: [
{
bool: {
must: [
{ term: { 'provider.type': 'basic' } },
{ term: { 'provider.name': 'basic' } },
],
},
},
},
},
},
// The sessions that belong to a particular provider that are expired based on the idle timeout.
{
bool: {
must: [
{ term: { 'provider.type': 'basic' } },
{ term: { 'provider.name': 'basic' } },
],
should: [{ range: { idleTimeoutExpiration: { lte: now } } }],
minimum_should_match: 1,
},
},
],
},
},
},
// The sessions that belong to a particular provider that are expired based on the idle timeout.
{
bool: {
must: [
{ term: { 'provider.type': 'basic' } },
{ term: { 'provider.name': 'basic' } },
],
should: [{ range: { idleTimeoutExpiration: { lte: now } } }],
minimum_should_match: 1,
},
},
],
},
},
{ ignore: [409, 404] }
});
expect(mockElasticsearchClient.bulk).toHaveBeenCalledTimes(1);
expect(mockElasticsearchClient.bulk).toHaveBeenCalledWith(
{
index: indexName,
operations: [{ delete: { _id: sessionValue._id } }],
refresh: false,
},
{
ignore: [409, 404],
}
);
expect(mockElasticsearchClient.closePointInTime).toHaveBeenCalledTimes(1);
});
it('when only `lifespan` is configured', async () => {
@ -299,69 +364,85 @@ describe('Session index', () => {
{ isTLSEnabled: false }
),
elasticsearchClient: mockElasticsearchClient,
auditLogger: auditServiceMock.create().withoutRequest,
});
await sessionIndex.cleanUp();
expect(mockElasticsearchClient.deleteByQuery).toHaveBeenCalledTimes(1);
expect(mockElasticsearchClient.deleteByQuery).toHaveBeenCalledWith(
{
index: indexName,
refresh: true,
body: {
query: {
bool: {
should: [
// All expired sessions based on the lifespan, no matter which provider they belong to.
{ range: { lifespanExpiration: { lte: now } } },
// All sessions that belong to the providers that aren't configured.
{
expect(mockElasticsearchClient.openPointInTime).toHaveBeenCalledTimes(1);
expect(mockElasticsearchClient.search).toHaveBeenCalledTimes(1);
expect(mockElasticsearchClient.search).toHaveBeenCalledWith({
_source_includes: 'usernameHash,provider',
sort: '_shard_doc',
track_total_hits: false,
search_after: undefined,
size: 10_000,
pit: {
id: 'PIT_ID',
keep_alive: '5m',
},
query: {
bool: {
should: [
// All expired sessions based on the lifespan, no matter which provider they belong to.
{ range: { lifespanExpiration: { lte: now } } },
// All sessions that belong to the providers that aren't configured.
{
bool: {
must_not: {
bool: {
must_not: {
bool: {
should: [
{
bool: {
must: [
{ term: { 'provider.type': 'basic' } },
{ term: { 'provider.name': 'basic' } },
],
},
},
],
minimum_should_match: 1,
should: [
{
bool: {
must: [
{ term: { 'provider.type': 'basic' } },
{ term: { 'provider.name': 'basic' } },
],
},
},
},
},
},
// The sessions that belong to a particular provider but don't have a configured lifespan.
{
bool: {
must: [
{ term: { 'provider.type': 'basic' } },
{ term: { 'provider.name': 'basic' } },
],
must_not: { exists: { field: 'lifespanExpiration' } },
},
},
// The sessions that belong to a particular provider that are expired based on the idle timeout.
{
bool: {
must: [
{ term: { 'provider.type': 'basic' } },
{ term: { 'provider.name': 'basic' } },
],
should: [{ range: { idleTimeoutExpiration: { lte: now } } }],
minimum_should_match: 1,
},
},
],
},
},
},
// The sessions that belong to a particular provider but don't have a configured lifespan.
{
bool: {
must: [
{ term: { 'provider.type': 'basic' } },
{ term: { 'provider.name': 'basic' } },
],
must_not: { exists: { field: 'lifespanExpiration' } },
},
},
// The sessions that belong to a particular provider that are expired based on the idle timeout.
{
bool: {
must: [
{ term: { 'provider.type': 'basic' } },
{ term: { 'provider.name': 'basic' } },
],
should: [{ range: { idleTimeoutExpiration: { lte: now } } }],
minimum_should_match: 1,
},
},
],
},
},
{ ignore: [409, 404] }
});
expect(mockElasticsearchClient.bulk).toHaveBeenCalledTimes(1);
expect(mockElasticsearchClient.bulk).toHaveBeenCalledWith(
{
index: indexName,
operations: [{ delete: { _id: sessionValue._id } }],
refresh: false,
},
{
ignore: [409, 404],
}
);
expect(mockElasticsearchClient.closePointInTime).toHaveBeenCalledTimes(1);
});
it('when only `idleTimeout` is configured', async () => {
@ -375,63 +456,79 @@ describe('Session index', () => {
{ isTLSEnabled: false }
),
elasticsearchClient: mockElasticsearchClient,
auditLogger: auditServiceMock.create().withoutRequest,
});
await sessionIndex.cleanUp();
expect(mockElasticsearchClient.deleteByQuery).toHaveBeenCalledTimes(1);
expect(mockElasticsearchClient.deleteByQuery).toHaveBeenCalledWith(
{
index: indexName,
refresh: true,
body: {
query: {
bool: {
should: [
// All expired sessions based on the lifespan, no matter which provider they belong to.
{ range: { lifespanExpiration: { lte: now } } },
// All sessions that belong to the providers that aren't configured.
{
expect(mockElasticsearchClient.openPointInTime).toHaveBeenCalledTimes(1);
expect(mockElasticsearchClient.search).toHaveBeenCalledTimes(1);
expect(mockElasticsearchClient.search).toHaveBeenCalledWith({
_source_includes: 'usernameHash,provider',
sort: '_shard_doc',
track_total_hits: false,
search_after: undefined,
size: 10_000,
pit: {
id: 'PIT_ID',
keep_alive: '5m',
},
query: {
bool: {
should: [
// All expired sessions based on the lifespan, no matter which provider they belong to.
{ range: { lifespanExpiration: { lte: now } } },
// All sessions that belong to the providers that aren't configured.
{
bool: {
must_not: {
bool: {
must_not: {
bool: {
should: [
{
bool: {
must: [
{ term: { 'provider.type': 'basic' } },
{ term: { 'provider.name': 'basic' } },
],
},
},
],
minimum_should_match: 1,
},
},
},
},
// The sessions that belong to a particular provider that are either expired based on the idle timeout
// or don't have it configured at all.
{
bool: {
must: [
{ term: { 'provider.type': 'basic' } },
{ term: { 'provider.name': 'basic' } },
],
should: [
{ range: { idleTimeoutExpiration: { lte: now - 3 * idleTimeout } } },
{ bool: { must_not: { exists: { field: 'idleTimeoutExpiration' } } } },
{
bool: {
must: [
{ term: { 'provider.type': 'basic' } },
{ term: { 'provider.name': 'basic' } },
],
},
},
],
minimum_should_match: 1,
},
},
],
},
},
},
// The sessions that belong to a particular provider that are either expired based on the idle timeout
// or don't have it configured at all.
{
bool: {
must: [
{ term: { 'provider.type': 'basic' } },
{ term: { 'provider.name': 'basic' } },
],
should: [
{ range: { idleTimeoutExpiration: { lte: now - 3 * idleTimeout } } },
{ bool: { must_not: { exists: { field: 'idleTimeoutExpiration' } } } },
],
minimum_should_match: 1,
},
},
],
},
},
{ ignore: [409, 404] }
});
expect(mockElasticsearchClient.bulk).toHaveBeenCalledTimes(1);
expect(mockElasticsearchClient.bulk).toHaveBeenCalledWith(
{
index: indexName,
operations: [{ delete: { _id: sessionValue._id } }],
refresh: false,
},
{
ignore: [409, 404],
}
);
expect(mockElasticsearchClient.closePointInTime).toHaveBeenCalledTimes(1);
});
it('when both `lifespan` and `idleTimeout` are configured', async () => {
@ -445,73 +542,89 @@ describe('Session index', () => {
{ isTLSEnabled: false }
),
elasticsearchClient: mockElasticsearchClient,
auditLogger: auditServiceMock.create().withoutRequest,
});
await sessionIndex.cleanUp();
expect(mockElasticsearchClient.deleteByQuery).toHaveBeenCalledTimes(1);
expect(mockElasticsearchClient.deleteByQuery).toHaveBeenCalledWith(
{
index: indexName,
refresh: true,
body: {
query: {
bool: {
should: [
// All expired sessions based on the lifespan, no matter which provider they belong to.
{ range: { lifespanExpiration: { lte: now } } },
// All sessions that belong to the providers that aren't configured.
{
expect(mockElasticsearchClient.openPointInTime).toHaveBeenCalledTimes(1);
expect(mockElasticsearchClient.search).toHaveBeenCalledTimes(1);
expect(mockElasticsearchClient.search).toHaveBeenCalledWith({
_source_includes: 'usernameHash,provider',
sort: '_shard_doc',
track_total_hits: false,
search_after: undefined,
size: 10_000,
pit: {
id: 'PIT_ID',
keep_alive: '5m',
},
query: {
bool: {
should: [
// All expired sessions based on the lifespan, no matter which provider they belong to.
{ range: { lifespanExpiration: { lte: now } } },
// All sessions that belong to the providers that aren't configured.
{
bool: {
must_not: {
bool: {
must_not: {
bool: {
should: [
{
bool: {
must: [
{ term: { 'provider.type': 'basic' } },
{ term: { 'provider.name': 'basic' } },
],
},
},
],
minimum_should_match: 1,
},
},
},
},
// The sessions that belong to a particular provider but don't have a configured lifespan.
{
bool: {
must: [
{ term: { 'provider.type': 'basic' } },
{ term: { 'provider.name': 'basic' } },
],
must_not: { exists: { field: 'lifespanExpiration' } },
},
},
// The sessions that belong to a particular provider that are either expired based on the idle timeout
// or don't have it configured at all.
{
bool: {
must: [
{ term: { 'provider.type': 'basic' } },
{ term: { 'provider.name': 'basic' } },
],
should: [
{ range: { idleTimeoutExpiration: { lte: now - 3 * idleTimeout } } },
{ bool: { must_not: { exists: { field: 'idleTimeoutExpiration' } } } },
{
bool: {
must: [
{ term: { 'provider.type': 'basic' } },
{ term: { 'provider.name': 'basic' } },
],
},
},
],
minimum_should_match: 1,
},
},
],
},
},
},
// The sessions that belong to a particular provider but don't have a configured lifespan.
{
bool: {
must: [
{ term: { 'provider.type': 'basic' } },
{ term: { 'provider.name': 'basic' } },
],
must_not: { exists: { field: 'lifespanExpiration' } },
},
},
// The sessions that belong to a particular provider that are either expired based on the idle timeout
// or don't have it configured at all.
{
bool: {
must: [
{ term: { 'provider.type': 'basic' } },
{ term: { 'provider.name': 'basic' } },
],
should: [
{ range: { idleTimeoutExpiration: { lte: now - 3 * idleTimeout } } },
{ bool: { must_not: { exists: { field: 'idleTimeoutExpiration' } } } },
],
minimum_should_match: 1,
},
},
],
},
},
{ ignore: [409, 404] }
});
expect(mockElasticsearchClient.bulk).toHaveBeenCalledTimes(1);
expect(mockElasticsearchClient.bulk).toHaveBeenCalledWith(
{
index: indexName,
operations: [{ delete: { _id: sessionValue._id } }],
refresh: false,
},
{
ignore: [409, 404],
}
);
expect(mockElasticsearchClient.closePointInTime).toHaveBeenCalledTimes(1);
});
it('when both `lifespan` and `idleTimeout` are configured and multiple providers are enabled', async () => {
@ -540,105 +653,167 @@ describe('Session index', () => {
{ isTLSEnabled: false }
),
elasticsearchClient: mockElasticsearchClient,
auditLogger: auditServiceMock.create().withoutRequest,
});
await sessionIndex.cleanUp();
expect(mockElasticsearchClient.deleteByQuery).toHaveBeenCalledTimes(1);
expect(mockElasticsearchClient.deleteByQuery).toHaveBeenCalledWith(
{
index: indexName,
refresh: true,
body: {
query: {
bool: {
should: [
// All expired sessions based on the lifespan, no matter which provider they belong to.
{ range: { lifespanExpiration: { lte: now } } },
// All sessions that belong to the providers that aren't configured.
{
expect(mockElasticsearchClient.openPointInTime).toHaveBeenCalledTimes(1);
expect(mockElasticsearchClient.search).toHaveBeenCalledTimes(1);
expect(mockElasticsearchClient.search).toHaveBeenCalledWith({
_source_includes: 'usernameHash,provider',
sort: '_shard_doc',
track_total_hits: false,
search_after: undefined,
size: 10_000,
pit: {
id: 'PIT_ID',
keep_alive: '5m',
},
query: {
bool: {
should: [
// All expired sessions based on the lifespan, no matter which provider they belong to.
{ range: { lifespanExpiration: { lte: now } } },
// All sessions that belong to the providers that aren't configured.
{
bool: {
must_not: {
bool: {
must_not: {
bool: {
should: [
{
bool: {
must: [
{ term: { 'provider.type': 'basic' } },
{ term: { 'provider.name': 'basic1' } },
],
},
},
{
bool: {
must: [
{ term: { 'provider.type': 'saml' } },
{ term: { 'provider.name': 'saml1' } },
],
},
},
],
minimum_should_match: 1,
should: [
{
bool: {
must: [
{ term: { 'provider.type': 'basic' } },
{ term: { 'provider.name': 'basic1' } },
],
},
},
{
bool: {
must: [
{ term: { 'provider.type': 'saml' } },
{ term: { 'provider.name': 'saml1' } },
],
},
},
},
},
},
// The sessions that belong to a Basic provider but don't have a configured lifespan.
{
bool: {
must: [
{ term: { 'provider.type': 'basic' } },
{ term: { 'provider.name': 'basic1' } },
],
must_not: { exists: { field: 'lifespanExpiration' } },
},
},
// The sessions that belong to a Basic provider that are either expired based on the idle timeout
// or don't have it configured at all.
{
bool: {
must: [
{ term: { 'provider.type': 'basic' } },
{ term: { 'provider.name': 'basic1' } },
],
should: [
{ range: { idleTimeoutExpiration: { lte: now - 3 * globalIdleTimeout } } },
{ bool: { must_not: { exists: { field: 'idleTimeoutExpiration' } } } },
],
minimum_should_match: 1,
},
},
// The sessions that belong to a SAML provider but don't have a configured lifespan.
{
bool: {
must: [
{ term: { 'provider.type': 'saml' } },
{ term: { 'provider.name': 'saml1' } },
],
must_not: { exists: { field: 'lifespanExpiration' } },
},
},
// The sessions that belong to a SAML provider that are either expired based on the idle timeout
// or don't have it configured at all.
{
bool: {
must: [
{ term: { 'provider.type': 'saml' } },
{ term: { 'provider.name': 'saml1' } },
],
should: [
{ range: { idleTimeoutExpiration: { lte: now - 3 * samlIdleTimeout } } },
{ bool: { must_not: { exists: { field: 'idleTimeoutExpiration' } } } },
],
minimum_should_match: 1,
},
},
],
},
},
},
// The sessions that belong to a Basic provider but don't have a configured lifespan.
{
bool: {
must: [
{ term: { 'provider.type': 'basic' } },
{ term: { 'provider.name': 'basic1' } },
],
must_not: { exists: { field: 'lifespanExpiration' } },
},
},
// The sessions that belong to a Basic provider that are either expired based on the idle timeout
// or don't have it configured at all.
{
bool: {
must: [
{ term: { 'provider.type': 'basic' } },
{ term: { 'provider.name': 'basic1' } },
],
should: [
{ range: { idleTimeoutExpiration: { lte: now - 3 * globalIdleTimeout } } },
{ bool: { must_not: { exists: { field: 'idleTimeoutExpiration' } } } },
],
minimum_should_match: 1,
},
},
// The sessions that belong to a SAML provider but don't have a configured lifespan.
{
bool: {
must: [
{ term: { 'provider.type': 'saml' } },
{ term: { 'provider.name': 'saml1' } },
],
must_not: { exists: { field: 'lifespanExpiration' } },
},
},
// The sessions that belong to a SAML provider that are either expired based on the idle timeout
// or don't have it configured at all.
{
bool: {
must: [
{ term: { 'provider.type': 'saml' } },
{ term: { 'provider.name': 'saml1' } },
],
should: [
{ range: { idleTimeoutExpiration: { lte: now - 3 * samlIdleTimeout } } },
{ bool: { must_not: { exists: { field: 'idleTimeoutExpiration' } } } },
],
minimum_should_match: 1,
},
},
],
},
},
{ ignore: [409, 404] }
});
expect(mockElasticsearchClient.bulk).toHaveBeenCalledTimes(1);
expect(mockElasticsearchClient.bulk).toHaveBeenCalledWith(
{
index: indexName,
operations: [{ delete: { _id: sessionValue._id } }],
refresh: false,
},
{
ignore: [409, 404],
}
);
expect(mockElasticsearchClient.closePointInTime).toHaveBeenCalledTimes(1);
});
it('should clean up sessions in batches of 10,000', async () => {
for (const count of [10_000, 1]) {
mockElasticsearchClient.search.mockResolvedValueOnce(
securityMock.createApiResponse({
body: {
hits: { hits: new Array(count).fill(sessionValue, 0) },
} as SearchResponse,
})
);
}
await sessionIndex.cleanUp();
expect(mockElasticsearchClient.openPointInTime).toHaveBeenCalledTimes(1);
expect(mockElasticsearchClient.search).toHaveBeenCalledTimes(2);
expect(mockElasticsearchClient.bulk).toHaveBeenCalledTimes(2);
expect(mockElasticsearchClient.closePointInTime).toHaveBeenCalledTimes(1);
});
it('should limit number of batches to 10', async () => {
mockElasticsearchClient.search.mockResolvedValue(
securityMock.createApiResponse({
body: {
hits: { hits: new Array(10_000).fill(sessionValue, 0) },
} as SearchResponse,
})
);
await sessionIndex.cleanUp();
expect(mockElasticsearchClient.openPointInTime).toHaveBeenCalledTimes(1);
expect(mockElasticsearchClient.search).toHaveBeenCalledTimes(10);
expect(mockElasticsearchClient.bulk).toHaveBeenCalledTimes(10);
expect(mockElasticsearchClient.closePointInTime).toHaveBeenCalledTimes(1);
});
it('should log audit event', async () => {
await sessionIndex.cleanUp();
expect(auditLogger.log).toHaveBeenCalledWith(
expect.objectContaining({
event: { action: 'session_cleanup', category: ['authentication'], outcome: 'unknown' },
})
);
});
});

View file

@ -5,9 +5,16 @@
* 2.0.
*/
import type {
BulkOperationContainer,
SortResults,
} from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type { ElasticsearchClient, Logger } from 'src/core/server';
import type { AuthenticationProvider } from '../../common/model';
import type { AuditLogger } from '../audit';
import { sessionCleanupEvent } from '../audit';
import type { ConfigType } from '../config';
export interface SessionIndexOptions {
@ -15,6 +22,7 @@ export interface SessionIndexOptions {
readonly kibanaIndexName: string;
readonly config: Pick<ConfigType, 'session' | 'authc'>;
readonly logger: Logger;
readonly auditLogger: AuditLogger;
}
/**
@ -34,6 +42,22 @@ export type InvalidateSessionsFilter =
*/
const SESSION_INDEX_TEMPLATE_VERSION = 1;
/**
* Number of sessions to remove per batch during cleanup.
*/
const SESSION_INDEX_CLEANUP_BATCH_SIZE = 10_000;
/**
* Maximum number of batches per cleanup.
* If the batch size is 10,000 and this limit is 10, then Kibana will remove up to 100k sessions per cleanup.
*/
const SESSION_INDEX_CLEANUP_BATCH_LIMIT = 10;
/**
* How long the session cleanup search point-in-time should be kept alive.
*/
const SESSION_INDEX_CLEANUP_KEEP_ALIVE = '5m';
/**
* Returns index template that is used for the current version of the session index.
*/
@ -425,6 +449,56 @@ export class SessionIndex {
async cleanUp() {
this.options.logger.debug(`Running cleanup routine.`);
try {
for await (const sessionValues of this.getSessionValuesInBatches()) {
const operations: Array<Required<Pick<BulkOperationContainer, 'delete'>>> = [];
sessionValues.forEach(({ _id, _source }) => {
const { usernameHash, provider } = _source!;
this.options.auditLogger.log(
sessionCleanupEvent({ sessionId: _id, usernameHash, provider })
);
operations.push({ delete: { _id } });
});
if (operations.length > 0) {
const { body: bulkResponse } = await this.options.elasticsearchClient.bulk(
{
index: this.indexName,
operations,
refresh: false,
},
{ ignore: [409, 404] }
);
if (bulkResponse.errors) {
const errorCount = bulkResponse.items.reduce(
(count, item) => (item.delete!.error ? count + 1 : count),
0
);
if (errorCount < bulkResponse.items.length) {
this.options.logger.warn(
`Failed to clean up ${errorCount} of ${bulkResponse.items.length} invalid or expired sessions. The remaining sessions were cleaned up successfully.`
);
} else {
this.options.logger.error(
`Failed to clean up ${bulkResponse.items.length} invalid or expired sessions.`
);
}
} else {
this.options.logger.debug(
`Cleaned up ${bulkResponse.items.length} invalid or expired sessions.`
);
}
}
}
} catch (err) {
this.options.logger.error(`Failed to clean up sessions: ${err.message}`);
throw err;
}
}
/**
* Fetches session values from session index in batches of 10,000.
*/
private async *getSessionValuesInBatches() {
const now = Date.now();
const providersSessionConfig = this.options.config.authc.sortedProviders.map((provider) => {
return {
@ -484,24 +558,37 @@ export class SessionIndex {
});
}
try {
const { body: response } = await this.options.elasticsearchClient.deleteByQuery(
{
index: this.indexName,
refresh: true,
body: { query: { bool: { should: deleteQueries } } },
},
{ ignore: [409, 404] }
);
const { body: openPitResponse } = await this.options.elasticsearchClient.openPointInTime({
index: this.indexName,
keep_alive: SESSION_INDEX_CLEANUP_KEEP_ALIVE,
});
if (response.deleted! > 0) {
this.options.logger.debug(
`Cleaned up ${response.deleted} invalid or expired session values.`
);
try {
let searchAfter: SortResults | undefined;
for (let i = 0; i < SESSION_INDEX_CLEANUP_BATCH_LIMIT; i++) {
const { body: searchResponse } =
await this.options.elasticsearchClient.search<SessionIndexValue>({
pit: { id: openPitResponse.id, keep_alive: SESSION_INDEX_CLEANUP_KEEP_ALIVE },
_source_includes: 'usernameHash,provider',
query: { bool: { should: deleteQueries } },
search_after: searchAfter,
size: SESSION_INDEX_CLEANUP_BATCH_SIZE,
sort: '_shard_doc',
track_total_hits: false, // for performance
});
const { hits } = searchResponse.hits;
if (hits.length > 0) {
yield hits;
searchAfter = hits[hits.length - 1].sort;
}
if (hits.length < SESSION_INDEX_CLEANUP_BATCH_SIZE) {
break;
}
}
} catch (err) {
this.options.logger.error(`Failed to clean up sessions: ${err.message}`);
throw err;
} finally {
await this.options.elasticsearchClient.closePointInTime({
id: openPitResponse.id,
});
}
}
}

View file

@ -15,6 +15,8 @@ import type {
TaskRunCreatorFunction,
} from '../../../task_manager/server';
import { taskManagerMock } from '../../../task_manager/server/mocks';
import type { AuditLogger } from '../audit';
import { auditServiceMock } from '../audit/index.mock';
import { ConfigSchema, createConfig } from '../config';
import type { OnlineStatusRetryScheduler } from '../elasticsearch';
import { Session } from './session';
@ -24,10 +26,23 @@ import {
SessionManagementService,
} from './session_management_service';
const mockSessionIndexInitialize = jest.spyOn(SessionIndex.prototype, 'initialize');
mockSessionIndexInitialize.mockResolvedValue();
const mockSessionIndexCleanUp = jest.spyOn(SessionIndex.prototype, 'cleanUp');
mockSessionIndexCleanUp.mockResolvedValue();
describe('SessionManagementService', () => {
let service: SessionManagementService;
let auditLogger: AuditLogger;
beforeEach(() => {
service = new SessionManagementService(loggingSystemMock.createLogger());
auditLogger = auditServiceMock.create().withoutRequest;
});
afterEach(() => {
mockSessionIndexInitialize.mockReset();
mockSessionIndexCleanUp.mockReset();
});
describe('setup()', () => {
@ -56,12 +71,9 @@ describe('SessionManagementService', () => {
});
describe('start()', () => {
let mockSessionIndexInitialize: jest.SpyInstance;
let mockTaskManager: jest.Mocked<TaskManagerStartContract>;
let sessionCleanupTaskRunCreator: TaskRunCreatorFunction;
beforeEach(() => {
mockSessionIndexInitialize = jest.spyOn(SessionIndex.prototype, 'initialize');
mockTaskManager = taskManagerMock.createStart();
mockTaskManager.ensureScheduled.mockResolvedValue(undefined as any);
@ -84,14 +96,11 @@ describe('SessionManagementService', () => {
sessionCleanupTaskRunCreator = createTaskRunner;
});
afterEach(() => {
mockSessionIndexInitialize.mockReset();
});
it('exposes proper contract', () => {
const mockStatusSubject = new Subject<OnlineStatusRetryScheduler>();
expect(
service.start({
auditLogger,
elasticsearchClient: elasticsearchServiceMock.createElasticsearchClient(),
kibanaIndexName: '.kibana',
online$: mockStatusSubject.asObservable(),
@ -100,10 +109,10 @@ describe('SessionManagementService', () => {
).toEqual({ session: expect.any(Session) });
});
it('registers proper session index cleanup task runner', () => {
const mockSessionIndexCleanUp = jest.spyOn(SessionIndex.prototype, 'cleanUp');
it('registers proper session index cleanup task runner', async () => {
const mockStatusSubject = new Subject<OnlineStatusRetryScheduler>();
service.start({
auditLogger,
elasticsearchClient: elasticsearchServiceMock.createElasticsearchClient(),
kibanaIndexName: '.kibana',
online$: mockStatusSubject.asObservable(),
@ -113,16 +122,17 @@ describe('SessionManagementService', () => {
expect(mockSessionIndexCleanUp).not.toHaveBeenCalled();
const runner = sessionCleanupTaskRunCreator({} as any);
runner.run();
await runner.run();
expect(mockSessionIndexCleanUp).toHaveBeenCalledTimes(1);
runner.run();
await runner.run();
expect(mockSessionIndexCleanUp).toHaveBeenCalledTimes(2);
});
it('initializes session index and schedules session index cleanup task when Elasticsearch goes online', async () => {
const mockStatusSubject = new Subject<OnlineStatusRetryScheduler>();
service.start({
auditLogger,
elasticsearchClient: elasticsearchServiceMock.createElasticsearchClient(),
kibanaIndexName: '.kibana',
online$: mockStatusSubject.asObservable(),
@ -160,6 +170,7 @@ describe('SessionManagementService', () => {
it('removes old cleanup task if cleanup interval changes', async () => {
const mockStatusSubject = new Subject<OnlineStatusRetryScheduler>();
service.start({
auditLogger,
elasticsearchClient: elasticsearchServiceMock.createElasticsearchClient(),
kibanaIndexName: '.kibana',
online$: mockStatusSubject.asObservable(),
@ -195,6 +206,7 @@ describe('SessionManagementService', () => {
it('does not remove old cleanup task if cleanup interval does not change', async () => {
const mockStatusSubject = new Subject<OnlineStatusRetryScheduler>();
service.start({
auditLogger,
elasticsearchClient: elasticsearchServiceMock.createElasticsearchClient(),
kibanaIndexName: '.kibana',
online$: mockStatusSubject.asObservable(),
@ -221,6 +233,7 @@ describe('SessionManagementService', () => {
it('schedules retry if index initialization fails', async () => {
const mockStatusSubject = new Subject<OnlineStatusRetryScheduler>();
service.start({
auditLogger,
elasticsearchClient: elasticsearchServiceMock.createElasticsearchClient(),
kibanaIndexName: '.kibana',
online$: mockStatusSubject.asObservable(),
@ -257,6 +270,7 @@ describe('SessionManagementService', () => {
it('schedules retry if cleanup task registration fails', async () => {
const mockStatusSubject = new Subject<OnlineStatusRetryScheduler>();
service.start({
auditLogger,
elasticsearchClient: elasticsearchServiceMock.createElasticsearchClient(),
kibanaIndexName: '.kibana',
online$: mockStatusSubject.asObservable(),
@ -291,11 +305,8 @@ describe('SessionManagementService', () => {
});
describe('stop()', () => {
let mockSessionIndexInitialize: jest.SpyInstance;
let mockTaskManager: jest.Mocked<TaskManagerStartContract>;
beforeEach(() => {
mockSessionIndexInitialize = jest.spyOn(SessionIndex.prototype, 'initialize');
mockTaskManager = taskManagerMock.createStart();
mockTaskManager.ensureScheduled.mockResolvedValue(undefined as any);
@ -309,13 +320,10 @@ describe('SessionManagementService', () => {
});
});
afterEach(() => {
mockSessionIndexInitialize.mockReset();
});
it('properly unsubscribes from status updates', () => {
const mockStatusSubject = new Subject<OnlineStatusRetryScheduler>();
service.start({
auditLogger,
elasticsearchClient: elasticsearchServiceMock.createElasticsearchClient(),
kibanaIndexName: '.kibana',
online$: mockStatusSubject.asObservable(),

View file

@ -14,6 +14,7 @@ import type {
TaskManagerSetupContract,
TaskManagerStartContract,
} from '../../../task_manager/server';
import type { AuditLogger } from '../audit';
import type { ConfigType } from '../config';
import type { OnlineStatusRetryScheduler } from '../elasticsearch';
import { Session } from './session';
@ -31,6 +32,7 @@ export interface SessionManagementServiceStartParams {
readonly kibanaIndexName: string;
readonly online$: Observable<OnlineStatusRetryScheduler>;
readonly taskManager: TaskManagerStartContract;
readonly auditLogger: AuditLogger;
}
export interface SessionManagementServiceStart {
@ -78,12 +80,14 @@ export class SessionManagementService {
kibanaIndexName,
online$,
taskManager,
auditLogger,
}: SessionManagementServiceStartParams): SessionManagementServiceStart {
this.sessionIndex = new SessionIndex({
config: this.config,
elasticsearchClient,
kibanaIndexName,
logger: this.logger.get('index'),
auditLogger,
});
this.statusSubscription = online$.subscribe(async ({ scheduleRetry }) => {