[SIEM] [Detection Engine] Replacing of Reindex in Signals API with Scroll and Bulk Indexing (#47386)

* working commit with no re-index.. very messy working draft

* Add flag for using GET/PUT instead of reindex, remove painless script from GET/PUT, clean up code

* unset alerting and actions features since feature flags are not present

* fix leftover merge conflicts from rebase

* Defaults to scroll and bulk index combination instead of reindex, updates docs, updates erroneous log.info to log.error

* Refactors bulkIndex method to remove usage of let and increase readability

* Replaces env variable, remove scroll and bulk index logic from signals_alert_type.ts and updates logger.info -> logger.error in catch clause

* add ternary for scroll lock parameter

* minor cleanup from merge with master

* removes class definition and exports separate functions for scroll and bulk functionality, replaces logger.info with a logger.error, adds another signal for testing zero documents upon initial search query
This commit is contained in:
Devin W. Hurley 2019-10-14 13:26:55 -04:00 committed by GitHub
parent a7d95e9582
commit 4a7f36d6cf
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 279 additions and 75 deletions

View file

@ -24,6 +24,13 @@ export KBN_URLBASE=http://${user}:${password}@localhost:5601
source your .zshrc/.bashrc or open a new terminal to ensure you get the new values set.
Optional env var when set to true will utilize `reindex` api for reindexing
instead of the scroll and bulk index combination.
```
export USE_REINDEX_API=true
```
Add these lines to your `kibana.dev.yml` to turn on the feature toggles of alerting and actions:
```
# Feature flag to turn on alerting

View file

@ -4,87 +4,95 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { defaultIndexPattern } from '../../../../default_index_pattern';
import { fromKueryExpression, toElasticsearchQuery } from '@kbn/es-query';
// TODO: See build_events_reindex.ts for all the spots to make things "configurable"
// here but this is intended to replace the build_events_reindex.ts
export const buildEventsQuery = () => {
interface BuildEventsScrollQuery {
index: string[];
from: number;
to: number;
kql: string | undefined;
filter: Record<string, {}> | undefined;
size: number;
scroll: string;
}
/**
 * Resolve the query portion of the events search. A KQL expression takes
 * precedence and is compiled into an Elasticsearch DSL query; otherwise a
 * raw filter object is passed through untouched.
 *
 * @throws TypeError when neither kql nor filter is provided.
 */
export const getFilter = (kql: string | undefined, filter: Record<string, {}> | undefined) => {
  if (kql != null) {
    return toElasticsearchQuery(fromKueryExpression(kql), null);
  }
  if (filter != null) {
    return filter;
  }
  // TODO: Re-visit this error (which should never happen) when we do signal errors for the UI
  throw new TypeError('either kql or filter should be set');
};
export const buildEventsScrollQuery = ({
index,
from,
to,
kql,
filter,
size,
scroll,
}: BuildEventsScrollQuery) => {
const kqlOrFilter = getFilter(kql, filter);
const filterWithTime = [
kqlOrFilter,
{
bool: {
filter: [
{
bool: {
should: [
{
range: {
'@timestamp': {
gte: from,
},
},
},
],
minimum_should_match: 1,
},
},
{
bool: {
should: [
{
range: {
'@timestamp': {
lte: to,
},
},
},
],
minimum_should_match: 1,
},
},
],
},
},
];
return {
allowNoIndices: true,
index: defaultIndexPattern,
index,
scroll,
size,
ignoreUnavailable: true,
body: {
query: {
bool: {
filter: [
{
bool: {
filter: [
{
bool: {
should: [
{
match_phrase: {
'user.name': 'root',
},
},
],
minimum_should_match: 1,
},
},
{
bool: {
filter: [
{
bool: {
should: [
{
range: {
'@timestamp': {
gte: 1567317600000,
},
},
},
],
minimum_should_match: 1,
},
},
{
bool: {
should: [
{
range: {
'@timestamp': {
lte: 1569909599999,
},
},
},
],
minimum_should_match: 1,
},
},
],
},
},
],
},
},
...filterWithTime,
{
match_all: {},
},
],
},
},
size: 26,
track_total_hits: true,
sort: [
{
'@timestamp': 'desc',
},
{
_doc: 'desc',
},
],
sort: ['_doc'],
},
};
};

View file

@ -14,7 +14,10 @@ import { buildEventsReIndex } from './build_events_reindex';
// TODO: Comment this in and use this instead of the reIndex API
// once scrolling and other things are done with it.
// import { buildEventsQuery } from './build_events_query';
import { buildEventsScrollQuery } from './build_events_query';
// bulk scroll class
import { scrollAndBulkIndex } from './utils';
export const signalsAlertType = ({ logger }: { logger: Logger }): AlertType => {
return {
@ -35,16 +38,14 @@ export const signalsAlertType = ({ logger }: { logger: Logger }): AlertType => {
to: schema.string(),
type: schema.string(),
references: schema.arrayOf(schema.string(), { defaultValue: [] }),
scrollSize: schema.maybe(schema.number()),
scrollLock: schema.maybe(schema.string()),
}),
},
// TODO: Type the params as it is all filled with any
async executor({ services, params, state }: AlertExecutorOptions) {
const instance = services.alertInstanceFactory('siem-signals');
// TODO: Comment this in eventually and use the buildEventsQuery()
// for scrolling and other fun stuff instead of using the buildEventsReIndex()
// const query = buildEventsQuery();
const {
description,
filter,
@ -58,7 +59,24 @@ export const signalsAlertType = ({ logger }: { logger: Logger }): AlertType => {
severity,
to,
type,
scrollSize,
scrollLock,
} = params;
const scroll = scrollLock ? scrollLock : '1m';
const size = scrollSize ? scrollSize : 400;
// TODO: Turn these options being sent in into a template for the alert type
const noReIndex = buildEventsScrollQuery({
index,
from,
to,
kql,
filter,
size,
scroll,
});
const reIndex = buildEventsReIndex({
index,
from,
@ -84,11 +102,31 @@ export const signalsAlertType = ({ logger }: { logger: Logger }): AlertType => {
// TODO: Comment this in eventually and use this for manual insertion of the
// signals instead of the ReIndex() api
// const result = await services.callCluster('search', query);
const result = await services.callCluster('reindex', reIndex);
// TODO: Error handling here and writing of any errors that come back from ES by
logger.info(`Result of reindex: ${JSON.stringify(result, null, 2)}`);
if (process.env.USE_REINDEX_API === 'true') {
const result = await services.callCluster('reindex', reIndex);
// TODO: Error handling here and writing of any errors that come back from ES by
logger.info(`Result of reindex: ${JSON.stringify(result, null, 2)}`);
} else {
logger.info(`[+] Initial search call`);
const noReIndexResult = await services.callCluster('search', noReIndex);
logger.info(`Total docs to reindex: ${noReIndexResult.hits.total.value}`);
const bulkIndexResult = await scrollAndBulkIndex(
noReIndexResult,
params,
services,
logger
);
if (bulkIndexResult) {
logger.info('Finished SIEM signal job');
} else {
logger.error('Error processing SIEM signal job');
}
}
} catch (err) {
// TODO: Error handling and writing of errors into a signal that has error
// handling/conditions

View file

@ -0,0 +1,119 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { SearchResponse, SearchHit, SignalHit } from '../../types';
import { Logger } from '../../../../../../../../src/core/server';
import { AlertServices } from '../../../../../alerting/server/types';
// format scroll search result for signals index.
/**
 * Map one search/scroll hit onto the signal document shape (SignalHit) that
 * is written to the signals index. The source event's fields are spread onto
 * the top level alongside the `signal` metadata object.
 *
 * Fix: `index_patterns` and `references` were previously joined into a single
 * quoted string (e.g. `"a","b"`), contradicting the SignalHit declaration of
 * `string[]`; the arrays are now passed through unchanged.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export const buildBulkBody = (doc: SearchHit, signalParams: Record<string, any>): SignalHit => {
  return {
    ...doc._source,
    signal: {
      rule_revision: 1,
      rule_id: signalParams.id,
      rule_type: signalParams.type,
      parent: {
        // link the signal back to the event it was generated from
        id: doc._id,
        type: 'event',
        depth: 1,
      },
      name: signalParams.name,
      severity: signalParams.severity,
      description: signalParams.description,
      time_detected: Date.now(),
      index_patterns: signalParams.index,
      references: signalParams.references,
    },
  };
};
// Bulk Index documents.
/**
 * Index one page of search/scroll hits into the signals index via the ES
 * bulk API. Returns true on success, false when the page is empty or the
 * bulk response reports errors.
 *
 * Fixes: stray `}` in the error log template; the bulk response `errors`
 * field is only a boolean flag, so the per-item results (`items`) are logged
 * instead of the flag itself.
 */
export const singleBulkIndex = async (
  sr: SearchResponse<object>,
  params: Record<string, any>, // eslint-disable-line @typescript-eslint/no-explicit-any
  service: AlertServices,
  logger: Logger
): Promise<boolean> => {
  if (sr.hits.hits.length === 0) {
    logger.warn('First search result yielded 0 documents');
    return false;
  }
  // Interleave action/metadata lines with document bodies as the bulk API expects.
  const bulkBody = sr.hits.hits.flatMap((doc: SearchHit) => [
    {
      index: {
        _index: process.env.SIGNALS_INDEX || '.siem-signals-10-01-2019',
        // reuse the source doc id so re-runs overwrite rather than duplicate
        _id: doc._id,
      },
    },
    buildBulkBody(doc, params),
  ]);
  const firstResult = await service.callCluster('bulk', {
    refresh: true,
    body: bulkBody,
  });
  if (firstResult.errors) {
    logger.error(`[-] bulkResponse had errors: ${JSON.stringify(firstResult.items, null, 2)}`);
    return false;
  }
  return true;
};
// Given a scroll id, grab the next set of documents
/**
 * Fetch the next page of results for an open scroll. The scroll keep-alive
 * comes from params.scrollLock, defaulting to one minute. Any error from the
 * scroll call is logged and rethrown to the caller.
 */
export const singleScroll = async (
  scrollId: string | undefined,
  params: Record<string, any>, // eslint-disable-line @typescript-eslint/no-explicit-any
  service: AlertServices,
  logger: Logger
): Promise<SearchResponse<object>> => {
  const scroll = params.scrollLock || '1m';
  try {
    return await service.callCluster('scroll', { scroll, scrollId });
  } catch (exc) {
    logger.error(`[-] nextScroll threw an error ${exc}`);
    throw exc;
  }
};
// scroll through documents and re-index using bulk endpoint.
/**
 * Drive the scroll/bulk-index loop: index the initial search page, then keep
 * pulling scroll pages and bulk-indexing them until a page comes back empty.
 * Returns true once the whole scroll is consumed; false when the first page
 * fails (or is empty) or when a scroll call throws.
 *
 * Fix: the catch clause now interpolates the thrown error, matching the
 * logging style of singleScroll, instead of a detail-free message.
 */
export const scrollAndBulkIndex = async (
  someResult: SearchResponse<object>,
  params: Record<string, any>, // eslint-disable-line @typescript-eslint/no-explicit-any
  service: AlertServices,
  logger: Logger
): Promise<boolean> => {
  logger.info('[+] starting bulk insertion');
  const firstBulkIndexSuccess = await singleBulkIndex(someResult, params, service, logger);
  if (!firstBulkIndexSuccess) {
    logger.warn('First bulk index was unsuccessful');
    return false;
  }
  let newScrollId = someResult._scroll_id;
  while (true) {
    try {
      const scrollResult = await singleScroll(newScrollId, params, service, logger);
      newScrollId = scrollResult._scroll_id;
      if (scrollResult.hits.hits.length === 0) {
        logger.info('[+] Finished indexing signals');
        return true;
      }
      const bulkSuccess = await singleBulkIndex(scrollResult, params, service, logger);
      if (!bulkSuccess) {
        // NOTE(review): a failed page is logged but the loop continues with the
        // next scroll page — confirm this best-effort behavior is intended.
        logger.error('[-] bulk index failed');
      }
    } catch (exc) {
      logger.error(`[-] scroll and bulk threw an error ${exc}`);
      return false;
    }
  }
};

View file

@ -0,0 +1,13 @@
{
"id": "3",
"description": "Detecting root and admin users",
"index": ["auditbeat-*", "filebeat-*", "packetbeat-*", "winlogbeat-*"],
"interval": "5m",
"name": "Detect Root/Admin Users",
"severity": 1,
"type": "kql",
"from": "now-16y",
"to": "now-15y",
"kql": "user.name: root or user.name: admin"
}

View file

@ -60,6 +60,25 @@ export interface SiemContext {
req: FrameworkRequest;
}
/**
 * Shape of a signal document written to the signals index by the detection
 * engine's scroll-and-bulk-index path (see buildBulkBody, which spreads the
 * source event's fields onto the top level alongside this `signal` object).
 */
export interface SignalHit {
  signal: {
    rule_revision: number;
    // NOTE(review): the sample rule JSON uses a string id ("3") while this
    // declares number — confirm which is correct.
    rule_id: number;
    rule_type: string;
    // Back-reference to the source event the signal was generated from.
    parent: {
      id: string;
      type: string;
      depth: number;
    };
    name: string;
    severity: number;
    description: string;
    // Epoch milliseconds (set via Date.now() in buildBulkBody).
    time_detected: number;
    index_patterns: string[];
    references: string[];
  };
}
export interface TotalValue {
value: number;
relation: string;