[EDR Workflows] Additional usage reporting task test (#185888)

**Unit:**

This PR introduces an additional unit test to validate the behavior of
"paginated" ES.search() when using a default batch size of 1000. It
ensures that the data is appropriately passed down to the API through
three subsequent requests. The underlying logic can be summarized as
follows:

1. Fetch all documents with a timestamp greater than or equal to 15
minutes in batches of 1000.
2. Perform internal transformations.
3. Transmit transformed documents to an external API.

Since this process occurs server-side within a managed task running at
regular intervals, it is not feasible to conduct integration testing
using FTR or Cypress, as these requests cannot be intercepted in a
browser environment. However, I believe that the Jest test included in
this PR adequately addresses our responsibility. In this test, I mock
esClient.search() responses in a loop to ensure that all fetched items
are correctly sent to the API.

**Cypress:**

The Cypress integration test added with this PR is based on an actual
task run. Steps taken:
1. Create an environment with the task interval set to 1 minute and a
local API URL for usage reporting.
2. Load 2001 documents into ES, ensuring they have all the required
heartbeat fields.
3. Start transparent API proxy on a local API URL
4. Wait for the task to run.
5. Intercept the task API call to the usage reporting API.
6. Verify that the API request contains a payload of all 2001 documents
stored in ES.

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Konrad Szwarc 2024-06-21 00:14:28 +02:00 committed by GitHub
parent ac58b908de
commit 8f01b66ef2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
19 changed files with 640 additions and 117 deletions

View file

@ -1231,6 +1231,7 @@
"@babel/types": "7.21.2",
"@bazel/ibazel": "^0.16.2",
"@bazel/typescript": "4.6.2",
"@cypress/debugging-proxy": "2.0.1",
"@cypress/grep": "^4.0.1",
"@cypress/webpack-preprocessor": "^6.0.1",
"@elastic/eslint-plugin-eui": "0.0.2",

View file

@ -0,0 +1,104 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { Client, estypes } from '@elastic/elasticsearch';
import type { ToolingLog } from '@kbn/tooling-log';
import { ENDPOINT_HEARTBEAT_INDEX } from '../constants';
import { createToolingLogger } from './utils';
export interface IndexedEndpointHeartbeats {
data: estypes.BulkResponse;
cleanup: () => Promise<DeletedEndpointHeartbeats>;
}
export interface DeletedEndpointHeartbeats {
data: estypes.BulkResponse;
}
export const indexEndpointHeartbeats = async (
esClient: Client,
log: ToolingLog,
count: number
): Promise<IndexedEndpointHeartbeats> => {
log.debug(`Indexing ${count} endpoint heartbeats`);
const startTime = new Date();
const docs = Array.from({ length: count }).map((_, i) => {
const ingested = new Date(startTime.getTime() + i).toISOString();
return {
'@timestamp': '2024-06-11T13:03:37Z',
agent: {
id: `agent-${i}`,
},
event: {
agent_id_status: 'auth_metadata_missing',
ingested,
},
};
});
const operations = docs.flatMap((doc) => [
{
index: {
_index: ENDPOINT_HEARTBEAT_INDEX,
op_type: 'create',
},
},
doc,
]);
const response = await esClient.bulk({
refresh: 'wait_for',
operations,
});
if (response.errors) {
log.error(
`There was an error indexing endpoint heartbeats ${JSON.stringify(response.items, null, 2)}`
);
} else {
log.debug(`Indexed ${docs.length} endpoint heartbeats successfully`);
}
return {
data: response,
cleanup: deleteIndexedEndpointHeartbeats.bind(null, esClient, response, log),
};
};
export const deleteIndexedEndpointHeartbeats = async (
esClient: Client,
indexedHeartbeats: IndexedEndpointHeartbeats['data'],
log = createToolingLogger()
): Promise<DeletedEndpointHeartbeats> => {
let response: estypes.BulkResponse = {
took: 0,
errors: false,
items: [],
};
if (indexedHeartbeats.items.length) {
const idsToDelete = indexedHeartbeats.items
.filter((item) => item.create)
.map((item) => ({
delete: {
_index: item.create?._index,
_id: item.create?._id,
},
}));
if (idsToDelete.length) {
response = await esClient.bulk({
operations: idsToDelete,
});
log.debug('Indexed endpoint heartbeats deleted successfully');
}
}
return { data: response };
};

View file

@ -10,6 +10,11 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import type { CasePostRequest } from '@kbn/cases-plugin/common/api';
import type { UsageRecord } from '@kbn/security-solution-serverless/server/types';
import type {
DeletedEndpointHeartbeats,
IndexedEndpointHeartbeats,
} from '../../../common/endpoint/data_loaders/index_endpoint_hearbeats';
import type { SecuritySolutionDescribeBlockFtrConfig } from '../../../scripts/run_cypress/utils';
import type { DeleteAllEndpointDataResponse } from '../../../scripts/endpoint/common/delete_all_endpoint_data';
import type { IndexedEndpointPolicyResponse } from '../../../common/endpoint/data_loaders/index_endpoint_policy_response';
@ -119,6 +124,36 @@ declare global {
options?: Partial<Loggable & Timeoutable>
): Chainable<IndexedCase['data']>;
task(
name: 'indexEndpointHeartbeats',
arg?: { count?: number },
options?: Partial<Loggable & Timeoutable>
): Chainable<IndexedEndpointHeartbeats['data']>;
task(
name: 'deleteIndexedEndpointHeartbeats',
arg: IndexedEndpointHeartbeats['data'],
options?: Partial<Loggable & Timeoutable>
): Chainable<DeletedEndpointHeartbeats>;
task(
name: 'startTransparentApiProxy',
arg?: { port?: number },
options?: Partial<Loggable & Timeoutable>
): Chainable<null>;
task(
name: 'getInterceptedRequestsFromTransparentApiProxy',
arg?: {},
options?: Partial<Loggable & Timeoutable>
): Chainable<UsageRecord[][]>;
task(
name: 'stopTransparentProxyApi',
arg?: {},
options?: Partial<Loggable & Timeoutable>
): Chainable<null>;
task(
name: 'deleteIndexedCase',
arg: IndexedCase['data'],

View file

@ -8,6 +8,8 @@
// @ts-expect-error
import registerDataSession from 'cypress-data-session/src/plugin';
import { merge } from 'lodash';
import { transparentApiProxy } from './support/transparent_api_proxy';
import { samlAuthentication } from './support/saml_authentication';
import { getVideosForFailedSpecs } from './support/filter_videos';
import { setupToolingLogLevel } from './support/setup_tooling_log_level';
@ -75,7 +77,6 @@ export const getCypressBaseConfig = (
// baseUrl: To override, set Env. variable `CYPRESS_BASE_URL`
baseUrl: 'http://localhost:5601',
supportFile: 'public/management/cypress/support/e2e.ts',
// TODO: undo before merge
specPattern: 'public/management/cypress/e2e/**/*.cy.{js,jsx,ts,tsx}',
experimentalRunAllSpecs: true,
experimentalMemoryManagement: true,
@ -100,6 +101,8 @@ export const getCypressBaseConfig = (
dataLoaders(on, config);
transparentApiProxy(on, config);
// Data loaders specific to "real" Endpoint testing
dataLoadersForRealEndpoints(on, config);

View file

@ -0,0 +1,95 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { recurse } from 'cypress-recurse';
import type { UsageRecord } from '@kbn/security-solution-serverless/server/types';
import { METERING_SERVICE_BATCH_SIZE } from '@kbn/security-solution-serverless/server/constants';
import {
getInterceptedRequestsFromTransparentApiProxy,
startTransparentApiProxy,
stopTransparentApiProxy,
} from '../../tasks/transparent_api_proxy';
import type { ReturnTypeFromChainable } from '../../types';
import { indexEndpointHeartbeats } from '../../tasks/index_endpoint_heartbeats';
import { login, ROLE } from '../../tasks/login';
describe(
'Metering',
{
tags: ['@serverless', '@skipInServerlessMKI'],
env: {
ftrConfig: {
kbnServerArgs: [
`--xpack.securitySolutionServerless.usageReportingTaskInterval=1m`,
`--xpack.securitySolutionServerless.usageReportingApiUrl=https://localhost:3623`,
],
},
},
},
() => {
const HEARTBEAT_COUNT = 2001;
let endpointData: ReturnTypeFromChainable<typeof indexEndpointHeartbeats> | undefined;
before(() => {
login(ROLE.system_indices_superuser);
startTransparentApiProxy({ port: 3623 });
indexEndpointHeartbeats({
count: HEARTBEAT_COUNT,
}).then((indexedHeartbeats) => {
endpointData = indexedHeartbeats;
});
});
after(() => {
if (endpointData) {
endpointData.cleanup();
endpointData = undefined;
}
stopTransparentApiProxy();
});
describe('Usage Reporting Task', () => {
it('properly sends indexed heartbeats to the metering api', () => {
const expectedChunks = Math.ceil(HEARTBEAT_COUNT / METERING_SERVICE_BATCH_SIZE);
recurse(
getInterceptedRequestsFromTransparentApiProxy,
(res: UsageRecord[][]) => {
if (res.length === expectedChunks) {
expect(res).to.have.length(expectedChunks);
for (let i = 0; i < expectedChunks; i++) {
if (i < expectedChunks - 1) {
expect(res[i]).to.have.length(METERING_SERVICE_BATCH_SIZE);
} else {
// The last or only chunk
expect(res[i]).to.have.length(
HEARTBEAT_COUNT % METERING_SERVICE_BATCH_SIZE || METERING_SERVICE_BATCH_SIZE
);
}
}
const allHeartbeats = res.flat();
expect(allHeartbeats).to.have.length(HEARTBEAT_COUNT);
expect(allHeartbeats[0].id).to.contain('agent-0');
expect(allHeartbeats[HEARTBEAT_COUNT - 1].id).to.contain(
`agent-${HEARTBEAT_COUNT - 1}`
);
return true;
}
return false;
},
{
delay: 15 * 1000,
timeout: 2 * 60 * 1000,
}
);
});
});
}
);

View file

@ -11,6 +11,11 @@ import type { CasePostRequest } from '@kbn/cases-plugin/common';
import execa from 'execa';
import type { KbnClient } from '@kbn/test';
import type { ToolingLog } from '@kbn/tooling-log';
import type { IndexedEndpointHeartbeats } from '../../../../common/endpoint/data_loaders/index_endpoint_hearbeats';
import {
deleteIndexedEndpointHeartbeats,
indexEndpointHeartbeats,
} from '../../../../common/endpoint/data_loaders/index_endpoint_hearbeats';
import {
getHostVmClient,
createVm,
@ -226,6 +231,19 @@ export const dataLoaders = (
return deleteIndexedHostsAndAlerts(esClient, kbnClient, indexedData);
},
indexEndpointHeartbeats: async (options: { count?: number }) => {
const { esClient, log } = await setupStackServicesUsingCypressConfig(config);
return (await indexEndpointHeartbeats(esClient, log, options.count || 1)).data;
},
deleteIndexedEndpointHeartbeats: async (
data: IndexedEndpointHeartbeats['data']
): Promise<null> => {
const { esClient } = await stackServicesPromise;
await deleteIndexedEndpointHeartbeats(esClient, data);
return null;
},
indexEndpointRuleAlerts: async (options: { endpointAgentId: string; count?: number }) => {
const { esClient, log } = await stackServicesPromise;
return (

View file

@ -0,0 +1,79 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
// eslint-disable-next-line import/no-nodejs-modules
import fs from 'fs';
// eslint-disable-next-line import/no-nodejs-modules
import type { IncomingMessage, ServerResponse } from 'http';
import DebugProxy from '@cypress/debugging-proxy';
import { ES_CERT_PATH, ES_KEY_PATH } from '@kbn/dev-utils';
import type { UsageRecord } from '@kbn/security-solution-serverless/server/types';
import { setupStackServicesUsingCypressConfig } from './common';
export const transparentApiProxy = (
on: Cypress.PluginEvents,
config: Cypress.PluginConfigOptions
): void => {
let proxy: { start: (port: number) => Promise<void>; stop: () => Promise<void> } | null = null;
const interceptedRequestBody: UsageRecord[][] = [];
on('task', {
startTransparentApiProxy: async (options) => {
const { log } = await setupStackServicesUsingCypressConfig(config);
const port = options?.port || 3623;
log.debug(`[Transparent API] Starting transparent API proxy on port ${port}`);
try {
proxy = new DebugProxy({
keepRequests: true,
https: {
key: fs.readFileSync(ES_KEY_PATH),
cert: fs.readFileSync(ES_CERT_PATH),
},
onRequest: (_: string, req: IncomingMessage, res: ServerResponse) => {
let body = '';
req.on('data', (chunk: string) => {
body += chunk;
});
req.on('end', () => {
try {
const parsedBody = JSON.parse(body);
interceptedRequestBody.push(parsedBody);
} catch (err) {
throw new Error(`[Transparent API] Failed to parse request body as JSON: ${err}`);
}
res.writeHead(201);
res.end();
});
},
});
} catch (e) {
log.error(`[Transparent API] Error starting transparent API proxy: ${e}`);
throw e;
}
if (!proxy) {
throw new Error('[Transparent API] Proxy was not initialized');
}
await proxy.start(port);
log.debug(`[Transparent API] proxy started on port ${port}`);
return null;
},
getInterceptedRequestsFromTransparentApiProxy: async (): Promise<UsageRecord[][]> => {
return interceptedRequestBody;
},
stopTransparentProxyApi: async () => {
if (proxy) {
await proxy.stop();
}
return null;
},
});
};

View file

@ -0,0 +1,30 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type {
DeletedEndpointHeartbeats,
IndexedEndpointHeartbeats,
} from '../../../../common/endpoint/data_loaders/index_endpoint_hearbeats';
export const indexEndpointHeartbeats = (options: {
count?: number;
}): Cypress.Chainable<
Pick<IndexedEndpointHeartbeats, 'data'> & {
cleanup: () => Cypress.Chainable<DeletedEndpointHeartbeats>;
}
> => {
return cy.task('indexEndpointHeartbeats', options).then((res) => {
return {
data: res,
cleanup: () => {
cy.log('Deleting Endpoint Host heartbeats');
return cy.task('deleteIndexedEndpointHeartbeats', res);
},
};
});
};

View file

@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { UsageRecord } from '@kbn/security-solution-serverless/server/types';
export const startTransparentApiProxy = (options: { port?: number }): Cypress.Chainable<null> => {
return cy.task('startTransparentApiProxy', options);
};
export const stopTransparentApiProxy = (): Cypress.Chainable<null> => {
return cy.task('stopTransparentProxyApi');
};
export const getInterceptedRequestsFromTransparentApiProxy = (): Cypress.Chainable<
UsageRecord[][]
> => {
return cy.task('getInterceptedRequestsFromTransparentApiProxy');
};

View file

@ -31,5 +31,7 @@
"@kbn/test",
"@kbn/repo-info",
"@kbn/tooling-log",
"@kbn/security-solution-serverless",
"@kbn/dev-utils",
]
}

View file

@ -0,0 +1,8 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
declare module '@cypress/debugging-proxy';

View file

@ -15,8 +15,11 @@ import type { UsageRecord } from '../../types';
// TODO remove once we have the CA available
const agent = new https.Agent({ rejectUnauthorized: false });
export class UsageReportingService {
public async reportUsage(records: UsageRecord[]): Promise<Response> {
return fetch(USAGE_SERVICE_USAGE_URL, {
public async reportUsage(
records: UsageRecord[],
url = USAGE_SERVICE_USAGE_URL
): Promise<Response> {
return fetch(url, {
method: 'post',
body: JSON.stringify(records),
headers: { 'Content-Type': 'application/json' },

View file

@ -8,6 +8,7 @@
import { schema, type TypeOf } from '@kbn/config-schema';
import type { PluginConfigDescriptor, PluginInitializerContext } from '@kbn/core/server';
import type { SecuritySolutionPluginSetup } from '@kbn/security-solution-plugin/server/plugin_contract';
import { USAGE_SERVICE_USAGE_URL } from './constants';
import { productTypes } from '../common/config';
import type { ExperimentalFeatures } from '../common/experimental_features';
import { parseExperimentalConfigValue } from '../common/experimental_features';
@ -15,6 +16,21 @@ import { parseExperimentalConfigValue } from '../common/experimental_features';
export const configSchema = schema.object({
enabled: schema.boolean({ defaultValue: false }),
productTypes,
/**
* Usage Reporting: the interval between runs of the task
*/
usageReportingTaskInterval: schema.string({ defaultValue: '5m' }),
/**
* Usage Reporting: timeout value for how long the task should run.
*/
usageReportingTaskTimeout: schema.string({ defaultValue: '1m' }),
/**
* Usage Reporting: the URL to send usage data to
*/
usageReportingApiUrl: schema.string({ defaultValue: USAGE_SERVICE_USAGE_URL }),
/**
* For internal use. A list of string values (comma delimited) that will enable experimental
* type of functionality that is not yet released. Valid values for this settings need to

View file

@ -9,3 +9,4 @@ const namespace = 'elastic-system';
const USAGE_SERVICE_BASE_API_URL = `https://usage-api.${namespace}/api`;
const USAGE_SERVICE_BASE_API_URL_V1 = `${USAGE_SERVICE_BASE_API_URL}/v1`;
export const USAGE_SERVICE_USAGE_URL = `${USAGE_SERVICE_BASE_API_URL_V1}/usage`;
export const METERING_SERVICE_BATCH_SIZE = 1000;

View file

@ -10,6 +10,7 @@ import type { ElasticsearchClient } from '@kbn/core/server';
import { ENDPOINT_HEARTBEAT_INDEX } from '@kbn/security-solution-plugin/common/endpoint/constants';
import type { EndpointHeartbeat } from '@kbn/security-solution-plugin/common/endpoint/types';
import { METERING_SERVICE_BATCH_SIZE } from '../../constants';
import { ProductLine, ProductTier } from '../../../common/product';
import type { UsageRecord, MeteringCallbackInput, MeteringCallBackResponse } from '../../types';
@ -17,8 +18,6 @@ import type { ServerlessSecurityConfig } from '../../config';
import { METERING_TASK } from '../constants/metering';
const BATCH_SIZE = 1000;
export class EndpointMeteringService {
private type: ProductLine.endpoint | `${ProductLine.cloud}_${ProductLine.endpoint}` | undefined;
private tier: ProductTier | undefined;
@ -73,7 +72,7 @@ export class EndpointMeteringService {
}, [] as UsageRecord[]);
const latestTimestamp = new Date(records[records.length - 1].usage_timestamp);
const shouldRunAgain = heartbeatsResponse.hits.hits.length === BATCH_SIZE;
const shouldRunAgain = heartbeatsResponse.hits.hits.length === METERING_SERVICE_BATCH_SIZE;
return { latestTimestamp, records, shouldRunAgain };
};
@ -86,7 +85,7 @@ export class EndpointMeteringService {
{
index: ENDPOINT_HEARTBEAT_INDEX,
sort: 'event.ingested',
size: BATCH_SIZE,
size: METERING_SERVICE_BATCH_SIZE,
query: {
range: {
'event.ingested': {

View file

@ -132,7 +132,7 @@ export class SecuritySolutionServerlessPlugin
this.endpointUsageReportingTask
?.start({
taskManager: pluginsSetup.taskManager,
interval: ENDPOINT_METERING_TASK.INTERVAL,
interval: this.config.usageReportingTaskInterval,
})
.catch(() => {});

View file

@ -26,6 +26,9 @@ import type { ServerlessSecurityConfig } from '../config';
import type { SecurityUsageReportingTaskSetupContract, UsageRecord } from '../types';
import { SecurityUsageReportingTask } from './usage_reporting_task';
import { endpointMeteringService } from '../endpoint/services';
import type { SearchResponse } from '@elastic/elasticsearch/lib/api/types';
import { USAGE_SERVICE_USAGE_URL } from '../constants';
describe('SecurityUsageReportingTask', () => {
const TITLE = 'test-task-title';
@ -120,136 +123,222 @@ describe('SecurityUsageReportingTask', () => {
);
}
async function setupMocks() {
async function runTask(taskInstance = buildMockTaskInstance(), callNum: number = 0) {
const mockTaskManagerStart = tmStartMock();
await mockTask.start({ taskManager: mockTaskManagerStart, interval: '5m' });
const createTaskRunner =
mockTaskManagerSetup.registerTaskDefinitions.mock.calls[callNum][0][TYPE].createTaskRunner;
const taskRunner = createTaskRunner({ taskInstance });
return taskRunner.run();
}
async function setupBaseMocks() {
mockCore = coreSetupMock();
mockEsClient = (await mockCore.getStartServices())[0].elasticsearch.client
.asInternalUser as jest.Mocked<ElasticsearchClient>;
mockTaskManagerSetup = tmSetupMock();
usageRecord = buildUsageRecord();
reportUsageSpy = jest.spyOn(usageReportingService, 'reportUsage');
meteringCallbackMock = jest.fn().mockResolvedValueOnce({
latestRecordTimestamp: usageRecord.usage_timestamp,
records: [usageRecord],
shouldRunAgain: false,
});
taskArgs = buildTaskArgs();
mockTask = new SecurityUsageReportingTask(taskArgs);
}
beforeEach(async () => {
await setupMocks();
});
afterEach(() => {
jest.restoreAllMocks();
});
describe('task lifecycle', () => {
it('should create task', () => {
expect(mockTask).toBeInstanceOf(SecurityUsageReportingTask);
describe('meteringCallback integration', () => {
async function setupMocks() {
await setupBaseMocks();
taskArgs = buildTaskArgs({
meteringCallback: endpointMeteringService.getUsageRecords,
config: {
productTypes: [
{ product_line: ProductLine.endpoint, product_tier: ProductTier.complete },
],
usageReportingApiUrl: USAGE_SERVICE_USAGE_URL,
} as ServerlessSecurityConfig,
});
mockTask = new SecurityUsageReportingTask(taskArgs);
}
beforeEach(async () => {
await setupMocks();
});
it('should register task', () => {
expect(mockTaskManagerSetup.registerTaskDefinitions).toHaveBeenCalled();
afterEach(() => {
jest.restoreAllMocks();
});
it('should schedule task', async () => {
const mockTaskManagerStart = tmStartMock();
await mockTask.start({ taskManager: mockTaskManagerStart, interval: '5m' });
expect(mockTaskManagerStart.ensureScheduled).toHaveBeenCalled();
describe('Multiple batches', () => {
async function runTasksUntilNoRunAt() {
let task = await runTask();
while (task?.runAt !== undefined) {
task = await runTask({ ...buildMockTaskInstance({ runAt: task.runAt }) });
}
}
const heartBeats = Array.from({ length: 2001 }, (_, i) => ({
_source: {
agent: {
id: `test-${i}`,
},
event: {
ingested: '2021-09-01T00:00:00.000Z',
},
},
}));
const batches = [
heartBeats.slice(0, 1000),
heartBeats.slice(1000, 2000),
heartBeats.slice(2000),
];
it('properly reports multiple batches', async () => {
batches.forEach((batch) => {
mockEsClient.search.mockResolvedValueOnce({
hits: {
hits: batch,
},
} as SearchResponse);
});
await runTasksUntilNoRunAt();
expect(reportUsageSpy).toHaveBeenCalledTimes(3);
batches.forEach((batch, i) => {
expect(reportUsageSpy).toHaveBeenNthCalledWith(
i + 1,
expect.arrayContaining(
batch.map(({ _source }) =>
expect.objectContaining({
id: `endpoint-${_source.agent.id}-2021-09-01T00:00:00.000Z`,
})
)
),
USAGE_SERVICE_USAGE_URL
);
});
});
});
});
describe('task logic', () => {
async function runTask(taskInstance = buildMockTaskInstance(), callNum: number = 0) {
const mockTaskManagerStart = tmStartMock();
await mockTask.start({ taskManager: mockTaskManagerStart, interval: '5m' });
const createTaskRunner =
mockTaskManagerSetup.registerTaskDefinitions.mock.calls[callNum][0][TYPE].createTaskRunner;
const taskRunner = createTaskRunner({ taskInstance });
return taskRunner.run();
describe('Mocked meteringCallback', () => {
async function setupMocks() {
await setupBaseMocks();
meteringCallbackMock = jest.fn().mockResolvedValueOnce({
latestRecordTimestamp: usageRecord.usage_timestamp,
records: [usageRecord],
shouldRunAgain: false,
});
taskArgs = buildTaskArgs({
config: {
usageReportingApiUrl: USAGE_SERVICE_USAGE_URL,
} as ServerlessSecurityConfig,
});
mockTask = new SecurityUsageReportingTask(taskArgs);
}
it('should call metering callback', async () => {
const task = await runTask();
expect(meteringCallbackMock).toHaveBeenCalledWith(
expect.objectContaining({
esClient: mockEsClient,
cloudSetup: taskArgs.cloudSetup,
taskId: TASK_ID,
config: taskArgs.config,
lastSuccessfulReport: new Date(task?.state.lastSuccessfulReport as string),
})
);
beforeEach(async () => {
await setupMocks();
});
it('should report metering records', async () => {
await runTask();
expect(reportUsageSpy).toHaveBeenCalledWith(
expect.arrayContaining([
afterEach(() => {
jest.restoreAllMocks();
});
describe('task lifecycle', () => {
it('should create task', () => {
expect(mockTask).toBeInstanceOf(SecurityUsageReportingTask);
});
it('should register task', () => {
expect(mockTaskManagerSetup.registerTaskDefinitions).toHaveBeenCalled();
});
it('should schedule task', async () => {
const mockTaskManagerStart = tmStartMock();
await mockTask.start({ taskManager: mockTaskManagerStart, interval: '5m' });
expect(mockTaskManagerStart.ensureScheduled).toHaveBeenCalled();
});
});
describe('task logic', () => {
it('should call metering callback', async () => {
const task = await runTask();
expect(meteringCallbackMock).toHaveBeenCalledWith(
expect.objectContaining({
creation_timestamp: usageRecord.creation_timestamp,
id: usageRecord.id,
source: {
id: TASK_ID,
instance_group_id: PROJECT_ID,
metadata: { tier: ProductTier.complete },
},
usage: { period_seconds: 3600, quantity: 1, type: USAGE_TYPE },
usage_timestamp: usageRecord.usage_timestamp,
}),
])
);
});
it('should do nothing if task instance id is outdated', async () => {
const result = await runTask({ ...buildMockTaskInstance(), id: 'old-id' });
expect(result).toEqual(getDeleteTaskRunResult());
expect(reportUsageSpy).not.toHaveBeenCalled();
expect(meteringCallbackMock).not.toHaveBeenCalled();
});
describe('lastSuccessfulReport', () => {
it('should set lastSuccessfulReport correctly if report success', async () => {
reportUsageSpy.mockResolvedValueOnce({ status: 201 });
const taskInstance = buildMockTaskInstance();
const task = await runTask(taskInstance);
const newLastSuccessfulReport = task?.state.lastSuccessfulReport;
expect(newLastSuccessfulReport).toEqual(expect.any(String));
expect(newLastSuccessfulReport).not.toEqual(taskInstance.state.lastSuccessfulReport);
esClient: mockEsClient,
cloudSetup: taskArgs.cloudSetup,
taskId: TASK_ID,
config: taskArgs.config,
lastSuccessfulReport: new Date(task?.state.lastSuccessfulReport as string),
})
);
});
it('should set lastSuccessfulReport correctly if no usage records found', async () => {
meteringCallbackMock.mockResolvedValueOnce([]);
const taskInstance = buildMockTaskInstance({ state: { lastSuccessfulReport: null } });
const task = await runTask(taskInstance);
const newLastSuccessfulReport = task?.state.lastSuccessfulReport;
expect(newLastSuccessfulReport).toEqual(expect.any(String));
expect(newLastSuccessfulReport).not.toEqual(taskInstance.state.lastSuccessfulReport);
it('should report metering records', async () => {
await runTask();
expect(reportUsageSpy).toHaveBeenCalledWith(
expect.arrayContaining([
expect.objectContaining({
creation_timestamp: usageRecord.creation_timestamp,
id: usageRecord.id,
source: {
id: TASK_ID,
instance_group_id: PROJECT_ID,
metadata: { tier: ProductTier.complete },
},
usage: { period_seconds: 3600, quantity: 1, type: USAGE_TYPE },
usage_timestamp: usageRecord.usage_timestamp,
}),
]),
USAGE_SERVICE_USAGE_URL
);
});
describe('and response is NOT 201', () => {
beforeEach(() => {
reportUsageSpy.mockResolvedValueOnce({ status: 500 });
});
it('should do nothing if task instance id is outdated', async () => {
const result = await runTask({ ...buildMockTaskInstance(), id: 'old-id' });
it('should set lastSuccessfulReport correctly', async () => {
const lastSuccessfulReport = new Date(new Date().setMinutes(-15)).toISOString();
const taskInstance = buildMockTaskInstance({ state: { lastSuccessfulReport } });
expect(result).toEqual(getDeleteTaskRunResult());
expect(reportUsageSpy).not.toHaveBeenCalled();
expect(meteringCallbackMock).not.toHaveBeenCalled();
});
describe('lastSuccessfulReport', () => {
it('should set lastSuccessfulReport correctly if report success', async () => {
reportUsageSpy.mockResolvedValueOnce({ status: 201 });
const taskInstance = buildMockTaskInstance();
const task = await runTask(taskInstance);
const newLastSuccessfulReport = task?.state.lastSuccessfulReport;
expect(newLastSuccessfulReport).toEqual(taskInstance.state.lastSuccessfulReport);
expect(newLastSuccessfulReport).toEqual(expect.any(String));
expect(newLastSuccessfulReport).not.toEqual(taskInstance.state.lastSuccessfulReport);
});
it('should set lastSuccessfulReport correctly if previously null', async () => {
it('should set lastSuccessfulReport correctly if no usage records found', async () => {
meteringCallbackMock.mockResolvedValueOnce([]);
const taskInstance = buildMockTaskInstance({ state: { lastSuccessfulReport: null } });
const task = await runTask(taskInstance);
const newLastSuccessfulReport = task?.state.lastSuccessfulReport;
expect(newLastSuccessfulReport).toEqual(expect.any(String));
expect(newLastSuccessfulReport).not.toEqual(taskInstance.state.lastSuccessfulReport);
});
describe('and response is NOT 201', () => {
beforeEach(() => {
reportUsageSpy.mockResolvedValueOnce({ status: 500 });
});
it('should set lastSuccessfulReport correctly', async () => {
const lastSuccessfulReport = new Date(new Date().setMinutes(-15)).toISOString();
const taskInstance = buildMockTaskInstance({ state: { lastSuccessfulReport } });
const task = await runTask(taskInstance);
const newLastSuccessfulReport = task?.state.lastSuccessfulReport;
expect(newLastSuccessfulReport).toEqual(taskInstance.state.lastSuccessfulReport);
});
it('should set lastSuccessfulReport correctly if previously null', async () => {
const taskInstance = buildMockTaskInstance({ state: { lastSuccessfulReport: null } });
const task = await runTask(taskInstance);
const newLastSuccessfulReport = task?.state.lastSuccessfulReport;
expect(newLastSuccessfulReport).toEqual(expect.any(String));
});
});
});
});

View file

@ -23,18 +23,17 @@ import type { ServerlessSecurityConfig } from '../config';
import { stateSchemaByVersion, emptyState } from './task_state';
const SCOPE = ['serverlessSecurity'];
const TIMEOUT = '1m';
export const VERSION = '1.0.0';
export class SecurityUsageReportingTask {
private wasStarted: boolean = false;
private cloudSetup: CloudSetup;
private taskType: string;
private version: string;
private logger: Logger;
private abortController = new AbortController();
private config: ServerlessSecurityConfig;
private readonly cloudSetup: CloudSetup;
private readonly taskType: string;
private readonly version: string;
private readonly logger: Logger;
private readonly config: ServerlessSecurityConfig;
constructor(setupContract: SecurityUsageReportingTaskSetupContract) {
const {
@ -59,7 +58,7 @@ export class SecurityUsageReportingTask {
taskManager.registerTaskDefinitions({
[taskType]: {
title: taskTitle,
timeout: TIMEOUT,
timeout: this.config.usageReportingTaskTimeout,
stateSchemaByVersion,
createTaskRunner: ({ taskInstance }: { taskInstance: ConcreteTaskInstance }) => {
return {
@ -79,7 +78,7 @@ export class SecurityUsageReportingTask {
public start = async ({ taskManager, interval }: SecurityUsageReportingTaskStartContract) => {
if (!taskManager) {
this.logger.error(`missing required taskmanager service during start of ${this.taskType}`);
this.logger.error(`missing required task manager service during start of ${this.taskType}`);
return;
}
@ -133,7 +132,7 @@ export class SecurityUsageReportingTask {
let usageRecords: UsageRecord[] = [];
let latestRecordTimestamp: Date | undefined;
let shouldRunAgain = false;
// save usage record query time so we can use it to know where
// save usage record query time, so we can use it to know where
// the next query range should start
const meteringCallbackTime = new Date();
try {
@ -162,7 +161,12 @@ export class SecurityUsageReportingTask {
if (usageRecords.length !== 0) {
try {
usageReportResponse = await usageReportingService.reportUsage(usageRecords);
this.logger.debug(`Sending ${usageRecords.length} usage records to the API`);
usageReportResponse = await usageReportingService.reportUsage(
usageRecords,
this.config.usageReportingApiUrl
);
if (!usageReportResponse.ok) {
const errorResponse = await usageReportResponse.json();
@ -170,7 +174,7 @@ export class SecurityUsageReportingTask {
return { state: taskInstance.state, runAt: new Date() };
}
this.logger.info(
this.logger.debug(
`(${
usageRecords.length
}) usage records starting from ${lastSuccessfulReport.toISOString()} were sent successfully: ${

View file

@ -1510,6 +1510,15 @@
resolved "https://registry.yarnpkg.com/@csstools/selector-specificity/-/selector-specificity-2.0.1.tgz#b6b8d81780b9a9f6459f4bfe9226ac6aefaefe87"
integrity sha512-aG20vknL4/YjQF9BSV7ts4EWm/yrjagAN7OWBNmlbEOUiu0llj4OGrFoOKK3g2vey4/p2omKCoHrWtPxSwV3HA==
"@cypress/debugging-proxy@2.0.1":
version "2.0.1"
resolved "https://registry.yarnpkg.com/@cypress/debugging-proxy/-/debugging-proxy-2.0.1.tgz#c0ef569af4c2efb6ceb329e5813bc770b133ab9e"
integrity sha512-kUFQSNQfvZUO5yqtiir65FPPJZKPQaA68trYiKlHuWhDu9sopjIAXoZv/ff0GbLSNDHdxIuF0VyJhJIRRHuhAw==
dependencies:
debug "^4.1.1"
http-proxy "^1.17.0"
self-signed-cert "^1.0.1"
"@cypress/grep@^4.0.1":
version "4.0.1"
resolved "https://registry.yarnpkg.com/@cypress/grep/-/grep-4.0.1.tgz#bce679f85da286c4979bb9ffc79b2782dc5b75c6"
@ -19489,7 +19498,7 @@ http-proxy-middleware@^2.0.3:
is-plain-obj "^3.0.0"
micromatch "^4.0.2"
http-proxy@^1.18.1:
http-proxy@^1.17.0, http-proxy@^1.18.1:
version "1.18.1"
resolved "https://registry.yarnpkg.com/http-proxy/-/http-proxy-1.18.1.tgz#401541f0534884bbf95260334e72f88ee3976549"
integrity sha512-7mz/721AbnJwIVbnaSv1Cz3Am0ZLT/UBwkC92VlxhXv/k/BBQfM2fXElQNC27BVGr0uwUpplYPQM9LnaBMR5NQ==
@ -28089,6 +28098,11 @@ selenium-webdriver@^4.21.0:
tmp "^0.2.3"
ws ">=8.16.0"
self-signed-cert@^1.0.1:
version "1.0.1"
resolved "https://registry.yarnpkg.com/self-signed-cert/-/self-signed-cert-1.0.1.tgz#9e2fae07503f84fb910f4a87c5c5c6becc207bf6"
integrity sha512-86d1jYydqaRdUEyR9tj5nQ0d059RUWB9gdZrzDy2MJaUHii1h9EyzbAepkV1rOLO2AkSkQbXtUrqWRH4FDYWHA==
selfsigned@^2.0.1:
version "2.0.1"
resolved "https://registry.yarnpkg.com/selfsigned/-/selfsigned-2.0.1.tgz#8b2df7fa56bf014d19b6007655fff209c0ef0a56"