[Entity Analytics][Privmon] Changing CSV bulk upsert to use scripted updates (#224721)

## Summary

This PR changes the "soft delete" mechanism in the CSV upload to use
scripted updates.
There were 2 main reasons for this:

1. We need to check if a privileged user has been added from some other
data source

In this case, not including the user in the CSV upload shouldn't delete
it, instead only remove the `"csv"` source label.

2. The soft delete needs to take into account the full list of users
being uploaded, and not only the current batch

This means we need to collapse the stream first and then run the soft
delete logic. Doing this allows the soft delete to search the Privmon
index for all users not included in the full csv file

## How to test


#### Prerequisite

Make sure you have a CSV file with usernames
Check
[here](https://gist.github.com/tiansivive/0be2f09e1bb380fdde6609a131e929ed)
for a little helper script

Create a few copies where some of the users are deleted, in order to
test soft delete.
Make sure to create files that are over the batch size (`100`) or change
the batch size in the code.

1. Start up kibana and ES
2. Navigate to Security > Entity Analytics > Privilege User Monitoring
3. Select the `File` option to add data
4. Add one of the CSV files to the open modal and upload
5. Repeat, but now upload one of the files with the omitted users

Alternatively, testing only the backend is possible by directly
hitting the API with curl:
```sh
curl -u elastic:changeme \
  -X POST "http://localhost:5601/api/entity_analytics/monitoring/users/_csv" \
  -H "kbn-xsrf: true" \
  -F "file=@test.csv;type=text/csv"
```

#### Verifying

Easiest way is to use the dev tools to `_search` the privmon users index
with:
```
GET .entity_analytics.monitoring.users-default/_search
```

Look for number of hits and/or use `query` to search for omitted users
like:
```json
{
   "query": {
      "bool": {
         "must": [
            { "term": { "labels.monitoring.privileged_users": "deleted" } }
          ]
      }
   }
}
```

Verify that the `"deleted"` users are _only_ the ones missing from the
last uploaded file.
This commit is contained in:
Tiago Vila Verde 2025-06-23 17:20:30 +02:00 committed by GitHub
parent cb918fcfb9
commit 5eb35a702e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 265 additions and 182 deletions

View file

@ -57,10 +57,12 @@ import type { PrivMonUserSource } from './types';
import { batchPartitions } from '../shared/streams/batching';
import { queryExistingUsers } from './users/query_existing_users';
import { bulkBatchUpsertFromCSV } from './users/bulk/update_from_csv';
import type { SoftDeletionResults } from './users/bulk/soft_delete_omitted_usrs';
import { softDeleteOmittedUsers } from './users/bulk/soft_delete_omitted_usrs';
import { bulkUpsertBatch } from './users/bulk/upsert_batch';
import type { SoftDeletionResults } from './users/soft_delete_omitted_users';
import { softDeleteOmittedUsers } from './users/soft_delete_omitted_users';
import { privilegedUserParserTransform } from './users/privileged_user_parse_transform';
import type { Accumulator } from './users/bulk/utils';
import { accumulateUpsertResults } from './users/bulk/utils';
interface PrivilegeMonitoringClientOpts {
logger: Logger;
@ -284,28 +286,35 @@ export class PrivilegeMonitoringDataClient {
skipEmptyLines: true,
});
return Readable.from(stream.pipe(csvStream))
const res = Readable.from(stream.pipe(csvStream))
.pipe(privilegedUserParserTransform())
.pipe(batchPartitions(100)) // we cant use .map() because we need to hook into the stream flush to finish the last batch
.map(queryExistingUsers(this.esClient, this.getIndex()))
.map(bulkBatchUpsertFromCSV(this.esClient, this.getIndex(), { flushBytes, retries }))
.map(softDeleteOmittedUsers(this.esClient, this.getIndex(), { flushBytes, retries }))
.reduce(
(
{ errors, stats }: PrivmonBulkUploadUsersCSVResponse,
batch: SoftDeletionResults
): PrivmonBulkUploadUsersCSVResponse => {
return {
errors: errors.concat(batch.updated.errors),
stats: {
failed: stats.failed + batch.updated.failed,
successful: stats.successful + batch.updated.successful,
total: stats.total + batch.updated.failed + batch.updated.successful,
},
};
},
{ errors: [], stats: { failed: 0, successful: 0, total: 0 } }
);
.map(bulkUpsertBatch(this.esClient, this.getIndex(), { flushBytes, retries }))
.reduce(accumulateUpsertResults, {
users: [],
errors: [],
failed: 0,
successful: 0,
} satisfies Accumulator)
.then(softDeleteOmittedUsers(this.esClient, this.getIndex(), { flushBytes, retries }))
.then((results: SoftDeletionResults) => {
return {
errors: results.updated.errors.concat(results.deleted.errors),
stats: {
failed: results.updated.failed + results.deleted.failed,
successful: results.updated.successful + results.deleted.successful,
total:
results.updated.failed +
results.updated.successful +
results.deleted.failed +
results.deleted.successful,
},
};
});
return res;
}
private log(level: Exclude<keyof Logger, 'get' | 'log' | 'isLevelEnabled'>, msg: string) {

View file

@ -1,77 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { ElasticsearchClient } from '@kbn/core/server';
import { isRight } from 'fp-ts/Either';
import type { MonitoredUserDoc } from '../../../../../../common/api/entity_analytics/privilege_monitoring/users/common.gen';
import type { BulkProcessingResults, Options } from './types';
/**
 * Outcome of the CSV soft-delete pass.
 *
 * `updated` passes the preceding batch's bulk upsert results through
 * unchanged; `deleted` summarizes the follow-up bulk request that un-flags
 * privileged users omitted from the upload.
 */
export interface SoftDeletionResults {
  /** Results of the preceding bulk upsert, passed through untouched. */
  updated: BulkProcessingResults;
  deleted: {
    /** Number of users successfully marked as no longer privileged. */
    successful: number;
    /** Number of soft-delete updates dropped by the bulk helper. */
    failed: number;
    /** Per-document errors reported via the bulk helper's onDrop. */
    errors: BulkProcessingResults['errors'];
    /** Document ids targeted for soft deletion (from the search hits). */
    users: string[];
  };
}
/**
 * Soft deletes privileged users that were previously sourced from CSV but are
 * not present in the processed batch: their docs are kept, with
 * `user.is_privileged` flipped to false.
 *
 * @param esClient Elasticsearch client used for the search and bulk update.
 * @param index    Privilege-monitoring users index.
 * @param options  Bulk helper tuning (`flushBytes`, `retries`).
 * @returns an async step mapping a batch's upsert results to
 *          {@link SoftDeletionResults}.
 */
export const softDeleteOmittedUsers =
  (esClient: ElasticsearchClient, index: string, { flushBytes, retries }: Options) =>
  async (processed: BulkProcessingResults): Promise<SoftDeletionResults> => {
    // Usernames that parsed successfully in this batch. Rows that failed to
    // parse (lefts) are skipped: we cannot know which user they refer to.
    const uploaded = processed.batch.uploaded.reduce((acc: string[], either) => {
      if (isRight(either)) {
        acc.push(either.right.username);
      }
      return acc;
    }, []);

    // Privileged users tagged with the csv source that were NOT part of this
    // batch are the soft-delete candidates.
    // NOTE(review): `uploaded` only covers the current batch, so users present
    // in a different batch of the same file would wrongly match here — confirm
    // against the caller's batching.
    const res = await esClient.helpers.search<MonitoredUserDoc>({
      index,
      query: {
        bool: {
          must: [{ term: { 'user.is_privileged': true } }, { term: { 'labels.sources': 'csv' } }],
          must_not: [{ terms: { 'user.name': uploaded } }],
        },
      },
    });
    const usersToDelete = res.map((hit) => hit._id);

    const errors: BulkProcessingResults['errors'] = [];

    // Nothing to delete: skip the bulk helper entirely so we don't trigger a
    // pointless index refresh (`refreshOnCompletion`) for an empty datasource.
    if (usersToDelete.length === 0) {
      return {
        updated: processed,
        deleted: { failed: 0, successful: 0, errors, users: usersToDelete },
      } satisfies SoftDeletionResults;
    }

    // Soft delete: flip the privileged flag rather than removing documents.
    const { failed, successful } = await esClient.helpers.bulk<string>({
      index,
      datasource: usersToDelete,
      flushBytes,
      retries,
      refreshOnCompletion: index,
      onDocument: (id) => {
        return [
          { update: { _id: id } },
          {
            doc: {
              user: {
                is_privileged: false,
              },
            },
          },
        ];
      },
      onDrop: ({ error, document }) => {
        errors.push({
          message: error?.message || 'Unknown error',
          username: document,
          index: null, // The error is not related to a specific row in a CSV
        });
      },
    });

    return {
      updated: processed,
      deleted: { failed, successful, errors, users: usersToDelete },
    } satisfies SoftDeletionResults;
  };

View file

@ -28,9 +28,16 @@ export interface BulkProcessingError {
index: number | null;
}
export interface BulkProcessingResults {
/** Result of bulk-upserting a single batch of parsed CSV rows. */
export interface BulkBatchProcessingResults {
  /** Failed rows: parse failures plus bulk create/update errors. */
  failed: number;
  /** Rows successfully created or updated. */
  successful: number;
  /** Per-row errors collected while processing the batch. */
  errors: BulkProcessingError[];
  /** The input batch, kept so downstream steps can re-inspect its rows. */
  batch: Batch;
}

/** Accumulated results across every batch of a CSV upload. */
export interface BulkProcessingResults {
  /** All successfully parsed users across batches. */
  users: BulkPrivMonUser[];
  /** All errors accumulated across batches. */
  errors: BulkProcessingError[];
  failed: number;
  successful: number;
}

View file

@ -1,82 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { ElasticsearchClient } from '@kbn/core/server';
import { Readable } from 'stream';
import type { Either } from 'fp-ts/Either';
import { isRight } from 'fp-ts/Either';
import type {
Batch,
BulkPrivMonUser,
BulkProcessingError,
BulkProcessingResults,
Options,
} from './types';
/**
 * Bulk upserts one parsed CSV batch: rows for unknown users become `create`
 * actions, rows for users already in the index become partial `update`
 * actions. Rows that failed to parse are reported as errors and counted as
 * failures.
 */
export const bulkBatchUpsertFromCSV =
  (esClient: ElasticsearchClient, index: string, { flushBytes, retries }: Options) =>
  async (batch: Batch) => {
    const errors: BulkProcessingError[] = [];
    let parsingFailures = 0;

    // Keep the rows that parsed successfully; record each parse failure.
    const validRows: BulkPrivMonUser[] = [];
    for (const either of batch.uploaded) {
      if (isRight(either)) {
        validRows.push(either.right);
      } else {
        errors.push(either.left);
        parsingFailures += 1;
      }
    }

    const { failed, successful } = await esClient.helpers.bulk<BulkPrivMonUser>({
      index,
      flushBytes,
      retries,
      datasource: Readable.from(validRows),
      refreshOnCompletion: index,
      onDrop: ({ error, document }) => {
        errors.push({
          message: error?.message || 'Unknown error',
          username: document.username,
          index: document.index,
        });
      },
      onDocument: (row) => {
        const existingId = batch.existingUsers[row.username];
        const labels = {
          sources: ['csv'],
        };
        // Known user -> partial update; unknown -> create a fresh doc.
        return existingId
          ? [{ update: { _id: existingId } }, { doc: { user: { name: row.username }, labels } }]
          : [{ create: {} }, { user: { name: row.username, is_privileged: true }, labels }];
      },
    });

    return {
      failed: failed + parsingFailures,
      successful,
      errors,
      batch,
    } satisfies BulkProcessingResults;
  };

View file

@ -0,0 +1,103 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { ElasticsearchClient } from '@kbn/core/server';
import { separate } from 'fp-ts/Array';
import type { Batch, BulkPrivMonUser, BulkBatchProcessingResults, Options } from './types';
/**
 * Bulk upserts one parsed CSV batch using the low-level bulk API.
 *
 * New users are created with the `csv` source label; users that already have
 * a monitoring doc get a scripted update so source labels contributed by
 * other data sources are preserved.
 *
 * @param esClient Elasticsearch client.
 * @param index    Privilege-monitoring users index.
 * @param options  Kept for interface parity with the helper-based pipeline;
 *                 the low-level bulk API does not use flushBytes/retries.
 * @returns an async step mapping a {@link Batch} to
 *          {@link BulkBatchProcessingResults}.
 */
export const bulkUpsertBatch =
  (esClient: ElasticsearchClient, index: string, { flushBytes, retries }: Options) =>
  async (batch: Batch) => {
    // Split parse results: lefts are CSV parsing errors, rights are users.
    const { left: parsingErrors, right: users } = separate(batch.uploaded);

    // An empty `operations` array is rejected by the bulk API, so when every
    // row failed to parse just report the parse failures.
    if (users.length === 0) {
      return {
        failed: parsingErrors.length,
        successful: 0,
        errors: parsingErrors,
        batch,
      } satisfies BulkBatchProcessingResults;
    }

    // Exactly one action pair per user (create or scripted update), so the
    // bulk response items stay index-aligned with `users`.
    const operations = users.flatMap((u) => {
      const id = batch.existingUsers[u.username];
      if (!id) {
        return [
          { create: {} },
          {
            user: { name: u.username, is_privileged: true },
            labels: { sources: ['csv'] },
          },
        ];
      }
      return [
        { update: { _id: id } },
        {
          script: {
            source: /* painless */ `
              if (ctx._source.labels == null) {
                ctx._source.labels = new HashMap();
              }
              if (ctx._source.labels.sources == null) {
                ctx._source.labels.sources = new ArrayList();
              }
              if (!ctx._source.labels.sources.contains(params.source)) {
                ctx._source.labels.sources.add(params.source);
              }
              if (ctx._source.user.is_privileged == false) {
                ctx._source.user.is_privileged = true;
              }
            `,
            lang: 'painless',
            params: {
              source: 'csv',
            },
          },
        },
      ];
    });

    const res = await esClient.bulk<BulkPrivMonUser>({
      index,
      // The client's bulk operation types cannot express this heterogeneous
      // action/document list, hence the narrowly scoped escape hatch.
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      operations: operations as any,
      refresh: 'wait_for',
    });

    // Fold per-item responses into counts, starting from the parse failures.
    const stats = res.items.reduce(
      (acc, item, i) => {
        const error = item.create?.error ?? item.update?.error;
        if (error) {
          return {
            ...acc,
            failed: acc.failed + 1,
            errors: acc.errors.concat({
              message:
                error.reason ||
                (item.create ? 'Create action: Unknown error' : 'Update action: Unknown error'),
              username: users[i].username,
              index: i,
            }),
          };
        }
        return {
          ...acc,
          successful: acc.successful + 1,
        };
      },
      { failed: parsingErrors.length, successful: 0, errors: parsingErrors }
    );

    return {
      ...stats,
      batch,
    } satisfies BulkBatchProcessingResults;
  };

View file

@ -0,0 +1,30 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { separate } from 'fp-ts/Array';
import type { BulkProcessingResults, BulkPrivMonUser, BulkBatchProcessingResults } from './types';
/**
 * Running totals threaded through the stream `reduce` that collapses
 * per-batch upsert results into a single upload-wide summary.
 */
export interface Accumulator {
  /** Failed rows so far across batches. */
  failed: number;
  /** Successfully upserted rows so far across batches. */
  successful: number;
  /** Accumulated per-row errors. */
  errors: BulkProcessingResults['errors'];
  /** Successfully parsed users so far; consumed later by the soft delete. */
  users: BulkPrivMonUser[];
}
/**
 * Reducer that folds one batch's bulk upsert results into the running totals
 * for the whole CSV upload.
 *
 * @param acc       Running totals from previous batches (not mutated).
 * @param processed The just-finished batch's bulk results.
 * @returns a new accumulated {@link BulkProcessingResults}.
 */
export const accumulateUpsertResults = (
  acc: Accumulator,
  processed: BulkBatchProcessingResults
): BulkProcessingResults => {
  // Successfully parsed users are collected so the soft-delete step can later
  // exclude them from deletion.
  const { right: users } = separate(processed.batch.uploaded);
  return {
    users: acc.users.concat(users),
    // Use `processed.errors`, which already contains the batch's parsing
    // errors plus any bulk create/update failures. Re-separating `uploaded`
    // here would silently drop the bulk failures from the final report.
    errors: acc.errors.concat(processed.errors),
    failed: acc.failed + processed.failed,
    successful: acc.successful + processed.successful,
  };
};

View file

@ -0,0 +1,93 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { ElasticsearchClient } from '@kbn/core/server';
import type { MonitoredUserDoc } from '../../../../../common/api/entity_analytics/privilege_monitoring/users/common.gen';
import type { BulkProcessingError, BulkProcessingResults, Options } from './bulk/types';
/**
 * Outcome of the upload-wide soft-delete pass.
 *
 * `updated` carries the accumulated bulk upsert results through unchanged;
 * `deleted` summarizes the scripted updates applied to users omitted from
 * the uploaded CSV.
 */
export interface SoftDeletionResults {
  /** Accumulated upsert results, passed through untouched. */
  updated: BulkProcessingResults;
  deleted: {
    /** Number of scripted soft-delete updates that succeeded. */
    successful: number;
    /** Number of scripted soft-delete updates that failed. */
    failed: number;
    /** Per-item errors from the bulk response. */
    errors: BulkProcessingResults['errors'];
    /** Document ids targeted for soft deletion (from the search hits). */
    users: string[];
  };
}
/**
 * Soft deletes privileged users that carry the `csv` source label but are
 * absent from the full uploaded CSV: a scripted update removes only the csv
 * source, and un-flags the user when no other source still claims them.
 *
 * @param esClient Elasticsearch client.
 * @param index    Privilege-monitoring users index.
 * @param options  Kept for interface parity; the low-level bulk API does not
 *                 use flushBytes/retries.
 * @returns an async step mapping the accumulated upsert results to
 *          {@link SoftDeletionResults}.
 */
export const softDeleteOmittedUsers =
  (esClient: ElasticsearchClient, index: string, { flushBytes, retries }: Options) =>
  async (processed: BulkProcessingResults): Promise<SoftDeletionResults> => {
    // Privileged users tagged with the csv source that are absent from the
    // upload are the soft-delete candidates.
    const res = await esClient.helpers.search<MonitoredUserDoc>({
      index,
      // NOTE(review): without an explicit size the search API returns at most
      // 10 hits, which would silently cap the soft delete. TODO: page through
      // results if an upload can omit more users than this window.
      size: 10000,
      query: {
        bool: {
          must: [{ term: { 'user.is_privileged': true } }, { term: { 'labels.sources': 'csv' } }],
          must_not: [{ terms: { 'user.name': processed.users.map((u) => u.username) } }],
        },
      },
    });
    const usersToDelete = res.map((hit) => hit._id);

    const errors: BulkProcessingResults['errors'] = [];
    const accumulator = { users: usersToDelete, failed: 0, successful: 0, errors };

    // Nothing to soft delete: skip the bulk call entirely (an empty
    // `operations` array is rejected by the bulk API).
    if (usersToDelete.length === 0) {
      return { updated: processed, deleted: accumulator } satisfies SoftDeletionResults;
    }

    // Scripted update: drop only the csv source label, and un-flag the user
    // when no other source still marks them as privileged.
    const results = await esClient.bulk<MonitoredUserDoc>({
      index,
      refresh: 'wait_for',
      operations: usersToDelete.flatMap((id) => [
        { update: { _id: id } },
        {
          script: {
            source: /* painless */ `
              if (ctx._source.labels != null && ctx._source.labels.sources != null) {
                ctx._source.labels.sources.removeIf(src -> src == params.to_remove);
                if (ctx._source.labels.sources.isEmpty()) {
                  ctx._source.user.is_privileged = false;
                }
              }
            `,
            lang: 'painless',
            params: {
              to_remove: 'csv',
            },
          },
        },
      ]),
    });

    // `results.items` is index-aligned with `usersToDelete`: exactly one
    // update action was issued per candidate document.
    const stats = results.items.reduce((acc, item, i) => {
      if (item.update?.error) {
        return {
          ...acc,
          failed: acc.failed + 1,
          errors: acc.errors.concat({
            message: item.update.error.reason || 'Soft delete update action: Unknown error',
            // NOTE(review): these are document ids, not usernames — confirm
            // whether error consumers expect `user.name` here.
            username: acc.users[i],
            index: i,
          } satisfies BulkProcessingError),
        };
      }
      return {
        ...acc,
        successful: acc.successful + 1,
      };
    }, accumulator);

    return { updated: processed, deleted: stats } satisfies SoftDeletionResults;
  };