Push the total request-body-bytes to usage-api (#194429)

Resolves: https://github.com/elastic/response-ops-team/issues/209 

This PR is a follow-on of https://github.com/elastic/kibana/pull/186804.

Creates a new task that runs every 1 hour to push the total
connector-request-body-bytes that have been saved in the event log to
usage-api.
This commit is contained in:
Ersin Erdal 2024-10-31 20:30:16 +01:00 committed by GitHub
parent a7a09f798f
commit 216f899621
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 792 additions and 15 deletions

View file

@ -26,7 +26,8 @@
"spaces",
"security",
"monitoringCollection",
"serverless"
"serverless",
"cloud"
],
"extraPublicDirs": [
"common"

View file

@ -51,6 +51,7 @@ import { OAuthParams } from '../routes/get_oauth_access_token';
import { eventLogClientMock } from '@kbn/event-log-plugin/server/event_log_client.mock';
import { GetGlobalExecutionKPIParams, GetGlobalExecutionLogParams } from '../../common';
import { estypes } from '@elastic/elasticsearch';
import { DEFAULT_USAGE_API_URL } from '../config';
jest.mock('@kbn/core-saved-objects-utils-server', () => {
const actual = jest.requireActual('@kbn/core-saved-objects-utils-server');
@ -588,6 +589,9 @@ describe('create()', () => {
microsoftGraphApiUrl: DEFAULT_MICROSOFT_GRAPH_API_URL,
microsoftGraphApiScope: DEFAULT_MICROSOFT_GRAPH_API_SCOPE,
microsoftExchangeUrl: DEFAULT_MICROSOFT_EXCHANGE_URL,
usage: {
url: DEFAULT_USAGE_API_URL,
},
});
const localActionTypeRegistryParams = {

View file

@ -6,7 +6,7 @@
*/
import { ByteSizeValue } from '@kbn/config-schema';
import { ActionsConfig } from './config';
import { ActionsConfig, DEFAULT_USAGE_API_URL } from './config';
import {
DEFAULT_MICROSOFT_EXCHANGE_URL,
DEFAULT_MICROSOFT_GRAPH_API_SCOPE,
@ -42,6 +42,9 @@ const defaultActionsConfig: ActionsConfig = {
microsoftGraphApiUrl: DEFAULT_MICROSOFT_GRAPH_API_URL,
microsoftGraphApiScope: DEFAULT_MICROSOFT_GRAPH_API_SCOPE,
microsoftExchangeUrl: DEFAULT_MICROSOFT_EXCHANGE_URL,
usage: {
url: DEFAULT_USAGE_API_URL,
},
};
describe('ensureUriAllowed', () => {

View file

@ -38,6 +38,9 @@ describe('config validation', () => {
"proxyRejectUnauthorizedCertificates": true,
"rejectUnauthorized": true,
"responseTimeout": "PT1M",
"usage": Object {
"url": "https://usage-api.usage-api/api/v1/usage",
},
}
`);
});
@ -85,6 +88,9 @@ describe('config validation', () => {
"proxyRejectUnauthorizedCertificates": false,
"rejectUnauthorized": false,
"responseTimeout": "PT1M",
"usage": Object {
"url": "https://usage-api.usage-api/api/v1/usage",
},
}
`);
});
@ -225,6 +231,9 @@ describe('config validation', () => {
"proxyVerificationMode": "none",
"verificationMode": "none",
},
"usage": Object {
"url": "https://usage-api.usage-api/api/v1/usage",
},
}
`);
});

View file

@ -72,6 +72,8 @@ const connectorTypeSchema = schema.object({
maxAttempts: schema.maybe(schema.number({ min: MIN_MAX_ATTEMPTS, max: MAX_MAX_ATTEMPTS })),
});
export const DEFAULT_USAGE_API_URL = 'https://usage-api.usage-api/api/v1/usage';
// We leverage enabledActionTypes list by allowing the other plugins to overwrite it by using "setEnabledConnectorTypes" in the plugin setup.
// The list can be overwritten only if it's not already been set in the config.
const enabledConnectorTypesSchema = schema.arrayOf(
@ -145,15 +147,14 @@ export const configSchema = schema.object({
max: schema.maybe(schema.number({ min: MIN_QUEUED_MAX, defaultValue: DEFAULT_QUEUED_MAX })),
})
),
usage: schema.maybe(
schema.object({
ca: schema.maybe(
schema.object({
path: schema.string(),
})
),
})
),
usage: schema.object({
url: schema.string({ defaultValue: DEFAULT_USAGE_API_URL }),
ca: schema.maybe(
schema.object({
path: schema.string(),
})
),
}),
});
export type ActionsConfig = TypeOf<typeof configSchema>;

View file

@ -20,7 +20,7 @@ import { ByteSizeValue } from '@kbn/config-schema';
import { Logger } from '@kbn/core/server';
import { loggingSystemMock } from '@kbn/core/server/mocks';
import { createReadySignal } from '@kbn/event-log-plugin/server/lib/ready_signal';
import { ActionsConfig } from '../config';
import { ActionsConfig, DEFAULT_USAGE_API_URL } from '../config';
import { ActionsConfigurationUtilities, getActionsConfigurationUtilities } from '../actions_config';
import { resolveCustomHosts } from '../lib/custom_host_settings';
import {
@ -691,6 +691,9 @@ const BaseActionsConfig: ActionsConfig = {
microsoftGraphApiUrl: DEFAULT_MICROSOFT_GRAPH_API_URL,
microsoftGraphApiScope: DEFAULT_MICROSOFT_GRAPH_API_SCOPE,
microsoftExchangeUrl: DEFAULT_MICROSOFT_EXCHANGE_URL,
usage: {
url: DEFAULT_USAGE_API_URL,
},
};
function getACUfromConfig(config: Partial<ActionsConfig> = {}): ActionsConfigurationUtilities {

View file

@ -20,7 +20,7 @@ import { ByteSizeValue } from '@kbn/config-schema';
import { Logger } from '@kbn/core/server';
import { loggingSystemMock } from '@kbn/core/server/mocks';
import { createReadySignal } from '@kbn/event-log-plugin/server/lib/ready_signal';
import { ActionsConfig } from '../config';
import { ActionsConfig, DEFAULT_USAGE_API_URL } from '../config';
import { ActionsConfigurationUtilities, getActionsConfigurationUtilities } from '../actions_config';
import { resolveCustomHosts } from '../lib/custom_host_settings';
import {
@ -597,6 +597,9 @@ const BaseActionsConfig: ActionsConfig = {
microsoftGraphApiUrl: DEFAULT_MICROSOFT_GRAPH_API_URL,
microsoftGraphApiScope: DEFAULT_MICROSOFT_GRAPH_API_SCOPE,
microsoftExchangeUrl: DEFAULT_MICROSOFT_EXCHANGE_URL,
usage: {
url: DEFAULT_USAGE_API_URL,
},
};
function getACUfromConfig(config: Partial<ActionsConfig> = {}): ActionsConfigurationUtilities {

View file

@ -10,7 +10,7 @@ import { resolve as pathResolve, join as pathJoin } from 'path';
import { ByteSizeValue } from '@kbn/config-schema';
import moment from 'moment';
import { ActionsConfig } from '../config';
import { ActionsConfig, DEFAULT_USAGE_API_URL } from '../config';
import { Logger } from '@kbn/core/server';
import { loggingSystemMock } from '@kbn/core/server/mocks';
@ -82,6 +82,9 @@ describe('custom_host_settings', () => {
microsoftGraphApiUrl: DEFAULT_MICROSOFT_GRAPH_API_URL,
microsoftGraphApiScope: DEFAULT_MICROSOFT_GRAPH_API_SCOPE,
microsoftExchangeUrl: DEFAULT_MICROSOFT_EXCHANGE_URL,
usage: {
url: DEFAULT_USAGE_API_URL,
},
};
test('ensure it copies over the config parts that it does not touch', () => {

View file

@ -30,6 +30,7 @@ import {
DEFAULT_MICROSOFT_GRAPH_API_SCOPE,
DEFAULT_MICROSOFT_GRAPH_API_URL,
} from '../common';
import { cloudMock } from '@kbn/cloud-plugin/server/mocks';
const executor: ExecutorType<{}, {}, {}, void> = async (options) => {
return { status: 'ok', actionId: options.actionId };
@ -59,6 +60,9 @@ function getConfig(overrides = {}) {
microsoftGraphApiUrl: DEFAULT_MICROSOFT_GRAPH_API_URL,
microsoftGraphApiScope: DEFAULT_MICROSOFT_GRAPH_API_SCOPE,
microsoftExchangeUrl: DEFAULT_MICROSOFT_EXCHANGE_URL,
usage: {
url: 'ca.path',
},
...overrides,
};
}
@ -84,6 +88,9 @@ describe('Actions Plugin', () => {
microsoftGraphApiUrl: DEFAULT_MICROSOFT_GRAPH_API_URL,
microsoftGraphApiScope: DEFAULT_MICROSOFT_GRAPH_API_SCOPE,
microsoftExchangeUrl: DEFAULT_MICROSOFT_EXCHANGE_URL,
usage: {
url: 'ca.path',
},
});
plugin = new ActionsPlugin(context);
coreSetup = coreMock.createSetup();
@ -95,6 +102,7 @@ describe('Actions Plugin', () => {
eventLog: eventLogMock.createSetup(),
usageCollection: usageCollectionPluginMock.createSetupContract(),
features: featuresPluginMock.createSetup(),
cloud: cloudMock.createSetup(),
};
coreSetup.getStartServices.mockResolvedValue([
coreMock.createStart(),
@ -347,6 +355,7 @@ describe('Actions Plugin', () => {
eventLog: eventLogMock.createSetup(),
usageCollection: usageCollectionPluginMock.createSetupContract(),
features: featuresPluginMock.createSetup(),
cloud: cloudMock.createSetup(),
};
}
@ -374,6 +383,7 @@ describe('Actions Plugin', () => {
usageCollection: usageCollectionPluginMock.createSetupContract(),
features: featuresPluginMock.createSetup(),
serverless: serverlessPluginMock.createSetupContract(),
cloud: cloudMock.createSetup(),
};
}
@ -585,6 +595,9 @@ describe('Actions Plugin', () => {
microsoftGraphApiUrl: DEFAULT_MICROSOFT_GRAPH_API_URL,
microsoftGraphApiScope: DEFAULT_MICROSOFT_GRAPH_API_SCOPE,
microsoftExchangeUrl: DEFAULT_MICROSOFT_EXCHANGE_URL,
usage: {
url: 'ca.path',
},
});
plugin = new ActionsPlugin(context);
coreSetup = coreMock.createSetup();
@ -596,6 +609,7 @@ describe('Actions Plugin', () => {
eventLog: eventLogMock.createSetup(),
usageCollection: usageCollectionPluginMock.createSetupContract(),
features: featuresPluginMock.createSetup(),
cloud: cloudMock.createSetup(),
};
pluginsStart = {
licensing: licensingMock.createStart(),
@ -680,6 +694,7 @@ describe('Actions Plugin', () => {
eventLog: eventLogMock.createSetup(),
usageCollection: usageCollectionPluginMock.createSetupContract(),
features: featuresPluginMock.createSetup(),
cloud: cloudMock.createSetup(),
};
pluginsStart = {
licensing: licensingMock.createStart(),

View file

@ -42,6 +42,7 @@ import {
import { MonitoringCollectionSetup } from '@kbn/monitoring-collection-plugin/server';
import { ServerlessPluginSetup, ServerlessPluginStart } from '@kbn/serverless/server';
import type { CloudSetup } from '@kbn/cloud-plugin/server';
import { ActionsConfig, AllowedHosts, EnabledConnectorTypes, getValidatedConfig } from './config';
import { resolveCustomHosts } from './lib/custom_host_settings';
import { events } from './lib/event_based_telemetry';
@ -108,6 +109,7 @@ import type { IUnsecuredActionsClient } from './unsecured_actions_client/unsecur
import { UnsecuredActionsClient } from './unsecured_actions_client/unsecured_actions_client';
import { createBulkUnsecuredExecutionEnqueuerFunction } from './create_unsecured_execute_function';
import { createSystemConnectors } from './create_system_actions';
import { ConnectorUsageReportingTask } from './usage/connector_usage_reporting_task';
export interface PluginSetupContract {
registerType<
@ -180,6 +182,7 @@ export interface ActionsPluginsSetup {
spaces?: SpacesPluginSetup;
monitoringCollection?: MonitoringCollectionSetup;
serverless?: ServerlessPluginSetup;
cloud: CloudSetup;
}
export interface ActionsPluginsStart {
@ -214,6 +217,7 @@ export class ActionsPlugin implements Plugin<PluginSetupContract, PluginStartCon
private readonly telemetryLogger: Logger;
private inMemoryConnectors: InMemoryConnector[];
private inMemoryMetrics: InMemoryMetrics;
private connectorUsageReportingTask: ConnectorUsageReportingTask | undefined;
constructor(initContext: PluginInitializerContext) {
this.logger = initContext.logger.get();
@ -323,6 +327,15 @@ export class ActionsPlugin implements Plugin<PluginSetupContract, PluginStartCon
this.getInMemoryConnectors,
eventLogIndex
);
this.connectorUsageReportingTask = new ConnectorUsageReportingTask({
logger: this.logger,
eventLogIndex,
core,
taskManager: plugins.taskManager,
projectId: plugins.cloud.serverless.projectId,
config: this.actionsConfig.usage,
});
}
// Usage counter for telemetry
@ -593,6 +606,8 @@ export class ActionsPlugin implements Plugin<PluginSetupContract, PluginStartCon
this.validateEnabledConnectorTypes(plugins);
this.connectorUsageReportingTask?.start(plugins.taskManager).catch(() => {});
return {
isActionTypeEnabled: (id, options = { notifyUsage: false }) => {
return this.actionTypeRegistry!.isActionTypeEnabled(id, options);

View file

@ -0,0 +1,394 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import fs from 'fs';
import axios from 'axios';
import { loggingSystemMock } from '@kbn/core/server/mocks';
import { coreMock } from '@kbn/core/server/mocks';
import {
TaskManagerSetupContract,
TaskManagerStartContract,
TaskStatus,
} from '@kbn/task-manager-plugin/server';
import { taskManagerMock } from '@kbn/task-manager-plugin/server/mocks';
import {
CONNECTOR_USAGE_REPORTING_SOURCE_ID,
CONNECTOR_USAGE_REPORTING_TASK_ID,
CONNECTOR_USAGE_REPORTING_TASK_SCHEDULE,
CONNECTOR_USAGE_REPORTING_TASK_TYPE,
ConnectorUsageReportingTask,
} from './connector_usage_reporting_task';
import type { CoreSetup, ElasticsearchClient } from '@kbn/core/server';
import { ActionsPluginsStart } from '../plugin';
import { SearchResponse } from '@elastic/elasticsearch/lib/api/types';
jest.mock('axios');
const mockedAxiosPost = jest.spyOn(axios, 'post');
const nowStr = '2024-01-01T12:00:00.000Z';
const nowDate = new Date(nowStr);
jest.useFakeTimers();
jest.setSystemTime(nowDate.getTime());
const readFileSpy = jest.spyOn(fs, 'readFileSync');
describe('ConnectorUsageReportingTask', () => {
const logger = loggingSystemMock.createLogger();
const { createSetup } = coreMock;
const { createSetup: taskManagerSetupMock, createStart: taskManagerStartMock } = taskManagerMock;
let mockEsClient: jest.Mocked<ElasticsearchClient>;
let mockCore: CoreSetup<ActionsPluginsStart>;
let mockTaskManagerSetup: jest.Mocked<TaskManagerSetupContract>;
let mockTaskManagerStart: jest.Mocked<TaskManagerStartContract>;
beforeEach(async () => {
mockTaskManagerSetup = taskManagerSetupMock();
mockTaskManagerStart = taskManagerStartMock();
mockCore = createSetup();
mockEsClient = (await mockCore.getStartServices())[0].elasticsearch.client
.asInternalUser as jest.Mocked<ElasticsearchClient>;
});
afterEach(() => {
jest.resetAllMocks();
});
const createTaskRunner = async ({
lastReportedUsageDate,
projectId,
attempts = 0,
}: {
lastReportedUsageDate: Date;
projectId?: string;
attempts?: number;
}) => {
const timestamp = new Date(new Date().setMinutes(-15));
const task = new ConnectorUsageReportingTask({
eventLogIndex: 'test-index',
projectId,
logger,
core: mockCore,
taskManager: mockTaskManagerSetup,
config: {
url: 'usage-api',
ca: {
path: './ca.crt',
},
},
});
await task.start(mockTaskManagerStart);
const createTaskRunnerFunction =
mockTaskManagerSetup.registerTaskDefinitions.mock.calls[0][0][
CONNECTOR_USAGE_REPORTING_TASK_TYPE
].createTaskRunner;
return createTaskRunnerFunction({
taskInstance: {
id: CONNECTOR_USAGE_REPORTING_TASK_ID,
runAt: timestamp,
attempts: 0,
ownerId: '',
status: TaskStatus.Running,
startedAt: timestamp,
scheduledAt: timestamp,
retryAt: null,
params: {},
state: {
lastReportedUsageDate,
attempts,
},
taskType: CONNECTOR_USAGE_REPORTING_TASK_TYPE,
},
});
};
it('registers the task', async () => {
readFileSpy.mockImplementationOnce(() => '---CA CERTIFICATE---');
new ConnectorUsageReportingTask({
eventLogIndex: 'test-index',
projectId: 'test-projecr',
logger,
core: createSetup(),
taskManager: mockTaskManagerSetup,
config: {
url: 'usage-api',
ca: {
path: './ca.crt',
},
},
});
expect(mockTaskManagerSetup.registerTaskDefinitions).toBeCalledTimes(1);
expect(mockTaskManagerSetup.registerTaskDefinitions).toHaveBeenCalledWith({
[CONNECTOR_USAGE_REPORTING_TASK_TYPE]: {
title: 'Connector usage reporting task',
timeout: '1m',
createTaskRunner: expect.any(Function),
},
});
});
it('schedules the task', async () => {
readFileSpy.mockImplementationOnce(() => '---CA CERTIFICATE---');
const core = createSetup();
const taskManagerStart = taskManagerStartMock();
const task = new ConnectorUsageReportingTask({
eventLogIndex: 'test-index',
projectId: 'test-projecr',
logger,
core,
taskManager: mockTaskManagerSetup,
config: {
url: 'usage-api',
ca: {
path: './ca.crt',
},
},
});
await task.start(taskManagerStart);
expect(taskManagerStart.ensureScheduled).toBeCalledTimes(1);
expect(taskManagerStart.ensureScheduled).toHaveBeenCalledWith({
id: CONNECTOR_USAGE_REPORTING_TASK_ID,
taskType: CONNECTOR_USAGE_REPORTING_TASK_TYPE,
schedule: {
...CONNECTOR_USAGE_REPORTING_TASK_SCHEDULE,
},
state: {},
params: {},
});
});
it('logs error if task manager is not ready', async () => {
readFileSpy.mockImplementationOnce(() => '---CA CERTIFICATE---');
const core = createSetup();
const taskManagerStart = taskManagerStartMock();
const task = new ConnectorUsageReportingTask({
eventLogIndex: 'test-index',
projectId: 'test-projecr',
logger,
core,
taskManager: mockTaskManagerSetup,
config: {
url: 'usage-api',
ca: {
path: './ca.crt',
},
},
});
await task.start();
expect(taskManagerStart.ensureScheduled).not.toBeCalled();
expect(logger.error).toHaveBeenCalledWith(
`Missing required task manager service during start of ${CONNECTOR_USAGE_REPORTING_TASK_TYPE}`
);
});
it('logs error if scheduling task fails', async () => {
readFileSpy.mockImplementationOnce(() => '---CA CERTIFICATE---');
const core = createSetup();
const taskManagerStart = taskManagerStartMock();
taskManagerStart.ensureScheduled.mockRejectedValue(new Error('test'));
const task = new ConnectorUsageReportingTask({
eventLogIndex: 'test-index',
projectId: 'test-projecr',
logger,
core,
taskManager: mockTaskManagerSetup,
config: {
url: 'usage-api',
ca: {
path: './ca.crt',
},
},
});
await task.start(taskManagerStart);
expect(logger.error).toHaveBeenCalledWith(
'Error scheduling task actions:connector_usage_reporting, received test'
);
});
it('returns the existing state and logs a warning when project id is missing', async () => {
const lastReportedUsageDateStr = '2024-01-01T00:00:00.000Z';
const lastReportedUsageDate = new Date(lastReportedUsageDateStr);
const taskRunner = await createTaskRunner({ lastReportedUsageDate });
const response = await taskRunner.run();
expect(logger.warn).toHaveBeenCalledWith(
'Missing required project id while running actions:connector_usage_reporting'
);
expect(response).toEqual({
state: {
attempts: 0,
lastReportedUsageDate,
},
});
});
it('returns the existing state and logs an error when the CA Certificate is missing', async () => {
const lastReportedUsageDateStr = '2024-01-01T00:00:00.000Z';
const lastReportedUsageDate = new Date(lastReportedUsageDateStr);
readFileSpy.mockImplementationOnce((func) => {
throw new Error('Mock file read error.');
});
const taskRunner = await createTaskRunner({ lastReportedUsageDate, projectId: 'test-id' });
const response = await taskRunner.run();
expect(logger.error).toHaveBeenCalledTimes(2);
expect(logger.error).toHaveBeenNthCalledWith(
1,
`CA Certificate for the project "test-id" couldn't be loaded, Error: Mock file read error.`
);
expect(logger.error).toHaveBeenNthCalledWith(
2,
'Missing required CA Certificate while running actions:connector_usage_reporting'
);
expect(response).toEqual({
state: {
attempts: 0,
lastReportedUsageDate,
},
});
});
it('runs the task', async () => {
readFileSpy.mockImplementationOnce(() => '---CA CERTIFICATE---');
mockEsClient.search.mockResolvedValueOnce({
aggregations: { total_usage: 215 },
} as SearchResponse<unknown, unknown>);
mockedAxiosPost.mockResolvedValueOnce(200);
const lastReportedUsageDateStr = '2024-01-01T00:00:00.000Z';
const lastReportedUsageDate = new Date(lastReportedUsageDateStr);
const taskRunner = await createTaskRunner({ lastReportedUsageDate, projectId: 'test-project' });
const response = await taskRunner.run();
const report = [
{
creation_timestamp: nowStr,
id: 'connector-request-body-bytes-test-project-2024-01-01T12:00:00.000Z',
source: {
id: CONNECTOR_USAGE_REPORTING_SOURCE_ID,
instance_group_id: 'test-project',
},
usage: {
period_seconds: 43200,
quantity: 0,
type: 'connector_request_body_bytes',
},
usage_timestamp: nowStr,
},
];
expect(mockedAxiosPost).toHaveBeenCalledWith('usage-api', report, {
headers: { 'Content-Type': 'application/json' },
timeout: 30000,
httpsAgent: expect.any(Object),
});
expect(response).toEqual({
state: {
attempts: 0,
lastReportedUsageDate: expect.any(Date),
},
});
});
it('re-runs the task when search for records fails', async () => {
readFileSpy.mockImplementationOnce(() => '---CA CERTIFICATE---');
mockEsClient.search.mockRejectedValue(new Error('500'));
mockedAxiosPost.mockResolvedValueOnce(200);
const lastReportedUsageDate = new Date('2024-01-01T00:00:00.000Z');
const taskRunner = await createTaskRunner({ lastReportedUsageDate, projectId: 'test-project' });
const response = await taskRunner.run();
expect(response).toEqual({
state: {
lastReportedUsageDate,
attempts: 0,
},
runAt: nowDate,
});
});
it('re-runs the task when it fails to push the usage record', async () => {
readFileSpy.mockImplementationOnce(() => '---CA CERTIFICATE---');
mockEsClient.search.mockResolvedValueOnce({
aggregations: { total_usage: 215 },
} as SearchResponse<unknown, unknown>);
mockedAxiosPost.mockRejectedValueOnce(500);
const lastReportedUsageDate = new Date('2024-01-01T00:00:00.000Z');
const taskRunner = await createTaskRunner({ lastReportedUsageDate, projectId: 'test-project' });
const response = await taskRunner.run();
expect(response).toEqual({
state: {
lastReportedUsageDate,
attempts: 1,
},
runAt: new Date(nowDate.getTime() + 60000), // After a min
});
});
it('stops retrying after 5 attempts', async () => {
readFileSpy.mockImplementationOnce(() => '---CA CERTIFICATE---');
mockEsClient.search.mockResolvedValueOnce({
aggregations: { total_usage: 215 },
} as SearchResponse<unknown, unknown>);
mockedAxiosPost.mockRejectedValueOnce(new Error('test-error'));
const lastReportedUsageDate = new Date('2024-01-01T00:00:00.000Z');
const taskRunner = await createTaskRunner({
lastReportedUsageDate,
projectId: 'test-project',
attempts: 4,
});
const response = await taskRunner.run();
expect(response).toEqual({
state: {
lastReportedUsageDate,
attempts: 0,
},
});
expect(logger.error).toHaveBeenCalledWith(
'Usage data could not be pushed to usage-api. Stopped retrying after 5 attempts. Error:test-error'
);
});
});

