[Entity Analytics] Adding changes for event.ingested in riskScore and assetCriticality (#203975)

## Summary

This pull request introduces changes to the asset criticality and risk
score data clients to utilize a new ingest pipeline for adding event
timestamps. The changes include the addition of utility functions for
creating and retrieving the ingest pipeline, updates to the field
mappings, and modifications to the data clients to integrate the new
pipeline.

### Ingest Pipeline Integration:

*
[`x-pack/plugins/security_solution/server/lib/entity_analytics/utils/create_ingest_pipeline.ts`](diffhunk://#diff-0011b86f0b91d8a6bb1c91ea0ff59830905e90436af01f5893b14d054b4e69f5R1-R50):
Added new utility functions `getIngestPipelineName` and
`createIngestTimestampPipeline` to manage the ingest pipeline for adding
event timestamps.

### Asset Criticality Data Client:

*
[`x-pack/plugins/security_solution/server/lib/entity_analytics/asset_criticality/asset_criticality_data_client.ts`](diffhunk://#diff-31b32ff8816e16c97f0d702225b9e13d7417331850c88b33435079419db94b62R26-R29):
Imported the new utility functions and updated the `init` method to
create the ingest timestamp pipeline. Additionally, modified the index
settings to use the new ingest pipeline.
### Risk Score Data Client:

*
[`x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/risk_score_data_client.ts`](diffhunk://#diff-5a33102890d8bc4948e5d3d7df3901c23146bde3dee7bd15563bd1169358e43aR43-R46):
Imported the new utility functions, updated the `init` method to create
the ingest timestamp pipeline, and modified the index settings to use
the new ingest pipeline.

### Field Mapping Updates:

*
[`x-pack/plugins/security_solution/server/lib/entity_analytics/asset_criticality/constants.ts`](diffhunk://#diff-d0e75953a3b6d040a296cb4cd7513428a18b152808231819f28d7329dc86a92cL20-R20):
Added the field mapping `event.ingested` for asset criticality.
*
[`x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/configurations.ts`](diffhunk://#diff-43b70e77669c1f7c9608f8d26095db18f6fa0380beeb5990701656ae920602d7L102-R102):
Added the field mapping `event.ingested` for risk score.


### Checklist

Check the PR satisfies following conditions. 

Reviewers should verify this PR satisfies this list as well.

- [ ] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios
- [ ] [Flaky Test
Runner](https://ci-stats.kibana.dev/trigger_flaky_test_runner/1) was
used on any tests changed
- [ ] The PR description includes the appropriate Release Notes section,
and the correct `release_note:*` label is applied per the
[guidelines](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)

### Testing steps :

- Checkout main branch
- Setup and start kibana
- Enable Risk Engine

- Execute below query, result should not have event.ingested
```
GET /*asset-criticality.asset-criticality-*/_mapping
GET /*risk-score.risk-score-latest-*/_mapping
```

- Add data using document generator
- Execute below query
```
GET /*asset-criticality.asset-criticality-*/_search
{
    "_source": ["event.ingested", "@timestamp"],
    "query": {
    "exists": {
      "field": "event.ingested"
    }
  }
}
```
### Expected output
```
{
  "took": 0,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 0,
      "relation": "eq"
    },
    "max_score": null,
    "hits": []
  }
}
```
- Same output as above for below query too
```
GET /*risk-score.risk-score-latest-*/_search
{
    "_source": ["event.ingested", "@timestamp"],
    "query": {
    "exists": {
      "field": "event.ingested"
    }
  }
}
```

- The below query should give results but `event.ingested` should not be
present in the results

```
GET /*asset-criticality.asset-criticality-*/_search
{
    "_source": ["@timestamp", "event.ingested"]
}

GET /*risk-score.risk-score-latest-*/_search
{
    "_source": ["@timestamp", "event.ingested"]
}
```

### Expected output

```
{
  "took": 0,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 3,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": ".asset-criticality.asset-criticality-default",
        "_id": "user.name:user-001",
        "_score": 1,
        "_source": {
          "@timestamp": "2025-01-09T14:20:24.221Z"
        }
      },
      {
        "_index": ".asset-criticality.asset-criticality-default",
        "_id": "user.name:user-002",
        "_score": 1,
        "_source": {
          "@timestamp": "2025-01-09T14:20:24.221Z"
        }
      },
      {
        "_index": ".asset-criticality.asset-criticality-default",
        "_id": "host.name:host-001",
        "_score": 1,
        "_source": {
          "@timestamp": "2025-01-09T14:20:24.222Z"
        }
      }
    ]
  }
}
```

### - Checkout this PR and restart Kibana

(Try running the Risk Score engine using the Run Engine option if you
have added data after enabling the Risk Engine)

All the above queries should contain data/results with `event.ingested`
as below :

```
{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 11,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "risk-score.risk-score-latest-default",
        "_id": "X19B5MlF3Loy86u-U-mC6BrCwAAAAAAA",
        "_score": 1,
        "_source": {
          "event": {
            "ingested": "2025-01-10T07:51:30.757784Z"
          },
          "@timestamp": "2025-01-10T07:51:30.363Z"
        }
      },
      {
        "_index": "risk-score.risk-score-latest-default",
        "_id": "X19DYvlD0CQ6h1VE9n-ScWnjqwAAAAAA",
        "_score": 1,
        "_source": {
          "event": {
            "ingested": "2025-01-10T07:51:30.757971Z"
          },
          "@timestamp": "2025-01-10T07:51:30.363Z"
        }
      },
      {
        "_index": "risk-score.risk-score-latest-default",
        "_id": "X19DQLgfYH-Zr4z01uVnAImoTgAAAAAA",
        "_score": 1,
        "_source": {
          "event": {
            "ingested": "2025-01-10T07:51:30.758039Z"
          },
          "@timestamp": "2025-01-10T07:51:30.363Z"
        }
      },
      {
        "_index": "risk-score.risk-score-latest-default",
        "_id": "X19IqrXmM5aDk2qno3rUL5TI3gAAAAAA",
        "_score": 1,
        "_source": {
          "event": {
            "ingested": "2025-01-10T07:51:30.758108Z"
          },
          "@timestamp": "2025-01-10T07:51:30.363Z"
        }
      },
      {
        "_index": "risk-score.risk-score-latest-default",
        "_id": "X19K9okuf9lAZcd2Y7t-QFWJAQAAAAAA",
        "_score": 1,
        "_source": {
          "event": {
            "ingested": "2025-01-10T07:51:30.758163Z"
          },
          "@timestamp": "2025-01-10T07:51:30.363Z"
        }
      },
      {
        "_index": "risk-score.risk-score-latest-default",
        "_id": "X19K95CQyZSvT-ZQVwx_6jJTzgAAAAAA",
        "_score": 1,
        "_source": {
          "event": {
            "ingested": "2025-01-10T07:51:30.758222Z"
          },
          "@timestamp": "2025-01-10T07:51:30.363Z"
        }
      },
      {
        "_index": "risk-score.risk-score-latest-default",
        "_id": "X19LMkPHJ-L99JamiiYkt9WB1wAAAAAA",
        "_score": 1,
        "_source": {
          "event": {
            "ingested": "2025-01-10T07:51:30.758272Z"
          },
          "@timestamp": "2025-01-10T07:51:30.363Z"
        }
      },
      {
        "_index": "risk-score.risk-score-latest-default",
        "_id": "X19M4c0tojXVhK5aOwVA46RNVgAAAAAA",
        "_score": 1,
        "_source": {
          "event": {
            "ingested": "2025-01-10T07:51:30.758462Z"
          },
          "@timestamp": "2025-01-10T07:51:30.363Z"
        }
      },
      {
        "_index": "risk-score.risk-score-latest-default",
        "_id": "X19M7j9nZmY4g5bEDPJc20zNHgAAAAAA",
        "_score": 1,
        "_source": {
          "event": {
            "ingested": "2025-01-10T07:51:30.758573Z"
          },
          "@timestamp": "2025-01-10T07:51:30.363Z"
        }
      },
      {
        "_index": "risk-score.risk-score-latest-default",
        "_id": "X19TVbTGATHGj2iG_rFIUx2_1QAAAAAA",
        "_score": 1,
        "_source": {
          "event": {
            "ingested": "2025-01-10T07:51:30.758629Z"
          },
          "@timestamp": "2025-01-10T07:51:30.363Z"
        }
      }
    ]
  }
}
```


```
{
  "took": 0,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 3,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": ".asset-criticality.asset-criticality-default",
        "_id": "user.name:user-001",
        "_score": 1,
        "_source": {
          "@timestamp": "2025-01-10T07:50:19.522Z",
          "event": {
            "ingested": "2025-01-10T07:50:19.532122Z"
          }
        }
      },
      {
        "_index": ".asset-criticality.asset-criticality-default",
        "_id": "user.name:user-002",
        "_score": 1,
        "_source": {
          "@timestamp": "2025-01-10T07:50:19.523Z",
          "event": {
            "ingested": "2025-01-10T07:50:19.535465Z"
          }
        }
      },
      {
        "_index": ".asset-criticality.asset-criticality-default",
        "_id": "host.name:host-001",
        "_score": 1,
        "_source": {
          "@timestamp": "2025-01-10T07:50:19.523Z",
          "event": {
            "ingested": "2025-01-10T07:50:19.535536Z"
          }
        }
      }
    ]
  }
}
```
The ingest pipeline should also be visible as below 

```
GET /_ingest/pipeline/entity_analytics_create_eventIngest_from_timestamp-pipeline*
```


![image](https://github.com/user-attachments/assets/42d4167b-575c-43ea-9219-34b31ded12fb)

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Abhishek Bhatia 2025-01-17 19:12:33 +05:30 committed by GitHub
parent 7987527d31
commit e266c83b81
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 618 additions and 16 deletions

View file

@ -43,7 +43,7 @@ describe('AssetCriticalityDataClient', () => {
index: '.asset-criticality.asset-criticality-default',
mappings: {
_meta: {
version: 2,
version: 3,
},
dynamic: 'strict',
properties: {
@ -56,6 +56,13 @@ describe('AssetCriticalityDataClient', () => {
criticality_level: {
type: 'keyword',
},
event: {
properties: {
ingested: {
type: 'date',
},
},
},
'@timestamp': {
type: 'date',
ignore_malformed: false,
@ -114,6 +121,9 @@ describe('AssetCriticalityDataClient', () => {
},
},
},
settings: {
default_pipeline: 'entity_analytics_create_eventIngest_from_timestamp-pipeline-default',
},
},
});
});

View file

@ -26,6 +26,10 @@ import {
import { AssetCriticalityAuditActions } from './audit';
import { AUDIT_CATEGORY, AUDIT_OUTCOME, AUDIT_TYPE } from '../audit';
import { getImplicitEntityFields } from './helpers';
import {
getIngestPipelineName,
createEventIngestedFromTimestamp,
} from '../utils/create_ingest_pipeline';
interface AssetCriticalityClientOpts {
logger: Logger;
@ -62,6 +66,7 @@ export class AssetCriticalityDataClient {
* Initialize asset criticality resources.
*/
public async init() {
await createEventIngestedFromTimestamp(this.options.esClient, this.options.namespace);
await this.createOrUpdateIndex();
this.options.auditLogger?.log({
@ -90,6 +95,9 @@ export class AssetCriticalityDataClient {
version: ASSET_CRITICALITY_MAPPINGS_VERSIONS,
},
},
settings: {
default_pipeline: getIngestPipelineName(this.options.namespace),
},
},
});
}

View file

@ -95,4 +95,36 @@ export class AssetCriticalityMigrationClient {
}
);
};
public copyTimestampToEventIngestedForAssetCriticality = (abortSignal?: AbortSignal) => {
return this.options.esClient.updateByQuery(
{
index: this.assetCriticalityDataClient.getIndex(),
conflicts: 'proceed',
ignore_unavailable: true,
allow_no_indices: true,
body: {
query: {
bool: {
must_not: {
exists: {
field: 'event.ingested',
},
},
},
},
script: {
source: 'ctx._source.event.ingested = ctx._source.@timestamp',
lang: 'painless',
},
},
},
{
requestTimeout: '5m',
retryOnTimeout: true,
maxRetries: 2,
signal: abortSignal,
}
);
};
}

View file

@ -9,7 +9,7 @@ import { ASSET_CRITICALITY_MAPPINGS_VERSIONS, assetCriticalityFieldMap } from '.
describe('asset criticality - constants', () => {
it("please bump 'ASSET_CRITICALITY_MAPPINGS_VERSIONS' when mappings change", () => {
expect(ASSET_CRITICALITY_MAPPINGS_VERSIONS).toEqual(2);
expect(ASSET_CRITICALITY_MAPPINGS_VERSIONS).toEqual(3);
expect(assetCriticalityFieldMap).toMatchInlineSnapshot(`
Object {
"@timestamp": Object {
@ -27,6 +27,11 @@ describe('asset criticality - constants', () => {
"required": false,
"type": "keyword",
},
"event.ingested": Object {
"array": false,
"required": false,
"type": "date",
},
"host.asset.criticality": Object {
"array": false,
"required": false,

View file

@ -17,7 +17,7 @@ const assetCriticalityMapping = {
};
// Upgrade this value to force a mappings update on the next Kibana startup
export const ASSET_CRITICALITY_MAPPINGS_VERSIONS = 2;
export const ASSET_CRITICALITY_MAPPINGS_VERSIONS = 3;
export const assetCriticalityFieldMap: FieldMap = {
'@timestamp': {
@ -58,6 +58,11 @@ export const assetCriticalityFieldMap: FieldMap = {
required: false,
},
'user.asset.criticality': assetCriticalityMapping,
'event.ingested': {
type: 'date',
array: false,
required: false,
},
'service.name': {
type: 'keyword',
array: false,

View file

@ -0,0 +1,98 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { EntityAnalyticsMigrationsParams } from '.';
import { AssetCriticalityMigrationClient } from '../asset_criticality/asset_criticality_migration_client';
const TASK_TYPE = 'security-solution-ea-asset-criticality-copy-timestamp-to-event-ingested';
const TASK_ID = `${TASK_TYPE}-task-id`;
const TASK_TIMEOUT = '15m';
const TASK_SCOPE = ['securitySolution'];
export const assetCrticalityCopyTimestampToEventIngested = async ({
auditLogger,
taskManager,
logger,
getStartServices,
}: EntityAnalyticsMigrationsParams) => {
if (!taskManager) {
return;
}
logger.debug(`Register task "${TASK_TYPE}"`);
taskManager.registerTaskDefinitions({
[TASK_TYPE]: {
title: `Copy Asset Criticality @timestamp value to events.ingested`,
timeout: TASK_TIMEOUT,
createTaskRunner: createMigrationTask({ auditLogger, logger, getStartServices }),
},
});
const [_, depsStart] = await getStartServices();
const taskManagerStart = depsStart.taskManager;
if (taskManagerStart) {
logger.debug(`Task scheduled: "${TASK_TYPE}"`);
const now = new Date();
try {
await taskManagerStart.ensureScheduled({
id: TASK_ID,
taskType: TASK_TYPE,
scheduledAt: now,
runAt: now,
scope: TASK_SCOPE,
params: {},
state: {},
});
} catch (e) {
logger.error(`Error scheduling ${TASK_ID}, received ${e.message}`);
}
}
};
export const createMigrationTask =
({
getStartServices,
logger,
auditLogger,
}: Pick<EntityAnalyticsMigrationsParams, 'getStartServices' | 'logger' | 'auditLogger'>) =>
() => {
let abortController: AbortController;
return {
run: async () => {
abortController = new AbortController();
const [coreStart] = await getStartServices();
const esClient = coreStart.elasticsearch.client.asInternalUser;
const assetCrticalityClient = new AssetCriticalityMigrationClient({
esClient,
logger,
auditLogger,
});
const assetCriticalityResponse =
await assetCrticalityClient.copyTimestampToEventIngestedForAssetCriticality(
abortController.signal
);
const failures = assetCriticalityResponse.failures?.map((failure) => failure.cause);
const hasFailures = failures && failures?.length > 0;
logger.info(
`Task "${TASK_TYPE}" finished. Updated documents: ${
assetCriticalityResponse.updated
}, failures: ${hasFailures ? failures.join('\n') : 0}`
);
},
cancel: async () => {
abortController.abort();
logger.debug(`Task cancelled: "${TASK_TYPE}"`);
},
};
};

View file

@ -9,6 +9,8 @@ import type { AuditLogger, Logger, StartServicesAccessor } from '@kbn/core/serve
import type { TaskManagerSetupContract } from '@kbn/task-manager-plugin/server';
import type { StartPlugins } from '../../../plugin';
import { scheduleAssetCriticalityEcsCompliancyMigration } from '../asset_criticality/migrations/schedule_ecs_compliancy_migration';
import { assetCrticalityCopyTimestampToEventIngested } from './asset_criticality_copy_timestamp_to_event_ingested';
import { riskScoreCopyTimestampToEventIngested } from './risk_score_copy_timestamp_to_event_ingested';
import { updateAssetCriticalityMappings } from '../asset_criticality/migrations/update_asset_criticality_mappings';
import { updateRiskScoreMappings } from '../risk_engine/migrations/update_risk_score_mappings';
@ -43,3 +45,21 @@ export const scheduleEntityAnalyticsMigration = async (params: EntityAnalyticsMi
await scheduleAssetCriticalityEcsCompliancyMigration({ ...params, logger: scopedLogger });
await updateRiskScoreMappings({ ...params, logger: scopedLogger });
};
export const scheduleAssetCriticalityCopyTimestampToEventIngested = async (
params: EntityAnalyticsMigrationsParams
) => {
const scopedLogger = params.logger.get(
'entityAnalytics.assetCriticality.copyTimestampToEventIngested'
);
await assetCrticalityCopyTimestampToEventIngested({ ...params, logger: scopedLogger });
};
export const scheduleRiskScoreCopyTimestampToEventIngested = async (
params: EntityAnalyticsMigrationsParams
) => {
const scopedLogger = params.logger.get('entityAnalytics.riskScore.copyTimestampToEventIngested');
await riskScoreCopyTimestampToEventIngested({ ...params, logger: scopedLogger });
};

View file

@ -0,0 +1,101 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { EntityAnalyticsMigrationsParams } from '.';
import { RiskScoreDataClient } from '../risk_score/risk_score_data_client';
import { buildScopedInternalSavedObjectsClientUnsafe } from '../risk_score/tasks/helpers';
const TASK_TYPE = 'security-solution-ea-risk-score-copy-timestamp-to-event-ingested';
const TASK_ID = `${TASK_TYPE}-task-id`;
const TASK_TIMEOUT = '15m';
const TASK_SCOPE = ['securitySolution'];
export const riskScoreCopyTimestampToEventIngested = async ({
auditLogger,
taskManager,
logger,
getStartServices,
}: EntityAnalyticsMigrationsParams) => {
if (!taskManager) {
return;
}
logger.debug(`Register task "${TASK_TYPE}"`);
taskManager.registerTaskDefinitions({
[TASK_TYPE]: {
title: `Copy Risk Score @timestamp value to events.ingested`,
timeout: TASK_TIMEOUT,
createTaskRunner: createMigrationTask({ auditLogger, logger, getStartServices }),
},
});
const [_, depsStart] = await getStartServices();
const taskManagerStart = depsStart.taskManager;
if (taskManagerStart) {
logger.debug(`Task scheduled: "${TASK_TYPE}"`);
const now = new Date();
try {
await taskManagerStart.ensureScheduled({
id: TASK_ID,
taskType: TASK_TYPE,
scheduledAt: now,
runAt: now,
scope: TASK_SCOPE,
params: {},
state: {},
});
} catch (e) {
logger.error(`Error scheduling ${TASK_ID}, received ${e.message}`);
}
}
};
export const createMigrationTask =
({
getStartServices,
logger,
auditLogger,
}: Pick<EntityAnalyticsMigrationsParams, 'getStartServices' | 'logger' | 'auditLogger'>) =>
() => {
let abortController: AbortController;
return {
run: async () => {
abortController = new AbortController();
const [coreStart] = await getStartServices();
const esClient = coreStart.elasticsearch.client.asInternalUser;
const soClient = buildScopedInternalSavedObjectsClientUnsafe({ coreStart, namespace: '*' });
const riskScoreClient = new RiskScoreDataClient({
esClient,
logger,
auditLogger,
namespace: '*',
soClient,
kibanaVersion: '*',
});
const riskScoreResponse = await riskScoreClient.copyTimestampToEventIngestedForRiskScore(
abortController.signal
);
const failures = riskScoreResponse.failures?.map((failure) => failure.cause);
const hasFailures = failures && failures?.length > 0;
logger.info(
`Task "${TASK_TYPE}" finished. Updated documents: ${
riskScoreResponse.updated
}, failures: ${hasFailures ? failures.join('\n') : 0}`
);
},
cancel: async () => {
abortController.abort();
logger.debug(`Task cancelled: "${TASK_TYPE}"`);
},
};
};

View file

@ -13,7 +13,7 @@ describe('#getDefaultRiskEngineConfiguration', () => {
const namespace = 'default';
const config = getDefaultRiskEngineConfiguration({ namespace });
expect(config._meta.mappingsVersion).toEqual(2);
expect(config._meta.mappingsVersion).toEqual(3);
expect(riskScoreFieldMap).toMatchInlineSnapshot(`
Object {
"@timestamp": Object {
@ -21,6 +21,11 @@ describe('#getDefaultRiskEngineConfiguration', () => {
"required": false,
"type": "date",
},
"event.ingested": Object {
"array": false,
"required": false,
"type": "date",
},
"host.name": Object {
"array": false,
"required": false,

View file

@ -28,7 +28,7 @@ export const getDefaultRiskEngineConfiguration = ({
range: { start: 'now-30d', end: 'now' },
_meta: {
// Upgrade this property when changing mappings
mappingsVersion: 2,
mappingsVersion: 3,
},
});

View file

@ -9,6 +9,13 @@ Object {
"ignore_malformed": false,
"type": "date",
},
"event": Object {
"properties": Object {
"ingested": Object {
"type": "date",
},
},
},
"host": Object {
"properties": Object {
"name": Object {

View file

@ -102,6 +102,11 @@ export const riskScoreFieldMap: FieldMap = {
array: false,
required: false,
},
'event.ingested': {
type: 'date',
array: false,
required: false,
},
'host.name': {
type: 'keyword',
array: false,

View file

@ -210,7 +210,202 @@ const assertIndex = (namespace: string) => {
esClient,
options: {
index: `risk-score.risk-score-latest-${namespace}`,
mappings: expect.any(Object),
mappings: {
dynamic: false,
properties: {
'@timestamp': {
ignore_malformed: false,
type: 'date',
},
event: {
properties: {
ingested: {
type: 'date',
},
},
},
host: {
properties: {
name: {
type: 'keyword',
},
risk: {
properties: {
calculated_level: {
type: 'keyword',
},
calculated_score: {
type: 'float',
},
calculated_score_norm: {
type: 'float',
},
category_1_count: {
type: 'long',
},
category_1_score: {
type: 'float',
},
id_field: {
type: 'keyword',
},
id_value: {
type: 'keyword',
},
notes: {
type: 'keyword',
},
inputs: {
properties: {
id: {
type: 'keyword',
},
index: {
type: 'keyword',
},
category: {
type: 'keyword',
},
description: {
type: 'keyword',
},
risk_score: {
type: 'float',
},
timestamp: {
type: 'date',
},
},
type: 'object',
},
},
type: 'object',
},
},
},
service: {
properties: {
name: {
type: 'keyword',
},
risk: {
properties: {
calculated_level: {
type: 'keyword',
},
calculated_score: {
type: 'float',
},
calculated_score_norm: {
type: 'float',
},
category_1_count: {
type: 'long',
},
category_1_score: {
type: 'float',
},
id_field: {
type: 'keyword',
},
id_value: {
type: 'keyword',
},
inputs: {
properties: {
category: {
type: 'keyword',
},
description: {
type: 'keyword',
},
id: {
type: 'keyword',
},
index: {
type: 'keyword',
},
risk_score: {
type: 'float',
},
timestamp: {
type: 'date',
},
},
type: 'object',
},
notes: {
type: 'keyword',
},
},
type: 'object',
},
},
},
user: {
properties: {
name: {
type: 'keyword',
},
risk: {
properties: {
calculated_level: {
type: 'keyword',
},
calculated_score: {
type: 'float',
},
calculated_score_norm: {
type: 'float',
},
category_1_count: {
type: 'long',
},
category_1_score: {
type: 'float',
},
id_field: {
type: 'keyword',
},
id_value: {
type: 'keyword',
},
notes: {
type: 'keyword',
},
inputs: {
properties: {
id: {
type: 'keyword',
},
index: {
type: 'keyword',
},
category: {
type: 'keyword',
},
description: {
type: 'keyword',
},
risk_score: {
type: 'float',
},
timestamp: {
type: 'date',
},
},
type: 'object',
},
},
type: 'object',
},
},
},
},
},
settings: {
'index.default_pipeline': `entity_analytics_create_eventIngest_from_timestamp-pipeline-${namespace}`,
},
},
});
};

View file

@ -45,6 +45,10 @@ import { createOrUpdateIndex } from '../utils/create_or_update_index';
import { retryTransientEsErrors } from '../utils/retry_transient_es_errors';
import { RiskScoreAuditActions } from './audit';
import { AUDIT_CATEGORY, AUDIT_OUTCOME, AUDIT_TYPE } from '../audit';
import {
createEventIngestedFromTimestamp,
getIngestPipelineName,
} from '../utils/create_ingest_pipeline';
interface RiskScoringDataClientOpts {
logger: Logger;
@ -100,6 +104,9 @@ export class RiskScoreDataClient {
options: {
index: getRiskScoreLatestIndex(this.options.namespace),
mappings: mappingFromFieldMap(riskScoreFieldMap, false),
settings: {
'index.default_pipeline': getIngestPipelineName(this.options.namespace),
},
},
});
};
@ -130,9 +137,10 @@ export class RiskScoreDataClient {
public async init() {
const namespace = this.options.namespace;
const esClient = this.options.esClient;
try {
const esClient = this.options.esClient;
await createEventIngestedFromTimestamp(esClient, namespace);
const indexPatterns = getIndexPatternDataStream(namespace);
@ -169,6 +177,7 @@ export class RiskScoreDataClient {
lifecycle: {},
settings: {
'index.mapping.total_fields.limit': totalFieldsLimit,
'index.default_pipeline': getIngestPipelineName(namespace),
},
mappings: {
dynamic: false,
@ -340,6 +349,38 @@ export class RiskScoreDataClient {
});
}
public copyTimestampToEventIngestedForRiskScore = (abortSignal?: AbortSignal) => {
return this.options.esClient.updateByQuery(
{
index: getRiskScoreLatestIndex(this.options.namespace),
conflicts: 'proceed',
ignore_unavailable: true,
allow_no_indices: true,
body: {
query: {
bool: {
must_not: {
exists: {
field: 'event.ingested',
},
},
},
},
script: {
source: 'ctx._source.event.ingested = ctx._source.@timestamp',
lang: 'painless',
},
},
},
{
requestTimeout: '5m',
retryOnTimeout: true,
maxRetries: 2,
signal: abortSignal,
}
);
};
public async reinstallTransform() {
const esClient = this.options.esClient;
const namespace = this.options.namespace;

View file

@ -0,0 +1,44 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { ElasticsearchClient } from '@kbn/core/server';
export const getIngestPipelineName = (namespace: string): string => {
return `entity_analytics_create_eventIngest_from_timestamp-pipeline-${namespace}`;
};
export const createEventIngestedFromTimestamp = async (
esClient: ElasticsearchClient,
namespace: string
) => {
const ingestTimestampPipeline = getIngestPipelineName(namespace);
try {
const pipeline = {
id: ingestTimestampPipeline,
body: {
_meta: {
managed_by: 'entity_analytics',
managed: true,
},
description: 'Pipeline for adding timestamp value to event.ingested',
processors: [
{
set: {
field: 'event.ingested',
value: '{{_ingest.timestamp}}',
},
},
],
},
};
await esClient.ingest.putPipeline(pipeline);
} catch (e) {
throw new Error(`Error creating ingest pipeline: ${e}`);
}
};

View file

@ -71,7 +71,7 @@ export default ({ getService }: FtrProviderContext) => {
assetCriticalityIndexResult['.asset-criticality.asset-criticality-default']?.mappings
).to.eql({
_meta: {
version: 2,
version: 3,
},
dynamic: 'strict',
properties: {
@ -81,6 +81,13 @@ export default ({ getService }: FtrProviderContext) => {
criticality_level: {
type: 'keyword',
},
event: {
properties: {
ingested: {
type: 'date',
},
},
},
id_field: {
type: 'keyword',
},
@ -160,8 +167,7 @@ export default ({ getService }: FtrProviderContext) => {
expect(result['@timestamp']).to.be.a('string');
const doc = await getAssetCriticalityDoc({ idField: 'host.name', idValue: 'host-01', es });
expect(doc).to.eql(result);
expect(_.omit(doc, 'event')).to.eql(result);
});
it('should return 400 if criticality is invalid', async () => {
@ -372,7 +378,7 @@ export default ({ getService }: FtrProviderContext) => {
const doc = await getAssetCriticalityDoc({ idField: 'host.name', idValue: 'host-01', es });
expect(doc).to.eql(updatedDoc);
expect(_.omit(doc, 'event')).to.eql(_.omit(updatedDoc, 'event'));
});
});
@ -387,7 +393,7 @@ export default ({ getService }: FtrProviderContext) => {
idValue: expectedDoc.id_value,
});
expect(omit(esDoc, '@timestamp')).to.eql(expectedDoc);
expect(omit(esDoc, ['@timestamp', 'event'])).to.eql(expectedDoc);
};
it('should return 400 if the records array is empty', async () => {
@ -478,9 +484,8 @@ export default ({ getService }: FtrProviderContext) => {
await assetCriticalityRoutes.upsert(assetCriticality);
const res = await assetCriticalityRoutes.delete('host.name', 'delete-me');
expect(res.body.deleted).to.eql(true);
expect(_.omit(res.body.record, '@timestamp')).to.eql(
expect(_.omit(res.body.record, ['@timestamp', 'event'])).to.eql(
assetCreateTypeToAssetRecord(assetCriticality)
);
@ -494,7 +499,9 @@ export default ({ getService }: FtrProviderContext) => {
...assetCriticality,
criticality_level: CRITICALITY_VALUES.DELETED,
};
expect(_.omit(doc, '@timestamp')).to.eql(assetCreateTypeToAssetRecord(deletedDoc));
expect(_.omit(doc, ['@timestamp', 'event'])).to.eql(
assetCreateTypeToAssetRecord(deletedDoc)
);
});
it('should not return 404 if the asset criticality does not exist', async () => {

View file

@ -71,6 +71,8 @@ export default ({ getService }: FtrProviderContext) => {
const dataStreamName = 'risk-score.risk-score-default';
const latestIndexName = 'risk-score.risk-score-latest-default';
const transformId = 'risk_score_latest_transform_default';
const defaultPipeline =
'entity_analytics_create_eventIngest_from_timestamp-pipeline-default';
await riskEngineRoutes.init();
@ -89,6 +91,13 @@ export default ({ getService }: FtrProviderContext) => {
ignore_malformed: false,
type: 'date',
},
event: {
properties: {
ingested: {
type: 'date',
},
},
},
host: {
properties: {
name: {
@ -288,6 +297,7 @@ export default ({ getService }: FtrProviderContext) => {
expect(indexTemplate.index_template.template!.settings).to.eql({
index: {
default_pipeline: defaultPipeline,
mapping: {
total_fields: {
limit: '1000',
@ -341,6 +351,7 @@ export default ({ getService }: FtrProviderContext) => {
const dataStreamName = `risk-score.risk-score-${customSpaceName}`;
const latestIndexName = `risk-score.risk-score-latest-${customSpaceName}`;
const transformId = `risk_score_latest_transform_${customSpaceName}`;
const defaultPipeline = `entity_analytics_create_eventIngest_from_timestamp-pipeline-${customSpaceName}`;
await riskEngineRoutesWithNamespace.init();
@ -359,6 +370,13 @@ export default ({ getService }: FtrProviderContext) => {
ignore_malformed: false,
type: 'date',
},
event: {
properties: {
ingested: {
type: 'date',
},
},
},
host: {
properties: {
name: {
@ -562,6 +580,7 @@ export default ({ getService }: FtrProviderContext) => {
expect(indexTemplate.index_template.template!.settings).to.eql({
index: {
default_pipeline: defaultPipeline,
mapping: {
total_fields: {
limit: '1000',
@ -626,7 +645,7 @@ export default ({ getService }: FtrProviderContext) => {
start: 'now-30d',
},
_meta: {
mappingsVersion: 2,
mappingsVersion: 3,
},
});
});