View file

@ -0,0 +1,309 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import fs from 'fs';
import { Logger, CoreSetup, type ElasticsearchClient } from '@kbn/core/server';
import {
IntervalSchedule,
type ConcreteTaskInstance,
TaskManagerStartContract,
TaskManagerSetupContract,
} from '@kbn/task-manager-plugin/server';
import { AggregationsSumAggregate } from '@elastic/elasticsearch/lib/api/types';
import axios from 'axios';
import https from 'https';
import { ActionsConfig } from '../config';
import { ConnectorUsageReport } from './types';
import { ActionsPluginsStart } from '../plugin';
export const CONNECTOR_USAGE_REPORTING_TASK_SCHEDULE: IntervalSchedule = { interval: '1h' };
export const CONNECTOR_USAGE_REPORTING_TASK_ID = 'connector_usage_reporting';
export const CONNECTOR_USAGE_REPORTING_TASK_TYPE = `actions:${CONNECTOR_USAGE_REPORTING_TASK_ID}`;
export const CONNECTOR_USAGE_REPORTING_TASK_TIMEOUT = 30000;
export const CONNECTOR_USAGE_TYPE = `connector_request_body_bytes`;
export const CONNECTOR_USAGE_REPORTING_SOURCE_ID = `task-connector-usage-report`;
export const MAX_PUSH_ATTEMPTS = 5;
export class ConnectorUsageReportingTask {
private readonly logger: Logger;
private readonly eventLogIndex: string;
private readonly projectId: string | undefined;
private readonly caCertificate: string | undefined;
private readonly usageApiUrl: string;
constructor({
logger,
eventLogIndex,
core,
taskManager,
projectId,
config,
}: {
logger: Logger;
eventLogIndex: string;
core: CoreSetup<ActionsPluginsStart>;
taskManager: TaskManagerSetupContract;
projectId: string | undefined;
config: ActionsConfig['usage'];
}) {
this.logger = logger;
this.projectId = projectId;
this.eventLogIndex = eventLogIndex;
this.usageApiUrl = config.url;
const caCertificatePath = config.ca?.path;
if (caCertificatePath && caCertificatePath.length > 0) {
try {
this.caCertificate = fs.readFileSync(caCertificatePath, 'utf8');
} catch (e) {
this.caCertificate = undefined;
this.logger.error(
`CA Certificate for the project "${projectId}" couldn't be loaded, Error: ${e.message}`
);
}
}
taskManager.registerTaskDefinitions({
[CONNECTOR_USAGE_REPORTING_TASK_TYPE]: {
title: 'Connector usage reporting task',
timeout: '1m',
createTaskRunner: ({ taskInstance }: { taskInstance: ConcreteTaskInstance }) => {
return {
run: async () => this.runTask(taskInstance, core),
cancel: async () => {},
};
},
},
});
}
public start = async (taskManager?: TaskManagerStartContract) => {
if (!taskManager) {
this.logger.error(
`Missing required task manager service during start of ${CONNECTOR_USAGE_REPORTING_TASK_TYPE}`
);
return;
}
try {
await taskManager.ensureScheduled({
id: CONNECTOR_USAGE_REPORTING_TASK_ID,
taskType: CONNECTOR_USAGE_REPORTING_TASK_TYPE,
schedule: {
...CONNECTOR_USAGE_REPORTING_TASK_SCHEDULE,
},
state: {},
params: {},
});
} catch (e) {
this.logger.error(
`Error scheduling task ${CONNECTOR_USAGE_REPORTING_TASK_TYPE}, received ${e.message}`
);
}
};
private runTask = async (taskInstance: ConcreteTaskInstance, core: CoreSetup) => {
const { state } = taskInstance;
if (!this.projectId) {
this.logger.warn(
`Missing required project id while running ${CONNECTOR_USAGE_REPORTING_TASK_TYPE}`
);
return {
state,
};
}
if (!this.caCertificate) {
this.logger.error(
`Missing required CA Certificate while running ${CONNECTOR_USAGE_REPORTING_TASK_TYPE}`
);
return {
state,
};
}
const [{ elasticsearch }] = await core.getStartServices();
const esClient = elasticsearch.client.asInternalUser;
const now = new Date();
const oneDayAgo = new Date(new Date().getTime() - 24 * 60 * 60 * 1000);
const lastReportedUsageDate: Date = !!state.lastReportedUsageDate
? new Date(state.lastReportedUsageDate)
: oneDayAgo;
let attempts: number = state.attempts || 0;
const fromDate = lastReportedUsageDate;
const toDate = now;
let totalUsage = 0;
try {
totalUsage = await this.getTotalUsage({
esClient,
fromDate,
toDate,
});
} catch (e) {
this.logger.error(`Usage data could not be fetched. It will be retried. Error:${e.message}`);
return {
state: {
lastReportedUsageDate,
attempts,
},
runAt: now,
};
}
const record: ConnectorUsageReport = this.createUsageRecord({
totalUsage,
fromDate,
toDate,
projectId: this.projectId,
});
this.logger.debug(`Record: ${JSON.stringify(record)}`);
try {
attempts = attempts + 1;
await this.pushUsageRecord(record);
this.logger.info(
`Connector usage record has been successfully reported, ${record.creation_timestamp}, usage: ${record.usage.quantity}, period:${record.usage.period_seconds}`
);
} catch (e) {
if (attempts < MAX_PUSH_ATTEMPTS) {
this.logger.error(
`Usage data could not be pushed to usage-api. It will be retried (${attempts}). Error:${e.message}`
);
return {
state: {
lastReportedUsageDate,
attempts,
},
runAt: this.getDelayedRetryDate({ attempts, now }),
};
}
this.logger.error(
`Usage data could not be pushed to usage-api. Stopped retrying after ${attempts} attempts. Error:${e.message}`
);
return {
state: {
lastReportedUsageDate,
attempts: 0,
},
};
}
return {
state: { lastReportedUsageDate: toDate, attempts: 0 },
};
};
private getTotalUsage = async ({
esClient,
fromDate,
toDate,
}: {
esClient: ElasticsearchClient;
fromDate: Date;
toDate: Date;
}): Promise<number> => {
const usageResult = await esClient.search({
index: this.eventLogIndex,
sort: '@timestamp',
size: 0,
query: {
bool: {
filter: {
bool: {
must: [
{
term: { 'event.action': 'execute' },
},
{
term: { 'event.provider': 'actions' },
},
{
exists: {
field: 'kibana.action.execution.usage.request_body_bytes',
},
},
{
range: {
'@timestamp': {
gt: fromDate,
lte: toDate,
},
},
},
],
},
},
},
},
aggs: {
total_usage: { sum: { field: 'kibana.action.execution.usage.request_body_bytes' } },
},
});
return (usageResult.aggregations?.total_usage as AggregationsSumAggregate)?.value ?? 0;
};
private createUsageRecord = ({
totalUsage,
fromDate,
toDate,
projectId,
}: {
totalUsage: number;
fromDate: Date;
toDate: Date;
projectId: string;
}): ConnectorUsageReport => {
const period = Math.round((toDate.getTime() - fromDate.getTime()) / 1000);
const toStr = toDate.toISOString();
const timestamp = new Date(toStr);
timestamp.setMinutes(0);
timestamp.setSeconds(0);
timestamp.setMilliseconds(0);
return {
id: `connector-request-body-bytes-${projectId}-${timestamp.toISOString()}`,
usage_timestamp: toStr,
creation_timestamp: toStr,
usage: {
type: CONNECTOR_USAGE_TYPE,
period_seconds: period,
quantity: totalUsage,
},
source: {
id: CONNECTOR_USAGE_REPORTING_SOURCE_ID,
instance_group_id: projectId,
},
};
};
private pushUsageRecord = async (record: ConnectorUsageReport) => {
return axios.post(this.usageApiUrl, [record], {
headers: { 'Content-Type': 'application/json' },
timeout: CONNECTOR_USAGE_REPORTING_TASK_TIMEOUT,
httpsAgent: new https.Agent({
ca: this.caCertificate,
}),
});
};
private getDelayedRetryDate = ({ attempts, now }: { attempts: number; now: Date }) => {
const baseDelay = 60 * 1000;
const delayByAttempts = baseDelay * attempts;
const delayedTime = now.getTime() + delayByAttempts;
return new Date(delayedTime);
};
}

View file

@ -65,3 +65,18 @@ export const byServiceProviderTypeSchema: MakeSchemaFrom<ActionsUsage>['count_ac
other: { type: 'long' },
ses: { type: 'long' },
};
export interface ConnectorUsageReport {
id: string;
usage_timestamp: string;
creation_timestamp: string;
usage: {
type: string;
period_seconds: number;
quantity: number | string | undefined;
};
source: {
id: string | undefined;
instance_group_id: string;
};
}

View file

@ -47,7 +47,8 @@
"@kbn/core-http-server",
"@kbn/core-test-helpers-kbn-server",
"@kbn/security-plugin-types-server",
"@kbn/core-application-common"
"@kbn/core-application-common",
"@kbn/cloud-plugin"
],
"exclude": [
"target/**/*",

View file

@ -81,6 +81,7 @@ export default function ({ getService }: FtrProviderContext) {
'actions:.torq',
'actions:.webhook',
'actions:.xmatters',
'actions:connector_usage_reporting',
'actions_telemetry',
'ad_hoc_run-backfill',
'alerting:.es-query',