[Usage Counters] Enhancements to the APIs (#187665)

## Summary

Part of https://github.com/elastic/kibana/issues/186530
Follow-up of https://github.com/elastic/kibana/pull/187064 

The goal of this PR is to provide the necessary means to allow
implementing the [Counting
views](https://docs.google.com/document/d/1W77qoweixcjrq0sEKh_LjIk3j33Xyy9umod9mG9BlOM/edit)
part of the _Dashboards++_ initiative.
We do this by extending the capabilities of the _usage counters_ APIs:
* We support custom retention periods. Currently data is only kept in SO
indices for 5 days. Having 90 days worth of counting was required for
Dashboards++.
* We expose a Search API that will allow retrieving persisted counters.

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Gerard Soldevila 2024-08-05 16:33:27 +02:00 committed by GitHub
parent 99ba4d8ad3
commit d9c1f9702b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
57 changed files with 1908 additions and 714 deletions

View file

@ -103,7 +103,8 @@ describe('getSearchDsl', () => {
mappings,
opts.type,
opts.sortField,
opts.sortOrder
opts.sortOrder,
opts.pit
);
});

View file

@ -86,7 +86,7 @@ export function getSearchDsl(
hasNoReferenceOperator,
kueryNode,
}),
...getSortingParams(mappings, type, sortField, sortOrder),
...getSortingParams(mappings, type, sortField, sortOrder, pit),
...(pit ? getPitParams(pit) : {}),
search_after: searchAfter,
};

View file

@ -238,4 +238,12 @@ describe('searchDsl/getSortParams', () => {
});
});
});
describe('pit, no sortField', () => {
it('defaults to natural storage order sorting', () => {
expect(getSortingParams(MAPPINGS, 'saved', undefined, undefined, { id: 'abc123' })).toEqual({
sort: ['_shard_doc'],
});
});
});
});

View file

@ -6,8 +6,9 @@
* Side Public License, v 1.
*/
import * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import Boom from '@hapi/boom';
import type { SortOrder, SortCombinations } from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type { SavedObjectsPitParams } from '@kbn/core-saved-objects-api-server/src/apis';
import { getProperty, type IndexMapping } from '@kbn/core-saved-objects-base-server-internal';
const TOP_LEVEL_FIELDS = ['_id', '_score'];
@ -16,10 +17,15 @@ export function getSortingParams(
mappings: IndexMapping,
type: string | string[],
sortField?: string,
sortOrder?: estypes.SortOrder
): { sort?: estypes.SortCombinations[] } {
sortOrder?: SortOrder,
pit?: SavedObjectsPitParams
): { sort?: SortCombinations[] } {
if (!sortField) {
return {};
// if we are performing a PIT search, we must sort by some criteria
// in order to get the 'sort' property for each of the results.
// Defaulting to '_shard_doc' tells ES to sort by the natural stored order,
// giving the best performance
return pit ? { sort: ['_shard_doc'] } : {};
}
const types = Array.isArray(type) ? type : [type];

View file

@ -7,5 +7,6 @@
*/
export const getUsageCollection = () => ({
domainId: 'abc123',
incrementCounter: jest.fn(),
});

View file

@ -1,76 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import moment from 'moment';
import type { SavedObjectsFindResult } from '@kbn/core/server';
import {
type UsageCountersSavedObjectAttributes,
USAGE_COUNTERS_SAVED_OBJECT_TYPE,
} from '@kbn/usage-collection-plugin/server';
import { isSavedObjectOlderThan } from './saved_objects';
export const createMockSavedObjectDoc = (
updatedAt: moment.Moment,
id: string,
namespace?: string
) =>
({
id,
type: USAGE_COUNTERS_SAVED_OBJECT_TYPE,
...(namespace && { namespaces: [namespace] }),
attributes: {
count: 3,
counterName: 'testName',
counterType: 'count',
domainId: 'testDomain',
source: 'server',
},
references: [],
updated_at: updatedAt.format(),
version: 'WzI5LDFd',
score: 0,
} as SavedObjectsFindResult<UsageCountersSavedObjectAttributes>);
describe('isSavedObjectOlderThan', () => {
it(`returns true if doc is older than x days`, () => {
const numberOfDays = 1;
const startDate = moment().format();
const doc = createMockSavedObjectDoc(moment().subtract(2, 'days'), 'some-id');
const result = isSavedObjectOlderThan({
numberOfDays,
startDate,
doc,
});
expect(result).toBe(true);
});
it(`returns false if doc is exactly x days old`, () => {
const numberOfDays = 1;
const startDate = moment().format();
const doc = createMockSavedObjectDoc(moment().subtract(1, 'days'), 'some-id');
const result = isSavedObjectOlderThan({
numberOfDays,
startDate,
doc,
});
expect(result).toBe(false);
});
it(`returns false if doc is younger than x days`, () => {
const numberOfDays = 2;
const startDate = moment().format();
const doc = createMockSavedObjectDoc(moment().subtract(1, 'days'), 'some-id');
const result = isSavedObjectOlderThan({
numberOfDays,
startDate,
doc,
});
expect(result).toBe(false);
});
});

View file

@ -1,31 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import moment from 'moment';
import { SavedObject } from '@kbn/core-saved-objects-api-server';
export function isSavedObjectOlderThan({
numberOfDays,
startDate,
doc,
}: {
numberOfDays: number;
startDate: moment.Moment | string | number;
doc: Pick<SavedObject, 'updated_at'>;
}): boolean {
const { updated_at: updatedAt } = doc;
const today = moment(startDate).startOf('day');
const updateDay = moment(updatedAt).startOf('day');
const diffInDays = today.diff(updateDay, 'days');
if (diffInDays > numberOfDays) {
return true;
}
return false;
}

View file

@ -20,8 +20,5 @@ export { registerCoreUsageCollector } from './core';
export { registerLocalizationUsageCollector } from './localization';
export { registerConfigUsageCollector } from './config_usage';
export { registerUiCountersUsageCollector } from './ui_counters';
export {
registerUsageCountersRollups,
registerUsageCountersUsageCollector,
} from './usage_counters';
export { registerUsageCountersUsageCollector } from './usage_counters';
export { registerEventLoopDelaysCollector } from './event_loop_delays';

View file

@ -7,4 +7,3 @@
*/
export { registerUsageCountersUsageCollector } from './register_usage_counters_collector';
export { registerUsageCountersRollups } from './rollups';

View file

@ -1,226 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import moment from 'moment';
/**
* Mocking methods that are used to retrieve current time. This allows:
* 1) introducing OLD counters that can be rolled up
* 2) Removing flakiness for tests that are executed on a 2 day span (close to midnight)
* getCurrentTime => used by `SOR.incrementCounter` to determine 'updated_at'
* isSavedObjectOlderThan => used by `rollUsageCountersIndices` to determine if a counter is beyond the retention period
*/
jest.mock('@kbn/core-saved-objects-api-server-internal/src/lib/apis/utils', () => ({
...jest.requireActual('@kbn/core-saved-objects-api-server-internal/src/lib/apis/utils'),
getCurrentTime: jest.fn(),
}));
jest.mock('../../common/saved_objects', () => ({
...jest.requireActual('../../common/saved_objects'),
isSavedObjectOlderThan: jest.fn(),
}));
import { getCurrentTime } from '@kbn/core-saved-objects-api-server-internal/src/lib/apis/utils';
import type { Logger, ISavedObjectsRepository, SavedObject } from '@kbn/core/server';
import {
type TestElasticsearchUtils,
type TestKibanaUtils,
createTestServers,
createRootWithCorePlugins,
} from '@kbn/core-test-helpers-kbn-server';
import {
serializeCounterKey,
type UsageCountersSavedObjectAttributes,
USAGE_COUNTERS_SAVED_OBJECT_TYPE,
} from '@kbn/usage-collection-plugin/server';
import { rollUsageCountersIndices } from '../rollups/rollups';
import { USAGE_COUNTERS_KEEP_DOCS_FOR_DAYS } from '../rollups/constants';
import { isSavedObjectOlderThan } from '../../common/saved_objects';
const getCurrentTimeMock = getCurrentTime as jest.MockedFunction<typeof getCurrentTime>;
const isSavedObjectOlderThanMock = isSavedObjectOlderThan as jest.MockedFunction<
typeof isSavedObjectOlderThan
>;
const NOW = '2024-06-30T10:00:00.000Z';
const OLD = moment(NOW).subtract(USAGE_COUNTERS_KEEP_DOCS_FOR_DAYS + 1, 'days');
const RECENT = moment(NOW).subtract(USAGE_COUNTERS_KEEP_DOCS_FOR_DAYS - 1, 'days');
const OLD_YMD = OLD.format('YYYYMMDD');
const RECENT_YMD = RECENT.format('YYYYMMDD');
const OLD_ISO = OLD.toISOString();
const RECENT_ISO = RECENT.toISOString();
const ALL_COUNTERS = [
`domain1:a:count:server:${OLD_YMD}:default`,
`domain1:a:count:server:${RECENT_YMD}:default`,
`domain1:b:count:server:${OLD_YMD}:one`,
`domain1:b:count:server:${OLD_YMD}:two`,
`domain1:b:count:server:${RECENT_YMD}:one`,
`domain1:b:count:server:${RECENT_YMD}:two`,
`domain2:a:count:server:${OLD_YMD}:default`,
`domain2:a:count:server:${RECENT_YMD}:default`,
`domain2:c:count:server:${RECENT_YMD}:default`,
];
const RECENT_COUNTERS = ALL_COUNTERS.filter((key) => key.includes(RECENT_YMD));
describe('usage-counters', () => {
let esServer: TestElasticsearchUtils;
let root: TestKibanaUtils['root'];
let internalRepository: ISavedObjectsRepository;
let logger: Logger;
beforeAll(async () => {
const { startES } = createTestServers({
adjustTimeout: (t: number) => jest.setTimeout(t),
});
esServer = await startES();
root = createRootWithCorePlugins();
await root.preboot();
await root.setup();
const start = await root.start();
logger = root.logger.get('test daily rollups');
internalRepository = start.savedObjects.createInternalRepository([
USAGE_COUNTERS_SAVED_OBJECT_TYPE,
]);
// insert a bunch of usage counters in multiple namespaces
await createTestCounters(internalRepository);
});
it('deletes documents older that the retention period, from all namespaces', async () => {
// check that all documents are there
const beforeRollup = await internalRepository.find<UsageCountersSavedObjectAttributes>({
type: USAGE_COUNTERS_SAVED_OBJECT_TYPE,
namespaces: ['*'],
});
expect(
beforeRollup.saved_objects
.map(({ attributes, updated_at: updatedAt, namespaces }) =>
serializeCounterKey({ ...attributes, date: updatedAt, namespace: namespaces?.[0] })
)
.sort()
).toEqual(ALL_COUNTERS);
// run the rollup logic
isSavedObjectOlderThanMock.mockImplementation(({ doc }) => doc.updated_at === OLD_ISO);
await rollUsageCountersIndices(logger, internalRepository);
// check only recent counters are present
const afterRollup = await internalRepository.find<UsageCountersSavedObjectAttributes>({
type: USAGE_COUNTERS_SAVED_OBJECT_TYPE,
namespaces: ['*'],
});
expect(
afterRollup.saved_objects
.map(({ attributes, updated_at: updatedAt, namespaces }) =>
serializeCounterKey({ ...attributes, date: updatedAt, namespace: namespaces?.[0] })
)
.sort()
).toEqual(RECENT_COUNTERS);
});
afterAll(async () => {
await esServer.stop();
await root.shutdown();
});
});
async function createTestCounters(internalRepository: ISavedObjectsRepository) {
await createCounters(internalRepository, OLD_ISO, [
// domainId, counterName, counterType, source, count, namespace?
['domain1', 'a', 'count', 'server', 28],
['domain1', 'b', 'count', 'server', 29, 'one'],
['domain1', 'b', 'count', 'server', 30, 'two'],
['domain2', 'a', 'count', 'server', 31],
]);
await createCounters(internalRepository, RECENT_ISO, [
// domainId, counterName, counterType, source, count, namespace?
['domain1', 'a', 'count', 'server', 32],
['domain1', 'b', 'count', 'server', 33, 'one'],
['domain1', 'b', 'count', 'server', 34, 'two'],
['domain2', 'a', 'count', 'server', 35],
['domain2', 'c', 'count', 'server', 36],
]);
}
// domainId, counterName, counterType, source, count, namespace?
type CounterAttributes = [string, string, string, 'ui' | 'server', number, string?];
async function createCounters(
internalRepository: ISavedObjectsRepository,
isoDate: string,
countersAttributes: CounterAttributes[]
) {
// tamper SO `updated_at`
getCurrentTimeMock.mockReturnValue(isoDate);
await Promise.all(
countersAttributes
.map((attrs) => createCounter(isoDate, ...attrs))
.map((counter) => incrementCounter(internalRepository, counter))
);
}
function createCounter(
date: string,
domainId: string,
counterName: string,
counterType: string,
source: 'server' | 'ui',
count: number,
namespace?: string
): SavedObject<UsageCountersSavedObjectAttributes> {
const id = serializeCounterKey({
domainId,
counterName,
counterType,
namespace,
source,
date,
});
return {
type: USAGE_COUNTERS_SAVED_OBJECT_TYPE,
id,
...(namespace && { namespaces: [namespace] }),
attributes: {
domainId,
counterName,
counterType,
source,
count,
},
references: [],
};
}
async function incrementCounter(
internalRepository: ISavedObjectsRepository,
counter: SavedObject<UsageCountersSavedObjectAttributes>
) {
const namespace = counter.namespaces?.[0];
return await internalRepository.incrementCounter(
USAGE_COUNTERS_SAVED_OBJECT_TYPE,
counter.id,
[{ fieldName: 'count', incrementBy: counter.attributes.count }],
{
...(namespace && { namespace }),
upsertAttributes: {
domainId: counter.attributes.domainId,
counterName: counter.attributes.counterName,
counterType: counter.attributes.counterType,
source: counter.attributes.source,
},
}
);
}

View file

@ -1,112 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import moment from 'moment';
import { savedObjectsRepositoryMock, loggingSystemMock } from '@kbn/core/server/mocks';
import { USAGE_COUNTERS_SAVED_OBJECT_TYPE } from '@kbn/usage-collection-plugin/server';
import { USAGE_COUNTERS_KEEP_DOCS_FOR_DAYS } from './constants';
import { createMockSavedObjectDoc } from '../../common/saved_objects.test';
import { rollUsageCountersIndices } from './rollups';
describe('rollUsageCountersIndices', () => {
let logger: ReturnType<typeof loggingSystemMock.createLogger>;
let savedObjectClient: ReturnType<typeof savedObjectsRepositoryMock.create>;
beforeEach(() => {
logger = loggingSystemMock.createLogger();
savedObjectClient = savedObjectsRepositoryMock.create();
});
it('returns undefined if no savedObjectsClient initialised yet', async () => {
await expect(rollUsageCountersIndices(logger, undefined)).resolves.toBe(undefined);
expect(logger.warn).toHaveBeenCalledTimes(0);
});
it('does not delete any documents on empty saved objects', async () => {
savedObjectClient.find.mockImplementation(async ({ type, page = 1, perPage = 10 }) => {
switch (type) {
case USAGE_COUNTERS_SAVED_OBJECT_TYPE:
return { saved_objects: [], total: 0, page, per_page: perPage };
default:
throw new Error(`Unexpected type [${type}]`);
}
});
await expect(rollUsageCountersIndices(logger, savedObjectClient)).resolves.toEqual([]);
expect(savedObjectClient.find).toBeCalled();
expect(savedObjectClient.delete).not.toBeCalled();
expect(logger.warn).toHaveBeenCalledTimes(0);
});
it(`deletes documents older than ${USAGE_COUNTERS_KEEP_DOCS_FOR_DAYS} days`, async () => {
const mockSavedObjects = [
createMockSavedObjectDoc(moment().subtract(5, 'days'), 'doc-id-1'),
createMockSavedObjectDoc(moment().subtract(9, 'days'), 'doc-id-1'),
createMockSavedObjectDoc(moment().subtract(1, 'days'), 'doc-id-2'),
createMockSavedObjectDoc(moment().subtract(6, 'days'), 'doc-id-3', 'secondary'),
];
savedObjectClient.find.mockImplementation(async ({ type, page = 1, perPage = 10 }) => {
switch (type) {
case USAGE_COUNTERS_SAVED_OBJECT_TYPE:
return { saved_objects: mockSavedObjects, total: 0, page, per_page: perPage };
default:
throw new Error(`Unexpected type [${type}]`);
}
});
await expect(rollUsageCountersIndices(logger, savedObjectClient)).resolves.toHaveLength(2);
expect(savedObjectClient.find).toBeCalled();
expect(savedObjectClient.delete).toHaveBeenCalledTimes(2);
expect(savedObjectClient.delete).toHaveBeenNthCalledWith(
1,
USAGE_COUNTERS_SAVED_OBJECT_TYPE,
'doc-id-1'
);
expect(savedObjectClient.delete).toHaveBeenNthCalledWith(
2,
USAGE_COUNTERS_SAVED_OBJECT_TYPE,
'doc-id-3',
{ namespace: 'secondary' }
);
expect(logger.warn).toHaveBeenCalledTimes(0);
});
it(`logs warnings on savedObject.find failure`, async () => {
savedObjectClient.find.mockImplementation(async () => {
throw new Error(`Expected error!`);
});
await expect(rollUsageCountersIndices(logger, savedObjectClient)).resolves.toEqual(undefined);
expect(savedObjectClient.find).toBeCalled();
expect(savedObjectClient.delete).not.toBeCalled();
expect(logger.warn).toHaveBeenCalledTimes(2);
});
it(`logs warnings on savedObject.delete failure`, async () => {
const mockSavedObjects = [createMockSavedObjectDoc(moment().subtract(7, 'days'), 'doc-id-1')];
savedObjectClient.find.mockImplementation(async ({ type, page = 1, perPage = 10 }) => {
switch (type) {
case USAGE_COUNTERS_SAVED_OBJECT_TYPE:
return { saved_objects: mockSavedObjects, total: 0, page, per_page: perPage };
default:
throw new Error(`Unexpected type [${type}]`);
}
});
savedObjectClient.delete.mockImplementation(async () => {
throw new Error(`Expected error!`);
});
await expect(rollUsageCountersIndices(logger, savedObjectClient)).resolves.toEqual(undefined);
expect(savedObjectClient.find).toBeCalled();
expect(savedObjectClient.delete).toHaveBeenCalledTimes(1);
expect(savedObjectClient.delete).toHaveBeenNthCalledWith(
1,
USAGE_COUNTERS_SAVED_OBJECT_TYPE,
'doc-id-1'
);
expect(logger.warn).toHaveBeenCalledTimes(2);
});
});

View file

@ -1,56 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import moment from 'moment';
import type { ISavedObjectsRepository, Logger } from '@kbn/core/server';
import {
type UsageCountersSavedObject,
USAGE_COUNTERS_SAVED_OBJECT_TYPE,
} from '@kbn/usage-collection-plugin/server';
import { USAGE_COUNTERS_KEEP_DOCS_FOR_DAYS } from './constants';
import { isSavedObjectOlderThan } from '../../common/saved_objects';
export async function rollUsageCountersIndices(
logger: Logger,
savedObjectsClient?: ISavedObjectsRepository
) {
if (!savedObjectsClient) {
return;
}
const now = moment();
try {
const { saved_objects: rawUiCounterDocs } =
await savedObjectsClient.find<UsageCountersSavedObject>({
type: USAGE_COUNTERS_SAVED_OBJECT_TYPE,
namespaces: ['*'],
perPage: 1000, // Process 1000 at a time as a compromise of speed and overload
});
const docsToDelete = rawUiCounterDocs.filter((doc) =>
isSavedObjectOlderThan({
numberOfDays: USAGE_COUNTERS_KEEP_DOCS_FOR_DAYS,
startDate: now,
doc,
})
);
return await Promise.all(
docsToDelete.map(({ id, type, namespaces }) =>
namespaces?.[0]
? savedObjectsClient.delete(type, id, { namespace: namespaces[0] })
: savedObjectsClient.delete(type, id)
)
);
} catch (err) {
logger.warn(`Failed to rollup Usage Counters saved objects.`);
logger.warn(err);
}
}

View file

@ -53,6 +53,7 @@ describe('registerEbtCounters', () => {
test('it reuses the usageCounter when it already exists', () => {
const incrementCounterMock = jest.fn();
usageCollection.getUsageCounterByDomainId.mockReturnValue({
domainId: 'abc123',
incrementCounter: incrementCounterMock,
});
registerEbtCounters(core.analytics, usageCollection);

View file

@ -39,7 +39,6 @@ import {
registerLocalizationUsageCollector,
registerUiCountersUsageCollector,
registerConfigUsageCollector,
registerUsageCountersRollups,
registerUsageCountersUsageCollector,
registerSavedObjectsCountUsageCollector,
registerEventLoopDelaysCollector,
@ -128,11 +127,6 @@ export class KibanaUsageCollectionPlugin implements Plugin {
registerUiCountersUsageCollector(usageCollection, this.logger);
registerUsageCountersRollups(
this.logger.get('usage-counters-rollup'),
getSavedObjectsClient,
pluginStop$
);
registerUsageCountersUsageCollector(usageCollection, this.logger);
registerOpsStatsCollector(usageCollection, metric$);

View file

@ -18,7 +18,6 @@
"@kbn/core-test-helpers-kbn-server",
"@kbn/core-usage-data-server",
"@kbn/core-saved-objects-api-server",
"@kbn/core-saved-objects-api-server-internal",
],
"exclude": [
"target/**/*",

View file

@ -8,12 +8,21 @@
export type CounterEventSource = 'server' | 'ui';
export interface CounterMetric {
export interface AbstractCounter {
/** The domainId used to create the Counter API */
domainId: string;
namespace?: string;
/** The name of the counter */
counterName: string;
/** The type of counter (defaults to 'count') */
counterType: string;
/** The source of this counter: 'server' | 'ui' */
source: CounterEventSource;
/** Namespace associated to this counter */
namespace?: string;
}
export interface CounterMetric extends AbstractCounter {
/** Amount of units to increment this counter */
incrementBy: number;
}

View file

@ -16,8 +16,8 @@ import type {
ExecutionContextSetup,
} from '@kbn/core/server';
import { Collector } from './collector';
import type { ICollector, CollectorOptions, CollectorFetchContext } from './types';
import { UsageCollector, UsageCollectorOptions } from './usage_collector';
import type { ICollector, CollectorOptions, CollectorFetchContext, ICollectorSet } from './types';
import { UsageCollector, type UsageCollectorOptions } from './usage_collector';
import { DEFAULT_MAXIMUM_WAIT_TIME_FOR_ALL_COLLECTORS_IN_S } from '../../common/constants';
import { createPerformanceObsHook, perfTimerify } from './measure_duration';
import { usageCollectorsStatsCollector } from './collector_stats';
@ -44,7 +44,7 @@ export interface CollectorSetConfig {
collectors?: AnyCollector[];
}
export class CollectorSet {
export class CollectorSet implements ICollectorSet {
private readonly logger: Logger;
private readonly executionContext: ExecutionContextSetup;
private readonly maximumWaitTimeForAllCollectorsInS: number;

View file

@ -8,7 +8,7 @@
import { sumBy } from 'lodash';
import { collectorsStatsSchema } from './schema';
import type { CollectorSet } from '../collector_set';
import type { ICollectorSet } from '../types';
export interface CollectorsStats {
not_ready: { count: number; names: string[] };
@ -35,7 +35,7 @@ export interface CollectorsStatsCollectorParams {
}
export const usageCollectorsStatsCollector = (
usageCollection: Pick<CollectorSet, 'makeUsageCollector'>,
usageCollection: Pick<ICollectorSet, 'makeUsageCollector'>,
{
nonReadyCollectorTypes,
timedOutCollectorsTypes,

View file

@ -6,6 +6,7 @@
* Side Public License, v 1.
*/
export type { ICollectorSet } from './types';
export { CollectorSet } from './collector_set';
export type {
AllowedSchemaTypes,

View file

@ -18,6 +18,65 @@ export type {
PossibleSchemaTypes,
} from '@elastic/ebt/client';
import type { Collector, UsageCollectorOptions } from '.';
/**
* Interface to register and manage Usage Collectors through a CollectorSet
*/
export interface ICollectorSet {
/**
* Creates a usage collector to collect plugin telemetry data.
* registerCollector must be called to connect the created collector with the service.
*/
makeUsageCollector: <TFetchReturn, ExtraOptions extends object = {}>(
options: UsageCollectorOptions<TFetchReturn, ExtraOptions>
) => Collector<TFetchReturn, ExtraOptions>;
/**
* Register a usage collector or a stats collector.
* Used to connect the created collector to telemetry.
*/
registerCollector: <TFetchReturn, ExtraOptions extends object>(
collector: Collector<TFetchReturn, ExtraOptions>
) => void;
/**
* Returns a usage collector by type
*/
getCollectorByType: <TFetchReturn, ExtraOptions extends object>(
type: string
) => Collector<TFetchReturn, ExtraOptions> | undefined;
/**
* Fetches the collection from all the registered collectors
* @internal: telemetry use
*/
bulkFetch: <TFetchReturn, ExtraOptions extends object>(
esClient: ElasticsearchClient,
soClient: SavedObjectsClientContract,
collectors?: Map<string, Collector<TFetchReturn, ExtraOptions>>
) => Promise<Array<{ type: string; result: unknown }>>;
/**
* Converts an array of fetched stats results into key/object
* @internal: telemetry use
*/
toObject: <Result extends Record<string, unknown>, T = unknown>(
statsData?: Array<{ type: string; result: T }>
) => Result;
/**
* Rename fields to use API conventions
* @internal: monitoring use
*/
toApiFieldNames: (
apiData: Record<string, unknown> | unknown[]
) => Record<string, unknown> | unknown[];
/**
* Creates a stats collector to collect plugin telemetry data.
* registerCollector must be called to connect the created collector with the service.
* @internal: telemetry and monitoring use
*/
makeStatsCollector: <TFetchReturn, ExtraOptions extends object = {}>(
options: CollectorOptions<TFetchReturn, ExtraOptions>
) => Collector<TFetchReturn, ExtraOptions>;
}
/**
* Helper to find out whether to keep recursively looking or if we are on an end value
*/

View file

@ -10,6 +10,7 @@ import { PluginInitializerContext } from '@kbn/core/server';
export type {
Collector,
ICollectorSet,
AllowedSchemaTypes,
MakeSchemaFrom,
CollectorOptions,

View file

@ -13,9 +13,9 @@ import {
savedObjectsClientMock,
} from '@kbn/core/server/mocks';
import { CollectorOptions, CollectorSet } from './collector';
import { type CollectorOptions, CollectorSet } from './collector';
import { Collector } from './collector/collector';
import { UsageCollectionSetup, CollectorFetchContext } from '.';
import type { UsageCollectionSetup, CollectorFetchContext } from '.';
import { usageCountersServiceMock } from './usage_counters/usage_counters_service.mock';
export type { CollectorOptions };
export { Collector };

View file

@ -13,81 +13,21 @@ import type {
CoreStart,
ISavedObjectsRepository,
Plugin,
ElasticsearchClient,
SavedObjectsClientContract,
} from '@kbn/core/server';
import type { ConfigType } from './config';
import { CollectorSet } from './collector';
import type { Collector, CollectorOptions, UsageCollectorOptions } from './collector';
import { setupRoutes } from './routes';
import type { ConfigType } from './config';
import type { ICollectorSet } from './collector/types';
import type { UsageCountersServiceSetup, UsageCountersServiceStart } from './usage_counters/types';
import { CollectorSet } from './collector';
import { UsageCountersService } from './usage_counters';
import type { UsageCounter } from './usage_counters';
/** Server's setup APIs exposed by the UsageCollection Service **/
export interface UsageCollectionSetup {
/**
* Creates and registers a usage counter to collect daily aggregated plugin counter events
*/
createUsageCounter: (type: string) => UsageCounter;
/**
* Returns a usage counter by type
*/
getUsageCounterByDomainId: (type: string) => UsageCounter | undefined;
/**
* Creates a usage collector to collect plugin telemetry data.
* registerCollector must be called to connect the created collector with the service.
*/
makeUsageCollector: <TFetchReturn, ExtraOptions extends object = {}>(
options: UsageCollectorOptions<TFetchReturn, ExtraOptions>
) => Collector<TFetchReturn, ExtraOptions>;
/**
* Register a usage collector or a stats collector.
* Used to connect the created collector to telemetry.
*/
registerCollector: <TFetchReturn, ExtraOptions extends object>(
collector: Collector<TFetchReturn, ExtraOptions>
) => void;
/**
* Returns a usage collector by type
*/
getCollectorByType: <TFetchReturn, ExtraOptions extends object>(
type: string
) => Collector<TFetchReturn, ExtraOptions> | undefined;
/**
* Fetches the collection from all the registered collectors
* @internal: telemetry use
*/
bulkFetch: <TFetchReturn, ExtraOptions extends object>(
esClient: ElasticsearchClient,
soClient: SavedObjectsClientContract,
collectors?: Map<string, Collector<TFetchReturn, ExtraOptions>>
) => Promise<Array<{ type: string; result: unknown }>>;
/**
* Converts an array of fetched stats results into key/object
* @internal: telemetry use
*/
toObject: <Result extends Record<string, unknown>, T = unknown>(
statsData?: Array<{ type: string; result: T }>
) => Result;
/**
* Rename fields to use API conventions
* @internal: monitoring use
*/
toApiFieldNames: (
apiData: Record<string, unknown> | unknown[]
) => Record<string, unknown> | unknown[];
/**
* Creates a stats collector to collect plugin telemetry data.
* registerCollector must be called to connect the created collector with the service.
* @internal: telemetry and monitoring use
*/
makeStatsCollector: <TFetchReturn, ExtraOptions extends object = {}>(
options: CollectorOptions<TFetchReturn, ExtraOptions>
) => Collector<TFetchReturn, ExtraOptions>;
}
/** Plugin's setup API **/
export type UsageCollectionSetup = ICollectorSet & UsageCountersServiceSetup;
export class UsageCollectionPlugin implements Plugin<UsageCollectionSetup> {
/** Plugin's start API **/
export type UsageCollectionStart = UsageCountersServiceStart;
export class UsageCollectionPlugin implements Plugin<UsageCollectionSetup, UsageCollectionStart> {
private readonly logger: Logger;
private savedObjects?: ISavedObjectsRepository;
private usageCountersService?: UsageCountersService;
@ -112,13 +52,12 @@ export class UsageCollectionPlugin implements Plugin<UsageCollectionSetup> {
bufferDurationMs: config.usageCounters.bufferDuration.asMilliseconds(),
});
const usageCountersServiceSetup = this.usageCountersService.setup(core);
const { createUsageCounter, getUsageCounterByDomainId } = usageCountersServiceSetup;
const usageCountersSetup = this.usageCountersService.setup(core);
const router = core.http.createRouter();
setupRoutes({
router,
usageCountersServiceSetup,
usageCounters: usageCountersSetup,
getSavedObjects: () => this.savedObjects,
collectorSet,
config: {
@ -133,6 +72,10 @@ export class UsageCollectionPlugin implements Plugin<UsageCollectionSetup> {
});
return {
// usage counters methods
createUsageCounter: usageCountersSetup.createUsageCounter,
getUsageCounterByDomainId: usageCountersSetup.getUsageCounterByDomainId,
// collector set methods
bulkFetch: collectorSet.bulkFetch,
getCollectorByType: collectorSet.getCollectorByType,
makeStatsCollector: collectorSet.makeStatsCollector,
@ -140,12 +83,10 @@ export class UsageCollectionPlugin implements Plugin<UsageCollectionSetup> {
registerCollector: collectorSet.registerCollector,
toApiFieldNames: collectorSet.toApiFieldNames,
toObject: collectorSet.toObject,
createUsageCounter,
getUsageCounterByDomainId,
};
}
public start({ savedObjects }: CoreStart) {
public start({ savedObjects }: CoreStart): UsageCollectionStart {
this.logger.debug('Starting plugin');
const config = this.initializerContext.config.get<ConfigType>();
if (!this.usageCountersService) {
@ -153,12 +94,13 @@ export class UsageCollectionPlugin implements Plugin<UsageCollectionSetup> {
}
this.savedObjects = savedObjects.createInternalRepository();
if (config.usageCounters.enabled) {
this.usageCountersService.start({ savedObjects });
} else {
// call stop() to complete observers.
this.usageCountersService.stop();
}
const usageCountersStart = config.usageCounters.enabled
? this.usageCountersService.start({ savedObjects })
: this.usageCountersService.stop();
return {
search: usageCountersStart.search,
};
}
public stop() {

View file

@ -15,7 +15,7 @@ import { type UsageCountersServiceSetup } from '../usage_counters';
export async function storeUiReport(
internalRepository: ISavedObjectsRepository,
counters: UsageCountersServiceSetup,
usageCounters: UsageCountersServiceSetup,
report: ReportSchemaType
) {
const uiCounters = report.uiCounter ? Object.entries(report.uiCounter) : [];
@ -61,7 +61,8 @@ export async function storeUiReport(
const { appName, eventName, total, type } = metric;
const counter =
counters.getUsageCounterByDomainId(appName) ?? counters.createUsageCounter(appName);
usageCounters.getUsageCounterByDomainId(appName) ??
usageCounters.createUsageCounter(appName);
counter.incrementCounter({
counterName: eventName,

View file

@ -12,20 +12,20 @@ import {
type MetricsServiceSetup,
ServiceStatus,
} from '@kbn/core/server';
import { Observable } from 'rxjs';
import { CollectorSet } from '../collector';
import type { Observable } from 'rxjs';
import type { ICollectorSet } from '../collector';
import { registerUiCountersRoute } from './ui_counters';
import { registerStatsRoute } from './stats';
import type { UsageCountersServiceSetup } from '../usage_counters';
export function setupRoutes({
router,
usageCountersServiceSetup,
usageCounters,
getSavedObjects,
...rest
}: {
router: IRouter;
getSavedObjects: () => ISavedObjectsRepository | undefined;
usageCountersServiceSetup: UsageCountersServiceSetup;
usageCounters: UsageCountersServiceSetup;
config: {
allowAnonymous: boolean;
kibanaIndex: string;
@ -37,10 +37,10 @@ export function setupRoutes({
port: number;
};
};
collectorSet: CollectorSet;
collectorSet: ICollectorSet;
metrics: MetricsServiceSetup;
overallStatus$: Observable<ServiceStatus>;
}) {
registerUiCountersRoute(router, getSavedObjects, usageCountersServiceSetup);
registerUiCountersRoute(router, getSavedObjects, usageCounters);
registerStatsRoute({ router, ...rest });
}

View file

@ -17,7 +17,7 @@ import {
ServiceStatus,
ServiceStatusLevels,
} from '@kbn/core/server';
import { CollectorSet } from '../../collector';
import { ICollectorSet } from '../../collector';
import { Stats } from '../../../common/types';
const SNAPSHOT_REGEX = /-snapshot/i;
@ -40,7 +40,7 @@ export function registerStatsRoute({
port: number;
};
};
collectorSet: CollectorSet;
collectorSet: ICollectorSet;
metrics: MetricsServiceSetup;
overallStatus$: Observable<ServiceStatus>;
}) {

View file

@ -15,7 +15,7 @@ import type { UiCounters } from '../../common/types';
export function registerUiCountersRoute(
router: IRouter,
getSavedObjects: () => ISavedObjectsRepository | undefined,
usageCountersServiceSetup: UsageCountersServiceSetup
usageCounters: UsageCountersServiceSetup
) {
router.post(
{
@ -33,8 +33,8 @@ export function registerUiCountersRoute(
if (!internalRepository) {
throw Error(`The saved objects client hasn't been initialised yet`);
}
// we pass the whole usageCountersServiceSetup, so that we can create UI counters dynamically
await storeUiReport(internalRepository, usageCountersServiceSetup, requestBody.report);
// we need to create UI counters dynamically if not explicitly created on server-side
await storeUiReport(internalRepository, usageCounters, requestBody.report);
const bodyOk: UiCounters.v1.UiCountersResponseOk = {
status: 'ok',
};

View file

@ -0,0 +1,151 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { fromKueryExpression } from '@kbn/es-query';
import { usageCountersSearchParamsToKueryFilter } from './kuery_utils';
import type { UsageCountersSearchFilters } from '../types';
describe('usageCountersSearchParamsToKueryFilter', () => {
it('creates a Kuery function with the provided search arguments', () => {
const params: Omit<UsageCountersSearchFilters, 'namespace'> = {
domainId: 'foo',
counterName: 'bar',
counterType: 'count',
source: 'server',
from: '2024-07-03T10:00:00.000Z',
to: '2024-07-10T10:00:00.000Z',
};
const fromParams = usageCountersSearchParamsToKueryFilter(params);
const fromExpression = fromKueryExpression(
// We need to pass the SO type (+ attributes)
// This is handled (removed) by the SOR internally
[
`usage-counter.attributes.domainId: ${params.domainId}`,
`usage-counter.attributes.counterName: ${params.counterName}`,
`usage-counter.attributes.counterType: ${params.counterType}`,
`usage-counter.attributes.source: ${params.source}`,
`usage-counter.updated_at >= "${params.from}"`,
`usage-counter.updated_at <= "${params.to}"`,
].join(' AND ')
);
// hack Kuery expression, as we cannot unquote date params above
fromExpression.arguments[4].arguments[2].isQuoted = false;
fromExpression.arguments[5].arguments[2].isQuoted = false;
expect(fromParams).toEqual(fromExpression);
expect(fromParams).toMatchInlineSnapshot(`
Object {
"arguments": Array [
Object {
"arguments": Array [
Object {
"isQuoted": false,
"type": "literal",
"value": "usage-counter.attributes.domainId",
},
Object {
"isQuoted": false,
"type": "literal",
"value": "foo",
},
],
"function": "is",
"type": "function",
},
Object {
"arguments": Array [
Object {
"isQuoted": false,
"type": "literal",
"value": "usage-counter.attributes.counterName",
},
Object {
"isQuoted": false,
"type": "literal",
"value": "bar",
},
],
"function": "is",
"type": "function",
},
Object {
"arguments": Array [
Object {
"isQuoted": false,
"type": "literal",
"value": "usage-counter.attributes.counterType",
},
Object {
"isQuoted": false,
"type": "literal",
"value": "count",
},
],
"function": "is",
"type": "function",
},
Object {
"arguments": Array [
Object {
"isQuoted": false,
"type": "literal",
"value": "usage-counter.attributes.source",
},
Object {
"isQuoted": false,
"type": "literal",
"value": "server",
},
],
"function": "is",
"type": "function",
},
Object {
"arguments": Array [
Object {
"isQuoted": false,
"type": "literal",
"value": "usage-counter.updated_at",
},
"gte",
Object {
"isQuoted": false,
"type": "literal",
"value": "2024-07-03T10:00:00.000Z",
},
],
"function": "range",
"type": "function",
},
Object {
"arguments": Array [
Object {
"isQuoted": false,
"type": "literal",
"value": "usage-counter.updated_at",
},
"lte",
Object {
"isQuoted": false,
"type": "literal",
"value": "2024-07-10T10:00:00.000Z",
},
],
"function": "range",
"type": "function",
},
],
"function": "and",
"type": "function",
}
`);
});
});

View file

@ -0,0 +1,53 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { type KueryNode, nodeTypes } from '@kbn/es-query';
import { USAGE_COUNTERS_SAVED_OBJECT_TYPE } from '../saved_objects';
import type { UsageCountersSearchFilters } from '../types';
export function usageCountersSearchParamsToKueryFilter(
params: Omit<UsageCountersSearchFilters, 'namespace'>
): KueryNode {
const { domainId, counterName, counterType, source, from, to } = params;
const isFilters = filtersToKueryNodes({ domainId, counterName, counterType, source });
// add a date range filters
if (from) {
isFilters.push(
nodeTypes.function.buildNode(
'range',
`${USAGE_COUNTERS_SAVED_OBJECT_TYPE}.updated_at`,
'gte',
from
)
);
}
if (to) {
isFilters.push(
nodeTypes.function.buildNode(
'range',
`${USAGE_COUNTERS_SAVED_OBJECT_TYPE}.updated_at`,
'lte',
to
)
);
}
return nodeTypes.function.buildNode('and', isFilters);
}
function filtersToKueryNodes(filters: Partial<Record<string, string>>): KueryNode[] {
return Object.entries(filters)
.filter(([, attributeValue]) => typeof attributeValue === 'string' && attributeValue)
.map(([attributeName, attributeValue]) =>
nodeTypes.function.buildNode(
'is',
`${USAGE_COUNTERS_SAVED_OBJECT_TYPE}.attributes.${attributeName}`,
attributeValue
)
);
}

View file

@ -8,7 +8,7 @@
import { UsageCounters } from '../../common';
export type IncrementCounterParams = UsageCounters.v1.IncrementCounterParams;
export type { UsageCountersServiceSetup } from './usage_counters_service';
export type { UsageCountersServiceSetup, UsageCountersServiceStart } from './types';
export type { UsageCountersSavedObjectAttributes, UsageCountersSavedObject } from './saved_objects';
export type { IUsageCounter as UsageCounter } from './usage_counter';

View file

@ -0,0 +1,125 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { USAGE_COUNTERS_SAVED_OBJECT_INDEX } from '@kbn/core-saved-objects-server';
import type { ISavedObjectsRepository, SavedObject, ElasticsearchClient } from '@kbn/core/server';
import type { UsageCounters } from '../../../common/types';
import {
serializeCounterKey,
type UsageCountersSavedObjectAttributes,
USAGE_COUNTERS_SAVED_OBJECT_TYPE,
} from '../..';
// domainId, counterName, counterType, source, count, namespace?
export type CounterAttributes = [
string,
string,
string,
UsageCounters.v1.CounterEventSource,
number,
string?
];
export function toCounterMetric(counter: CounterAttributes): UsageCounters.v1.CounterMetric {
const [domainId, counterName, counterType, source, incrementBy, namespace] = counter;
return { domainId, counterName, counterType, source, incrementBy, namespace };
}
export async function createCounters(
internalRepository: ISavedObjectsRepository,
esClient: ElasticsearchClient,
isoDate: string,
counters: UsageCounters.v1.CounterMetric[]
) {
await Promise.all(
counters
.map((counter) => createCounterObject(isoDate, counter))
.map((counter) => incrementCounter(internalRepository, counter))
);
// we manually update the 'updated_at' property of the SOs, to simulate older counters
await modifyUpdatedAt(
esClient,
counters.map((counter) =>
serializeCounterKey({
...counter,
// SOR injects '[namespace:]so_type:' prefix when generating the ID
domainId: `${USAGE_COUNTERS_SAVED_OBJECT_TYPE}:${counter.domainId}`,
date: isoDate,
})
),
isoDate
);
}
function createCounterObject(
date: string,
counter: UsageCounters.v1.CounterMetric
): SavedObject<UsageCountersSavedObjectAttributes> {
const { domainId, counterName, counterType, namespace, source, incrementBy } = counter;
const id = serializeCounterKey({
domainId,
counterName,
counterType,
source,
date,
});
return {
type: USAGE_COUNTERS_SAVED_OBJECT_TYPE,
id,
...(namespace && { namespaces: [namespace] }),
// updated_at: date // illustrative purpose only, overriden by SOR
attributes: {
domainId,
counterName,
counterType,
source,
count: incrementBy,
},
references: [],
};
}
async function incrementCounter(
internalRepository: ISavedObjectsRepository,
counter: SavedObject<UsageCountersSavedObjectAttributes>
) {
const namespace = counter.namespaces?.[0];
return await internalRepository.incrementCounter(
USAGE_COUNTERS_SAVED_OBJECT_TYPE,
counter.id,
[{ fieldName: 'count', incrementBy: counter.attributes.count }],
{
...(namespace && { namespace }),
upsertAttributes: {
domainId: counter.attributes.domainId,
counterName: counter.attributes.counterName,
counterType: counter.attributes.counterType,
source: counter.attributes.source,
},
}
);
}
async function modifyUpdatedAt(esClient: ElasticsearchClient, ids: string[], updatedAt: string) {
await esClient.updateByQuery({
index: USAGE_COUNTERS_SAVED_OBJECT_INDEX,
query: {
ids: {
values: ids,
},
},
refresh: true,
script: {
lang: 'painless',
params: { updatedAt },
source: `ctx._source.updated_at = params.updatedAt;`,
},
});
}

View file

@ -0,0 +1,181 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import moment from 'moment';
import type { Logger, ISavedObjectsRepository, ElasticsearchClient } from '@kbn/core/server';
import {
type TestElasticsearchUtils,
type TestKibanaUtils,
createTestServers,
createRootWithCorePlugins,
} from '@kbn/core-test-helpers-kbn-server';
import {
serializeCounterKey,
UsageCountersSavedObjectAttributes,
USAGE_COUNTERS_SAVED_OBJECT_TYPE,
} from '../saved_objects';
import { USAGE_COUNTERS_KEEP_DOCS_FOR_DAYS } from '../rollups/constants';
import { rollUsageCountersIndices } from '../rollups/rollups';
import { type CounterAttributes, createCounters, toCounterMetric } from './counter_utils';
import type { IUsageCounter } from '../usage_counter';
const CUSTOM_RETENTION = 90;
const NOW = '2024-06-30T10:00:00.000Z';
const OLD = moment(NOW).subtract(USAGE_COUNTERS_KEEP_DOCS_FOR_DAYS + 1, 'days');
const RECENT = moment(NOW).subtract(USAGE_COUNTERS_KEEP_DOCS_FOR_DAYS - 1, 'days');
const OLD_YMD = OLD.format('YYYYMMDD');
const RECENT_YMD = RECENT.format('YYYYMMDD');
const OLD_ISO = OLD.toISOString();
const RECENT_ISO = RECENT.toISOString();
const CUSTOM_OLD = moment(NOW).subtract(CUSTOM_RETENTION + 2, 'days');
const CUSTOM_RECENT = moment(NOW).subtract(USAGE_COUNTERS_KEEP_DOCS_FOR_DAYS - 1, 'days');
const CUSTOM_OLD_YMD = CUSTOM_OLD.format('YYYYMMDD');
const CUSTOM_RECENT_YMD = CUSTOM_RECENT.format('YYYYMMDD');
const CUSTOM_OLD_ISO = CUSTOM_OLD.toISOString();
const CUSTOM_RECENT_ISO = CUSTOM_RECENT.toISOString();
const ALL_COUNTERS = [
`testCounter|domain1:a:count:server:${OLD_YMD}`,
`testCounter|domain2:a:count:server:${OLD_YMD}`,
`one:testCounter|domain1:b:count:server:${OLD_YMD}`,
`two:testCounter|domain1:b:count:server:${OLD_YMD}`,
`testCounter|domain1:a:count:server:${RECENT_YMD}`,
`testCounter|domain2:a:count:server:${RECENT_YMD}`,
`testCounter|domain2:c:count:server:${RECENT_YMD}`,
`one:testCounter|domain1:b:count:server:${RECENT_YMD}`,
`two:testCounter|domain1:b:count:server:${RECENT_YMD}`,
`testCounter|retention_${CUSTOM_RETENTION}:a:count:server:${CUSTOM_OLD_YMD}`,
`testCounter|retention_${CUSTOM_RETENTION}:a:count:server:${CUSTOM_RECENT_YMD}`,
].sort();
const RECENT_COUNTERS = ALL_COUNTERS.filter(
(key) => key.includes(RECENT_YMD) || key.includes(CUSTOM_RECENT_YMD)
);
describe('usage-counters', () => {
let esServer: TestElasticsearchUtils;
let root: TestKibanaUtils['root'];
let getRegisteredUsageCounters: () => IUsageCounter[];
let internalRepository: ISavedObjectsRepository;
let logger: Logger;
beforeAll(async () => {
const { startES } = createTestServers({
adjustTimeout: (t: number) => jest.setTimeout(t),
});
esServer = await startES();
root = createRootWithCorePlugins();
getRegisteredUsageCounters = () => [
{
domainId: 'testCounter|domain1',
incrementCounter: jest.fn(),
},
{
domainId: 'testCounter|domain2',
incrementCounter: jest.fn(),
},
{
domainId: `testCounter|retention_${CUSTOM_RETENTION}`,
retentionPeriodDays: 90,
incrementCounter: jest.fn(),
},
];
await root.preboot();
await root.setup();
const start = await root.start();
logger = root.logger.get('test daily rollups');
internalRepository = start.savedObjects.createInternalRepository([
USAGE_COUNTERS_SAVED_OBJECT_TYPE,
]);
// insert a bunch of usage counters in multiple namespaces
await createTestCounters(internalRepository, start.elasticsearch.client.asInternalUser);
});
it('deletes documents older that the retention period, from all namespaces', async () => {
// check that all documents are there
const beforeRollup = await internalRepository.find<UsageCountersSavedObjectAttributes>({
type: USAGE_COUNTERS_SAVED_OBJECT_TYPE,
namespaces: ['*'],
});
expect(
beforeRollup.saved_objects
.map(({ attributes, updated_at: updatedAt, namespaces }) =>
serializeCounterKey({ ...attributes, date: updatedAt, namespace: namespaces?.[0] })
)
.filter((counterKey) => counterKey.includes('testCounter|'))
.sort()
).toEqual(ALL_COUNTERS);
await rollUsageCountersIndices({
logger,
getRegisteredUsageCounters,
internalRepository,
now: moment(NOW),
});
// check only recent counters are present
const afterRollup = await internalRepository.find<UsageCountersSavedObjectAttributes>({
type: USAGE_COUNTERS_SAVED_OBJECT_TYPE,
namespaces: ['*'],
});
expect(
afterRollup.saved_objects
.map(({ attributes, updated_at: updatedAt, namespaces }) =>
serializeCounterKey({ ...attributes, date: updatedAt, namespace: namespaces?.[0] })
)
.filter((counterKey) => counterKey.includes('testCounter|'))
.sort()
).toEqual(RECENT_COUNTERS);
});
afterAll(async () => {
await esServer.stop();
await root.shutdown();
});
});
const customOld: CounterAttributes[] = [
// domainId, counterName, counterType, source, count, namespace?
[`testCounter|retention_${CUSTOM_RETENTION}`, 'a', 'count', 'server', 198],
];
const customRecent: CounterAttributes[] = [
[`testCounter|retention_${CUSTOM_RETENTION}`, 'a', 'count', 'server', 199],
];
const old: CounterAttributes[] = [
['testCounter|domain1', 'a', 'count', 'server', 28],
['testCounter|domain1', 'b', 'count', 'server', 29, 'one'],
['testCounter|domain1', 'b', 'count', 'server', 30, 'two'],
['testCounter|domain2', 'a', 'count', 'server', 31],
];
const recent: CounterAttributes[] = [
// domainId, counterName, counterType, source, count, namespace?
['testCounter|domain1', 'a', 'count', 'server', 32],
['testCounter|domain1', 'b', 'count', 'server', 33, 'one'],
['testCounter|domain1', 'b', 'count', 'server', 34, 'two'],
['testCounter|domain2', 'a', 'count', 'server', 35],
['testCounter|domain2', 'c', 'count', 'server', 36],
];
async function createTestCounters(repo: ISavedObjectsRepository, client: ElasticsearchClient) {
await createCounters(repo, client, CUSTOM_OLD_ISO, customOld.map(toCounterMetric));
await createCounters(repo, client, CUSTOM_RECENT_ISO, customRecent.map(toCounterMetric));
await createCounters(repo, client, OLD_ISO, old.map(toCounterMetric));
await createCounters(repo, client, RECENT_ISO, recent.map(toCounterMetric));
}

View file

@ -0,0 +1,273 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import moment from 'moment';
import type { ISavedObjectsRepository, ElasticsearchClient } from '@kbn/core/server';
import {
type TestElasticsearchUtils,
type TestKibanaUtils,
createTestServers,
createRootWithCorePlugins,
} from '@kbn/core-test-helpers-kbn-server';
import { serializeCounterKey, USAGE_COUNTERS_SAVED_OBJECT_TYPE } from '../..';
import { type CounterAttributes, createCounters, toCounterMetric } from './counter_utils';
import type { UsageCounterSnapshot } from '../types';
import { searchUsageCounters } from '../search';
import { orderBy } from 'lodash';
// domainId, counterName, counterType, source, count, namespace?
const FIRST_DAY_COUNTERS: CounterAttributes[] = [
['dashboards', 'aDashboardId', 'viewed', 'server', 10, 'first'],
['dashboards', 'aDashboardId', 'edited', 'server', 5, 'first'],
['dashboards', 'aDashboardId', 'viewed', 'server', 100, 'second'],
['dashboards', 'aDashboardId', 'edited', 'server', 50, 'second'],
['dashboards', 'aDashboardId', 'consoleErrors', 'ui', 3, 'first'],
['dashboards', 'aDashboardId', 'consoleErrors', 'ui', 9, 'second'],
['dashboards', 'list', 'viewed', 'ui', 256, 'default'],
['someDomain', 'someCounterName', 'someCounter', 'server', 13, 'first'],
];
const SECOND_DAY_COUNTERS: CounterAttributes[] = [
['dashboards', 'aDashboardId', 'viewed', 'server', 11, 'first'],
['dashboards', 'aDashboardId', 'edited', 'server', 6, 'first'],
['dashboards', 'aDashboardId', 'viewed', 'server', 101, 'second'],
['dashboards', 'aDashboardId', 'edited', 'server', 51, 'second'],
['dashboards', 'aDashboardId', 'consoleErrors', 'ui', 4, 'first'],
['dashboards', 'aDashboardId', 'consoleErrors', 'ui', 10, 'second'],
['dashboards', 'someGlobalServerCounter', 'count', 'server', 28],
['dashboards', 'someGlobalUiCounter', 'count', 'ui', 14],
['dashboards', 'list', 'viewed', 'ui', 257, 'default'],
['someDomain', 'someCounterName', 'someCounter', 'server', 14, 'first'],
];
const THIRD_DAY_COUNTERS: CounterAttributes[] = [
['dashboards', 'someGlobalServerCounter', 'count', 'server', 29],
['dashboards', 'someGlobalUiCounter', 'count', 'ui', 15],
['someDomain', 'someCounterName', 'someCounter', 'server', 15, 'first'],
];
describe('usage-counters#search', () => {
let esServer: TestElasticsearchUtils;
let root: TestKibanaUtils['root'];
let internalRepository: ISavedObjectsRepository;
beforeAll(async () => {
const { startES } = createTestServers({
adjustTimeout: (t: number) => jest.setTimeout(t),
});
esServer = await startES();
root = createRootWithCorePlugins();
await root.preboot();
await root.setup();
const start = await root.start();
internalRepository = start.savedObjects.createInternalRepository([
USAGE_COUNTERS_SAVED_OBJECT_TYPE,
]);
await createTestCounters(internalRepository, start.elasticsearch.client.asInternalUser);
});
describe('namespace agnostic search', () => {
it('returns counters in the default namespace', async () => {
const dashboardsNoNamespace = await searchUsageCounters(internalRepository, {
filters: {
domainId: 'dashboards',
},
});
expect(
dashboardsNoNamespace.counters.every(
({ domainId, namespace }) => domainId === 'dashboards' && namespace === 'default'
)
).toEqual(true);
expectToMatchKeys(dashboardsNoNamespace.counters, [
'dashboards:list:viewed:ui - 513 hits',
'dashboards:someGlobalServerCounter:count:server - 57 hits',
'dashboards:someGlobalUiCounter:count:ui - 29 hits',
]);
// check that the daily records are sorted descendingly
expect(
dashboardsNoNamespace.counters.find(
({ counterName }) => counterName === 'someGlobalUiCounter'
)!.records
).toMatchInlineSnapshot(`
Array [
Object {
"count": 15,
"updatedAt": "2024-07-03T10:00:00.000Z",
},
Object {
"count": 14,
"updatedAt": "2024-07-02T10:00:00.000Z",
},
]
`);
});
});
describe('namespace search', () => {
it('returns all counters that match namespace', async () => {
const dashboardsFirstNamespace = await searchUsageCounters(internalRepository, {
filters: {
domainId: 'dashboards',
namespace: 'first',
},
});
expect(
dashboardsFirstNamespace.counters.every(
({ domainId, namespace }) => domainId === 'dashboards' && namespace === 'first'
)
).toEqual(true);
expectToMatchKeys(dashboardsFirstNamespace.counters, [
'first:dashboards:aDashboardId:viewed:server - 21 hits',
'first:dashboards:aDashboardId:edited:server - 11 hits',
'first:dashboards:aDashboardId:consoleErrors:ui - 7 hits',
]);
});
it('does not return counters that belong to other namespaces', async () => {
const someDomainSecondNamespace = await searchUsageCounters(internalRepository, {
filters: {
domainId: 'someDomain',
namespace: 'second',
},
});
expect(someDomainSecondNamespace.counters).toEqual([]);
});
});
describe('specific counter search', () => {
it('allows searching for specific counters (name + type) on specific namespaces', async () => {
const dashboardsByName = await searchUsageCounters(internalRepository, {
filters: {
domainId: 'dashboards',
counterName: 'aDashboardId',
counterType: 'viewed',
source: 'server',
namespace: 'second',
},
});
expect(
dashboardsByName.counters.every(
({ domainId, counterName, counterType, source, namespace }) =>
domainId === 'dashboards' &&
counterName === 'aDashboardId' &&
counterType === 'viewed' &&
source === 'server' &&
namespace === 'second'
)
).toEqual(true);
expectToMatchKeys(dashboardsByName.counters, [
'second:dashboards:aDashboardId:viewed:server - 201 hits',
]);
});
});
describe('date filters', () => {
it('allow searching for counters that are more recent than the given date', async () => {
const from = moment('2024-07-03T00:00:00.000Z');
const dashboardsFrom = await searchUsageCounters(internalRepository, {
filters: {
domainId: 'dashboards',
from: '2024-07-03T00:00:00.000Z',
},
});
expect(
dashboardsFrom.counters.every(
({ domainId, records }) =>
domainId === 'dashboards' &&
records.every(({ updatedAt }) => moment(updatedAt).diff(from) > 0)
)
).toEqual(true);
expectToMatchKeys(dashboardsFrom.counters, [
'dashboards:someGlobalServerCounter:count:server - 29 hits',
'dashboards:someGlobalUiCounter:count:ui - 15 hits',
]);
});
});
describe('PIT search', () => {
it('allows retrieving all counters in batches', async () => {
const allDashboards = await searchUsageCounters(internalRepository, {
filters: {
domainId: 'dashboards',
namespace: '*',
},
options: {
// we are forcing the logic to perform lots of requests to ES
// each of them retrieving just a single result, just for the sake of testing
perPage: 1,
},
});
expectToMatchKeys(allDashboards.counters, [
'dashboards:list:viewed:ui - 513 hits',
'second:dashboards:aDashboardId:viewed:server - 201 hits',
'second:dashboards:aDashboardId:edited:server - 101 hits',
'dashboards:someGlobalServerCounter:count:server - 57 hits',
'dashboards:someGlobalUiCounter:count:ui - 29 hits',
'first:dashboards:aDashboardId:viewed:server - 21 hits',
'second:dashboards:aDashboardId:consoleErrors:ui - 19 hits',
'first:dashboards:aDashboardId:edited:server - 11 hits',
'first:dashboards:aDashboardId:consoleErrors:ui - 7 hits',
]);
});
});
afterAll(async () => {
await esServer.stop();
await root.shutdown();
});
});
async function createTestCounters(
internalRepository: ISavedObjectsRepository,
esClient: ElasticsearchClient
) {
// insert a bunch of usage counters in multiple namespaces
await createCounters(
internalRepository,
esClient,
'2024-07-01T10:00:00.000Z',
FIRST_DAY_COUNTERS.map(toCounterMetric)
);
await createCounters(
internalRepository,
esClient,
'2024-07-02T10:00:00.000Z',
SECOND_DAY_COUNTERS.map(toCounterMetric)
);
await createCounters(
internalRepository,
esClient,
'2024-07-03T10:00:00.000Z',
THIRD_DAY_COUNTERS.map(toCounterMetric)
);
}
function expectToMatchKeys(counters: UsageCounterSnapshot[], keys: string[]) {
expect(counters.length).toEqual(keys.length);
// the counter snapshots do not include a single date. We match a date agnostic key
expect(
orderBy(
counters.map((counter) => ({ ...counter, key: serializeCounterKey(counter) })),
['count', 'key'],
['desc', 'asc']
).map(({ key, count }) => `${key.substring(0, key.length - 9)} - ${count} hits`)
).toEqual(keys);
}

View file

@ -7,9 +7,9 @@
*/
/**
* Roll indices every 24h
* Roll indices every hour
*/
export const ROLL_INDICES_INTERVAL = 24 * 60 * 60 * 1000;
export const ROLL_INDICES_INTERVAL = 3600_000;
/**
* Start rolling indices after 5 minutes up

View file

@ -7,16 +7,29 @@
*/
import { type Observable, timer, takeUntil } from 'rxjs';
import { Logger, ISavedObjectsRepository } from '@kbn/core/server';
import type { Logger, ISavedObjectsRepository } from '@kbn/core/server';
import { ROLL_INDICES_INTERVAL, ROLL_INDICES_START } from './constants';
import { rollUsageCountersIndices } from './rollups';
import { IUsageCounter } from '../usage_counter';
export function registerUsageCountersRollups(
logger: Logger,
getSavedObjectsClient: () => ISavedObjectsRepository | undefined,
pluginStop$: Observable<void>
) {
export function registerUsageCountersRollups({
logger,
getRegisteredUsageCounters,
internalRepository,
pluginStop$,
}: {
logger: Logger;
getRegisteredUsageCounters: () => IUsageCounter[];
internalRepository: ISavedObjectsRepository;
pluginStop$: Observable<void>;
}) {
timer(ROLL_INDICES_START, ROLL_INDICES_INTERVAL)
.pipe(takeUntil(pluginStop$))
.subscribe(() => rollUsageCountersIndices(logger, getSavedObjectsClient()));
.subscribe(() =>
rollUsageCountersIndices({
logger,
getRegisteredUsageCounters,
internalRepository,
})
);
}

View file

@ -0,0 +1,205 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import moment from 'moment';
import { savedObjectsRepositoryMock, loggingSystemMock } from '@kbn/core/server/mocks';
import { rollUsageCountersIndices } from './rollups';
import { USAGE_COUNTERS_SAVED_OBJECT_TYPE } from '..';
import { createMockSavedObjectDoc } from '../saved_objects.test';
import type { IUsageCounter } from '../usage_counter';
describe('rollUsageCountersIndices', () => {
let logger: ReturnType<typeof loggingSystemMock.createLogger>;
let internalRepository: ReturnType<typeof savedObjectsRepositoryMock.create>;
let getRegisteredUsageCounters: () => IUsageCounter[];
beforeEach(() => {
logger = loggingSystemMock.createLogger();
internalRepository = savedObjectsRepositoryMock.create();
getRegisteredUsageCounters = () => [
{
domainId: 'testDomain',
incrementCounter: jest.fn(),
},
{
domainId: 'retention_3',
retentionPeriodDays: 3,
incrementCounter: jest.fn(),
},
];
});
it('returns undefined if no savedObjectsClient initialised yet', async () => {
await expect(
rollUsageCountersIndices({
logger,
getRegisteredUsageCounters,
internalRepository: undefined,
})
).resolves.toBe(undefined);
expect(logger.warn).toHaveBeenCalledTimes(0);
});
it('does not delete any documents on empty saved objects', async () => {
internalRepository.find.mockImplementation(async ({ type, page = 1, perPage = 10 }) => {
switch (type) {
case USAGE_COUNTERS_SAVED_OBJECT_TYPE:
return { saved_objects: [], total: 0, page, per_page: perPage };
default:
throw new Error(`Unexpected type [${type}]`);
}
});
await expect(
rollUsageCountersIndices({ logger, getRegisteredUsageCounters, internalRepository })
).resolves.toEqual(0);
expect(internalRepository.find).toHaveBeenCalledTimes(getRegisteredUsageCounters().length);
expect(internalRepository.bulkDelete).not.toBeCalled();
expect(logger.warn).toHaveBeenCalledTimes(0);
expect(logger.debug).toHaveBeenCalledTimes(0);
});
it(`deletes documents older than the retention period`, async () => {
const mockSavedObjects = [
createMockSavedObjectDoc(moment().subtract(5, 'days'), 'doc-id-0', 'testDomain'),
createMockSavedObjectDoc(moment().subtract(9, 'days'), 'doc-id-1', 'testDomain'), // old
createMockSavedObjectDoc(moment().subtract(2, 'days'), 'doc-id-2', 'retention_3'),
createMockSavedObjectDoc(moment().subtract(4, 'days'), 'doc-id-3', 'retention_3'), // old
createMockSavedObjectDoc(moment().subtract(6, 'days'), 'doc-id-4', 'testDomain', 'secondary'), // old
];
internalRepository.find.mockImplementationOnce(async ({ type, page = 1, perPage = 10 }) => {
switch (type) {
case USAGE_COUNTERS_SAVED_OBJECT_TYPE:
return {
saved_objects: [mockSavedObjects[1], mockSavedObjects[4]],
total: mockSavedObjects.length,
page,
per_page: perPage,
};
default:
throw new Error(`Unexpected type [${type}]`);
}
});
internalRepository.find.mockImplementationOnce(async ({ type, page = 1, perPage = 10 }) => {
switch (type) {
case USAGE_COUNTERS_SAVED_OBJECT_TYPE:
return {
saved_objects: [mockSavedObjects[3]],
total: mockSavedObjects.length,
page,
per_page: perPage,
};
default:
throw new Error(`Unexpected type [${type}]`);
}
});
await expect(
rollUsageCountersIndices({ logger, getRegisteredUsageCounters, internalRepository })
).resolves.toEqual(3);
expect(internalRepository.find).toHaveBeenCalledTimes(getRegisteredUsageCounters().length);
expect(internalRepository.find).toHaveBeenNthCalledWith(
1,
expect.objectContaining({ type: USAGE_COUNTERS_SAVED_OBJECT_TYPE })
);
expect(internalRepository.find).toHaveBeenNthCalledWith(
2,
expect.objectContaining({ type: USAGE_COUNTERS_SAVED_OBJECT_TYPE })
);
expect(internalRepository.bulkDelete).toHaveBeenCalledTimes(3);
expect(internalRepository.bulkDelete.mock.calls).toMatchInlineSnapshot(`
Array [
Array [
Array [
Object {
"id": "doc-id-1",
"type": "usage-counter",
},
],
Object {
"namespace": "default",
},
],
Array [
Array [
Object {
"id": "doc-id-4",
"type": "usage-counter",
},
],
Object {
"namespace": "secondary",
},
],
Array [
Array [
Object {
"id": "doc-id-3",
"type": "usage-counter",
},
],
Object {
"namespace": "default",
},
],
]
`);
expect(logger.warn).toHaveBeenCalledTimes(0);
});
it(`logs warnings on savedObject.find failure`, async () => {
internalRepository.find.mockImplementation(async () => {
throw new Error(`Expected error!`);
});
await expect(
rollUsageCountersIndices({ logger, getRegisteredUsageCounters, internalRepository })
).resolves.toEqual(0);
// we abort operation if the find for a given domain fails
expect(internalRepository.find).toHaveBeenCalledTimes(1);
expect(internalRepository.bulkDelete).not.toBeCalled();
expect(logger.warn).toHaveBeenCalledTimes(2);
});
it(`logs warnings on savedObject.delete failure`, async () => {
const mockSavedObjects = [
createMockSavedObjectDoc(moment().subtract(7, 'days'), 'doc-id-6', 'testDomain'),
];
internalRepository.find.mockImplementationOnce(async ({ type, page = 1, perPage = 10 }) => {
switch (type) {
case USAGE_COUNTERS_SAVED_OBJECT_TYPE:
return { saved_objects: mockSavedObjects, total: 0, page, per_page: perPage };
default:
throw new Error(`Unexpected type [${type}]`);
}
});
internalRepository.delete.mockImplementationOnce(async () => {
throw new Error(`Expected error!`);
});
await expect(
rollUsageCountersIndices({ logger, getRegisteredUsageCounters, internalRepository })
).resolves.toEqual(1);
expect(internalRepository.find).toHaveBeenCalledTimes(2);
expect(internalRepository.bulkDelete).toHaveBeenCalledTimes(1);
expect(internalRepository.bulkDelete.mock.calls).toMatchInlineSnapshot(`
Array [
Array [
Array [
Object {
"id": "doc-id-6",
"type": "usage-counter",
},
],
Object {
"namespace": "default",
},
],
]
`);
expect(logger.warn).toHaveBeenCalledTimes(2);
});
});

View file

@ -0,0 +1,103 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import moment from 'moment';
import type { ISavedObjectsRepository, Logger, SavedObjectsFindOptions } from '@kbn/core/server';
import { groupBy } from 'lodash';
import { USAGE_COUNTERS_KEEP_DOCS_FOR_DAYS } from './constants';
import { type UsageCountersSavedObjectAttributes, USAGE_COUNTERS_SAVED_OBJECT_TYPE } from '..';
import type { IUsageCounter } from '../usage_counter';
import { usageCountersSearchParamsToKueryFilter } from '../common/kuery_utils';
// Process 1000 at a time as a compromise of speed and overload
const ROLLUP_BATCH_SIZE = 1000;
export async function rollUsageCountersIndices({
logger,
getRegisteredUsageCounters,
internalRepository,
now = moment(),
}: {
logger: Logger;
getRegisteredUsageCounters: () => IUsageCounter[];
internalRepository?: ISavedObjectsRepository;
now?: moment.Moment;
}) {
if (!internalRepository) {
return;
}
let cleanupCounter = 0;
try {
const counterQueue = getRegisteredUsageCounters();
while (counterQueue.length > 0) {
const counter = counterQueue.shift()!;
const findParams: SavedObjectsFindOptions = {
type: USAGE_COUNTERS_SAVED_OBJECT_TYPE,
filter: usageCountersSearchParamsToKueryFilter({
domainId: counter.domainId,
to: moment(now)
// get documents that are OLDER than the retention period
.subtract(
1 + (counter.retentionPeriodDays ?? USAGE_COUNTERS_KEEP_DOCS_FOR_DAYS),
'days'
)
.toISOString(),
}),
sortField: 'updated_at',
sortOrder: 'asc',
namespaces: ['*'],
perPage: ROLLUP_BATCH_SIZE,
};
const { saved_objects: rawUiCounterDocs } =
await internalRepository.find<UsageCountersSavedObjectAttributes>(findParams);
if (rawUiCounterDocs.length) {
const toDelete = rawUiCounterDocs.map(({ id, type, namespaces }) => ({
id,
type,
namespace: namespaces?.[0] ?? 'default',
}));
cleanupCounter += toDelete.length;
logger.debug(
`[Rollups] Cleaning ${toDelete.length} old Usage Counters saved objects under domain '${counter.domainId}'`
);
const toDeleteByNamespace = groupBy(toDelete, 'namespace');
// perform a Bulk delete for each namespace
await Promise.all(
Object.entries(toDeleteByNamespace).map(([namespace, counters]) =>
internalRepository.bulkDelete(
counters.map(({ namespace: _, ...props }) => ({ ...props })),
{ namespace }
)
)
);
if (toDelete.length === ROLLUP_BATCH_SIZE) {
// we found a lot of old Usage Counters, put the counter back in the queue, as there might be more
counterQueue.push(counter);
}
}
}
} catch (err) {
logger.warn(`Failed to rollup Usage Counters saved objects.`);
logger.warn(err);
}
if (cleanupCounter) {
logger.debug(`[Rollups] Cleaned ${cleanupCounter} Usage Counters saved objects`);
}
return cleanupCounter;
}

View file

@ -6,15 +6,15 @@
* Side Public License, v 1.
*/
import { serializeCounterKey, storeCounter } from './saved_objects';
import { savedObjectsRepositoryMock } from '@kbn/core/server/mocks';
import { UsageCounters } from '../../common';
import moment from 'moment';
import { savedObjectsRepositoryMock } from '@kbn/core/server/mocks';
import { serializeCounterKey, storeCounter } from './saved_objects';
import type { UsageCounters } from '../../common';
import type { SavedObjectsFindResult } from '@kbn/core/server';
import { type UsageCountersSavedObjectAttributes, USAGE_COUNTERS_SAVED_OBJECT_TYPE } from '..';
describe('counterKey', () => {
test('#serializeCounterKey returns a serialized string', () => {
test('#serializeCounterKey returns a serialized string that omits default namespace', () => {
const result = serializeCounterKey({
domainId: 'a',
counterName: 'b',
@ -24,7 +24,20 @@ describe('counterKey', () => {
date: moment('09042021', 'DDMMYYYY'),
});
expect(result).toEqual('a:b:c:ui:20210409:default');
expect(result).toEqual('a:b:c:ui:20210409');
});
test('#serializeCounterKey returns a serialized string for non-default namespaces', () => {
const result = serializeCounterKey({
domainId: 'a',
counterName: 'b',
counterType: 'c',
namespace: 'second',
source: 'ui',
date: moment('09042021', 'DDMMYYYY'),
});
expect(result).toEqual('second:a:b:c:ui:20210409');
});
});
@ -77,3 +90,26 @@ describe('storeCounter', () => {
`);
});
});
export const createMockSavedObjectDoc = (
updatedAt: moment.Moment,
id: string,
domainId: string,
namespace?: string
) =>
({
id,
type: USAGE_COUNTERS_SAVED_OBJECT_TYPE,
...(namespace && { namespaces: [namespace] }),
attributes: {
count: 3,
domainId,
counterName: 'testName',
counterType: 'count',
source: 'server',
},
references: [],
updated_at: updatedAt.format(),
version: 'WzI5LDFd',
score: 0,
} as SavedObjectsFindResult<UsageCountersSavedObjectAttributes>);

View file

@ -8,25 +8,18 @@
import moment from 'moment';
import { USAGE_COUNTERS_SAVED_OBJECT_INDEX } from '@kbn/core-saved-objects-server';
import { DEFAULT_NAMESPACE_STRING } from '@kbn/core-saved-objects-utils-server';
import type {
SavedObject,
SavedObjectsRepository,
SavedObjectsServiceSetup,
} from '@kbn/core/server';
import { UsageCounters } from '../../common';
import type { UsageCounters } from '../../common';
/**
* The attributes stored in the UsageCounters' SavedObjects
*/
export interface UsageCountersSavedObjectAttributes {
/** The domain ID registered in the Usage Counter **/
domainId: string;
/** The counter name **/
counterName: string;
/** The counter type **/
counterType: string;
/** The source of the event that is being counted: 'server' | 'ui' **/
source: string;
export interface UsageCountersSavedObjectAttributes extends UsageCounters.v1.AbstractCounter {
/** Number of times the event has occurred **/
count: number;
}
@ -77,17 +70,7 @@ export const registerUsageCountersSavedObjectTypes = (
* Parameters to the `serializeCounterKey` method
* @internal used in kibana_usage_collectors
*/
export interface SerializeCounterKeyParams {
/** The domain ID registered in the UsageCounter **/
domainId: string;
/** The counter name **/
counterName: string;
/** The counter type **/
counterType: string;
/** The namespace of this counter */
namespace?: string;
/** The source of the event we are counting */
source: string;
export interface SerializeCounterKeyParams extends UsageCounters.v1.AbstractCounter {
/** The date to which serialize the key (defaults to 'now') **/
date?: moment.MomentInput;
}
@ -97,19 +80,17 @@ export interface SerializeCounterKeyParams {
* @internal used in kibana_usage_collectors
* @param opts {@link SerializeCounterKeyParams}
*/
export const serializeCounterKey = ({
domainId,
counterName,
counterType,
namespace,
source,
date,
}: SerializeCounterKeyParams) => {
export const serializeCounterKey = (params: SerializeCounterKeyParams) => {
const { domainId, counterName, counterType, namespace, source, date } = params;
const dayDate = moment(date).format('YYYYMMDD');
// e.g. 'dashboards:viewed:total:ui:20240628' // namespace-agnostic counters
// e.g. 'dashboards:viewed:total:ui:20240628:default' // namespaced counters
const namespaceSuffix = namespace ? `:${namespace}` : '';
return `${domainId}:${counterName}:${counterType}:${source}:${dayDate}${namespaceSuffix}`;
if (namespace && namespace !== DEFAULT_NAMESPACE_STRING) {
// e.g. 'someNamespace:dashboards:viewed:total:ui:20240628'
return `${namespace}:${domainId}:${counterName}:${counterType}:${source}:${dayDate}`;
} else {
// e.g. 'dashboards:viewed:total:ui:20240628'
return `${domainId}:${counterName}:${counterType}:${source}:${dayDate}`;
}
};
export interface StoreCounterParams {

View file

@ -0,0 +1,9 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
export { searchUsageCounters } from './search';

View file

@ -0,0 +1,54 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import type { SavedObjectsFindResult } from '@kbn/core-saved-objects-api-server';
import type { UsageCounters } from '../../../common';
import type { UsageCountersSavedObjectAttributes } from '../saved_objects';
// domainId, counterName, counterType, source, count, namespace?
export type CounterAttributes = [
string,
string,
string,
UsageCounters.v1.CounterEventSource,
number,
string?
];
export const mockedUsageCounters: Array<
SavedObjectsFindResult<UsageCountersSavedObjectAttributes>
> = [
toSOFR('2024-07-08T10:00:00.000Z', 'foo', 'bar', 'count', 'server', 28, 'default'),
toSOFR('2024-07-07T10:00:00.000Z', 'foo', 'bar', 'count', 'server', 27, 'default'),
toSOFR('2024-07-06T10:00:00.000Z', 'foo', 'bar', 'count', 'server', 26, 'default'),
toSOFR('2024-07-05T10:00:00.000Z', 'foo', 'bar', 'count', 'server', 25, 'default'),
toSOFR('2024-07-04T10:00:00.000Z', 'foo', 'bar', 'count', 'server', 24, 'default'),
toSOFR('2024-07-03T10:00:00.000Z', 'foo', 'bar', 'count', 'server', 23, 'default'),
];
function toSOFR(
isoDate: string,
...attrs: CounterAttributes
): SavedObjectsFindResult<UsageCountersSavedObjectAttributes> {
const [domainId, counterName, counterType, source, count, namespace] = attrs;
return {
id: 'someId',
type: 'usage-counter',
...(namespace && namespace !== 'default' && { namespaces: [namespace[0]] }),
attributes: {
domainId,
counterName,
counterType,
source,
count,
},
updated_at: isoDate,
references: [],
score: 0,
};
}

View file

@ -0,0 +1,185 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { savedObjectsRepositoryMock } from '@kbn/core/server/mocks';
import { searchUsageCounters } from './search';
import { mockedUsageCounters } from './search.fixtures';
describe('searchUsageCounters', () => {
let internalRepository: ReturnType<typeof savedObjectsRepositoryMock.create>;
beforeEach(() => {
internalRepository = savedObjectsRepositoryMock.create();
});
it('calls repository.find() with the right params', async () => {
internalRepository.find.mockResolvedValueOnce({
page: 1,
per_page: 100,
total: 8,
saved_objects: [],
});
await searchUsageCounters(internalRepository, {
filters: {
domainId: 'foo',
counterName: 'bar',
counterType: 'count',
from: '2024-07-03T10:00:00.000Z',
source: 'server',
},
});
expect(internalRepository.find).toHaveBeenCalledTimes(1);
expect(internalRepository.find.mock.calls[0][0]).toMatchInlineSnapshot(`
Object {
"filter": Object {
"arguments": Array [
Object {
"arguments": Array [
Object {
"isQuoted": false,
"type": "literal",
"value": "usage-counter.attributes.domainId",
},
Object {
"isQuoted": false,
"type": "literal",
"value": "foo",
},
],
"function": "is",
"type": "function",
},
Object {
"arguments": Array [
Object {
"isQuoted": false,
"type": "literal",
"value": "usage-counter.attributes.counterName",
},
Object {
"isQuoted": false,
"type": "literal",
"value": "bar",
},
],
"function": "is",
"type": "function",
},
Object {
"arguments": Array [
Object {
"isQuoted": false,
"type": "literal",
"value": "usage-counter.attributes.counterType",
},
Object {
"isQuoted": false,
"type": "literal",
"value": "count",
},
],
"function": "is",
"type": "function",
},
Object {
"arguments": Array [
Object {
"isQuoted": false,
"type": "literal",
"value": "usage-counter.attributes.source",
},
Object {
"isQuoted": false,
"type": "literal",
"value": "server",
},
],
"function": "is",
"type": "function",
},
Object {
"arguments": Array [
Object {
"isQuoted": false,
"type": "literal",
"value": "usage-counter.updated_at",
},
"gte",
Object {
"isQuoted": false,
"type": "literal",
"value": "2024-07-03T10:00:00.000Z",
},
],
"function": "range",
"type": "function",
},
],
"function": "and",
"type": "function",
},
"perPage": 100,
"pit": Object {
"id": "some_pit_id",
},
"type": "usage-counter",
}
`);
});
it('aggregates the usage counters with the same ID/namespace', async () => {
internalRepository.find.mockResolvedValueOnce({
page: 1,
per_page: 1000,
total: 8,
saved_objects: mockedUsageCounters,
});
const res = await searchUsageCounters(internalRepository, { filters: { domainId: 'foo' } });
expect(res.counters).toMatchInlineSnapshot(`
Array [
Object {
"count": 153,
"counterName": "bar",
"counterType": "count",
"domainId": "foo",
"records": Array [
Object {
"count": 28,
"updatedAt": "2024-07-08T10:00:00.000Z",
},
Object {
"count": 27,
"updatedAt": "2024-07-07T10:00:00.000Z",
},
Object {
"count": 26,
"updatedAt": "2024-07-06T10:00:00.000Z",
},
Object {
"count": 25,
"updatedAt": "2024-07-05T10:00:00.000Z",
},
Object {
"count": 24,
"updatedAt": "2024-07-04T10:00:00.000Z",
},
Object {
"count": 23,
"updatedAt": "2024-07-03T10:00:00.000Z",
},
],
"source": "server",
},
]
`);
});
});

View file

@ -0,0 +1,110 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { orderBy } from 'lodash';
import { SortResults } from '@elastic/elasticsearch/lib/api/types';
import type {
ISavedObjectsRepository,
SavedObjectsFindOptions,
SavedObjectsFindResult,
} from '@kbn/core-saved-objects-api-server';
import {
serializeCounterKey,
type UsageCountersSavedObjectAttributes,
USAGE_COUNTERS_SAVED_OBJECT_TYPE,
} from '../saved_objects';
import type {
UsageCounterSnapshot,
UsageCountersSearchParams,
UsageCountersSearchResult,
} from '../types';
import { usageCountersSearchParamsToKueryFilter } from '../common/kuery_utils';
export async function searchUsageCounters(
repository: ISavedObjectsRepository,
params: UsageCountersSearchParams
): Promise<UsageCountersSearchResult> {
const { filters, options = {} } = params;
const { namespace: filterNamespace } = filters;
const baseFindParams: SavedObjectsFindOptions = {
...(filterNamespace && { namespaces: [filterNamespace] }),
type: USAGE_COUNTERS_SAVED_OBJECT_TYPE,
filter: usageCountersSearchParamsToKueryFilter(filters),
perPage: options.perPage || 100,
};
// create a PIT to perform consecutive searches
const pit = await repository.openPointInTimeForType(USAGE_COUNTERS_SAVED_OBJECT_TYPE);
// create a data structure to store/aggregate all counters
const countersMap = new Map<string, UsageCounterSnapshot>();
// the current offset for the iterative search
let searchAfter: SortResults | undefined;
do {
const findParams: SavedObjectsFindOptions = {
...baseFindParams,
pit,
...(searchAfter && { searchAfter }),
};
// this is where the actual search call is performed
const res = await repository.find<UsageCountersSavedObjectAttributes>(findParams);
res.saved_objects.forEach((result) => processResult(countersMap, result));
searchAfter = res.saved_objects.pop()?.sort;
} while (searchAfter);
await repository.closePointInTime(pit.id);
const counters = Array.from(countersMap.values());
// sort daily counters descending
counters.forEach(
(snapshot) => (snapshot.records = orderBy(snapshot.records, 'updatedAt', 'desc'))
);
return {
counters,
};
}
function processResult(
countersMap: Map<string, UsageCounterSnapshot>,
result: SavedObjectsFindResult<UsageCountersSavedObjectAttributes>
) {
const { attributes, updated_at: updatedAt, namespaces } = result;
const namespace = namespaces?.[0];
const key = serializeCounterKey({ ...attributes, namespace });
let counterSnapshot = countersMap.get(key);
if (!counterSnapshot) {
counterSnapshot = {
domainId: attributes.domainId,
counterName: attributes.counterName,
counterType: attributes.counterType,
source: attributes.source,
...(namespace && namespaces?.[0] && { namespace: namespaces[0] }),
records: [
{
updatedAt: updatedAt!,
count: attributes.count,
},
],
count: attributes.count,
};
countersMap.set(key, counterSnapshot!);
} else {
counterSnapshot.records.push({
updatedAt: updatedAt!,
count: attributes.count,
});
counterSnapshot.count += attributes.count;
}
}

View file

@ -0,0 +1,98 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import type { UsageCounters } from '../../common/types';
import type { UsageCounter } from '.';
export interface CreateUsageCounterParams {
/**
* Number of days a usage counter must be kept in the persistence layer.
* See USAGE_COUNTERS_KEEP_DOCS_FOR_DAYS for default value.
*/
retentionPeriodDays?: number;
}
/**
* Provides the necessary tools to create and incremement Usage Counters
*/
export interface UsageCountersServiceSetup {
/**
* Returns a usage counter by domainId
*/
getUsageCounterByDomainId: (domainId: string) => UsageCounter | undefined;
/**
* Registers a usage counter to collect daily aggregated plugin counter events
*/
createUsageCounter: (domainId: string, params?: CreateUsageCounterParams) => UsageCounter;
}
export interface UsageCountersSearchParams {
/** A set of filters to limit the results of the search operation */
filters: UsageCountersSearchFilters;
/** A set of options to modify the behavior of the search operation */
options?: UsageCountersSearchOptions;
}
export interface UsageCountersSearchFilters {
/** The domainId used to create the Counter API */
domainId: string;
/** The name of the counter. Optional, will return all counters in the same domainId that match the rest of filters if omitted */
counterName?: string;
/** The type of counter. Optional, will return all counters in the same domainId that match the rest of filters if omitted */
counterType?: string;
/** Namespace of the counter. Optional, counters of the 'default' namespace will be returned if omitted */
namespace?: string;
/** ISO date string to limit search results: get counters that are more recent than the provided date (if specified) */
from?: string;
/** ISO date string to limit search results: get counters that are older than the provided date (if specified) */
to?: string;
/** Return counters from a given source only. Optional, both 'ui' and 'server' counters will be returned if omitted */
source?: 'server' | 'ui';
}
export interface UsageCountersSearchOptions {
/** Number of counters to retrieve per page, when querying ES (defaults to 100) */
perPage?: number;
}
/**
* The result of a Usage Counters search operation
*/
export interface UsageCountersSearchResult {
/**
* The counters that matched the search criteria
*/
counters: UsageCounterSnapshot[];
}
/**
* Represents the current state of a Usage Counter at a given point in time
*/
export interface UsageCounterSnapshot extends UsageCounters.v1.AbstractCounter {
/** List of daily records captured for this counter */
records: UsageCounterRecord[];
/** Number of events captured (adds up all records) */
count: number;
}
/**
* Number of events counted on a given day
*/
export interface UsageCounterRecord {
/** Date where the counter was last updated */
updatedAt: string;
/** Number of events captured on that day */
count: number;
}
/**
* Interface to allow searching for persisted usage-counters
*/
export interface UsageCountersServiceStart {
search: (params: UsageCountersSearchParams) => Promise<UsageCountersSearchResult>;
}

View file

@ -12,6 +12,7 @@ import type { UsageCounters } from '../../common';
export interface UsageCounterParams {
domainId: string;
counter$: Rx.Subject<UsageCounters.v1.CounterMetric>;
retentionPeriodDays?: number;
}
/**
@ -20,6 +21,16 @@ export interface UsageCounterParams {
* API whenever the event happens.
*/
export interface IUsageCounter {
/**
* Defines a domainId (aka a namespace) under which multiple counters can be stored
*/
domainId: string;
/**
* Defines custom retention period for the counters under this domain.
* This is the number of days worth of counters that must be kept in the system indices.
* See USAGE_COUNTERS_KEEP_DOCS_FOR_DAYS for default value
*/
retentionPeriodDays?: number;
/**
* Notifies the counter about a new event happening so it can increase the count internally.
* @param params {@link IncrementCounterParams}
@ -28,12 +39,14 @@ export interface IUsageCounter {
}
export class UsageCounter implements IUsageCounter {
private domainId: string;
public readonly domainId: string;
private counter$: Rx.Subject<UsageCounters.v1.CounterMetric>;
public readonly retentionPeriodDays?: number | undefined;
constructor({ domainId, counter$ }: UsageCounterParams) {
constructor({ domainId, counter$, retentionPeriodDays }: UsageCounterParams) {
this.domainId = domainId;
this.counter$ = counter$;
this.retentionPeriodDays = retentionPeriodDays;
}
public incrementCounter = (params: UsageCounters.v1.IncrementCounterParams) => {

View file

@ -7,8 +7,9 @@
*/
import type { PublicMethodsOf } from '@kbn/utility-types';
import type { UsageCountersService, UsageCountersServiceSetup } from './usage_counters_service';
import type { UsageCountersService } from './usage_counters_service';
import type { UsageCounter } from './usage_counter';
import type { UsageCountersServiceSetup } from './types';
const createSetupContractMock = () => {
const setupContract: jest.Mocked<UsageCountersServiceSetup> = {
@ -16,9 +17,14 @@ const createSetupContractMock = () => {
getUsageCounterByDomainId: jest.fn(),
};
setupContract.createUsageCounter.mockReturnValue({
incrementCounter: jest.fn(),
} as unknown as jest.Mocked<UsageCounter>);
setupContract.createUsageCounter.mockImplementation(
(domainId: string, params?: { retentionPeriodDays?: number }) =>
({
domainId,
...(params?.retentionPeriodDays && { retentionPeriodDays: params.retentionPeriodDays }),
incrementCounter: jest.fn(),
} as unknown as jest.Mocked<UsageCounter>)
);
return setupContract;
};

View file

@ -7,10 +7,22 @@
*/
/* eslint-disable dot-notation */
import { UsageCountersService } from './usage_counters_service';
import { loggingSystemMock, coreMock } from '@kbn/core/server/mocks';
import * as rxOp from 'rxjs';
import moment from 'moment';
import { loggingSystemMock, coreMock } from '@kbn/core/server/mocks';
import { UsageCountersService } from './usage_counters_service';
jest.mock('./rollups', () => ({
...jest.requireActual('./rollups'),
// used by `rollUsageCountersIndices` to determine if a counter is beyond the retention period
registerUsageCountersRollups: jest.fn(),
}));
import { registerUsageCountersRollups } from './rollups';
const registerUsageCountersRollupsMock = registerUsageCountersRollups as jest.MockedFunction<
typeof registerUsageCountersRollups
>;
const tick = () => {
jest.useRealTimers();
@ -42,7 +54,9 @@ describe('UsageCountersService', () => {
usageCounter.incrementCounter({ counterName: 'counterA' });
usageCounter.incrementCounter({ counterName: 'counterA', namespace: 'second', source: 'ui' });
const dataInSourcePromise = usageCountersService['source$'].pipe(rxOp.toArray()).toPromise();
const dataInSourcePromise = rxOp.firstValueFrom(
usageCountersService['source$'].pipe(rxOp.toArray())
);
usageCountersService['flushCache$'].next();
usageCountersService['source$'].complete();
await expect(dataInSourcePromise).resolves.toHaveLength(2);
@ -54,6 +68,20 @@ describe('UsageCountersService', () => {
expect(coreSetup.savedObjects.registerType).toBeCalledTimes(2);
});
it('triggers regular cleanup of old counters on start', () => {
const usageCountersService = new UsageCountersService({ logger, retryCount, bufferDurationMs });
usageCountersService.start(coreStart);
expect(registerUsageCountersRollupsMock).toHaveBeenCalledTimes(1);
expect(registerUsageCountersRollupsMock).toHaveBeenCalledWith(
expect.objectContaining({
logger: expect.any(Object),
getRegisteredUsageCounters: expect.any(Function),
internalRepository: expect.any(Object),
})
);
});
it('flushes cached data on start', async () => {
const usageCountersService = new UsageCountersService({ logger, retryCount, bufferDurationMs });
@ -69,7 +97,9 @@ describe('UsageCountersService', () => {
usageCounter.incrementCounter({ counterName: 'counterA' });
usageCounter.incrementCounter({ counterName: 'counterA', namespace: 'second', source: 'ui' });
const dataInSourcePromise = usageCountersService['source$'].pipe(rxOp.toArray()).toPromise();
const dataInSourcePromise = rxOp.firstValueFrom(
usageCountersService['source$'].pipe(rxOp.toArray())
);
usageCountersService.start(coreStart);
usageCountersService['source$'].complete();

View file

@ -7,9 +7,9 @@
*/
import * as Rx from 'rxjs';
import * as rxOp from 'rxjs';
import moment from 'moment';
import type {
ISavedObjectsRepository,
SavedObjectsRepository,
SavedObjectsServiceSetup,
SavedObjectsServiceStart,
@ -18,11 +18,20 @@ import type { Logger, LogMeta } from '@kbn/core/server';
import { type IUsageCounter, UsageCounter } from './usage_counter';
import type { UsageCounters } from '../../common';
import type {
UsageCountersServiceSetup,
UsageCountersServiceStart,
UsageCountersSearchParams,
UsageCountersSearchResult,
CreateUsageCounterParams,
} from './types';
import {
registerUsageCountersSavedObjectTypes,
storeCounter,
serializeCounterKey,
registerUsageCountersSavedObjectTypes,
} from './saved_objects';
import { registerUsageCountersRollups } from './rollups';
import { searchUsageCounters } from './search';
interface UsageCountersLogMeta extends LogMeta {
kibana: { usageCounters: { results: unknown[] } };
@ -34,11 +43,6 @@ export interface UsageCountersServiceDeps {
bufferDurationMs: number;
}
export interface UsageCountersServiceSetup {
createUsageCounter: (domainId: string) => IUsageCounter;
getUsageCounterByDomainId: (domainId: string) => IUsageCounter | undefined;
}
/* internal */
export interface UsageCountersServiceSetupDeps {
savedObjects: SavedObjectsServiceSetup;
@ -56,11 +60,12 @@ export class UsageCountersService {
private readonly counterSets = new Map<string, UsageCounter>();
private readonly source$ = new Rx.Subject<UsageCounters.v1.CounterMetric>();
private readonly counter$ = this.source$.pipe(rxOp.multicast(new Rx.Subject()), rxOp.refCount());
private readonly counter$ = this.source$.pipe(Rx.multicast(new Rx.Subject()), Rx.refCount());
private readonly flushCache$ = new Rx.Subject<void>();
private readonly stopCaching$ = new Rx.Subject<void>();
private repository?: ISavedObjectsRepository;
private readonly logger: Logger;
constructor({ logger, retryCount, bufferDurationMs }: UsageCountersServiceDeps) {
@ -69,14 +74,14 @@ export class UsageCountersService {
this.bufferDurationMs = bufferDurationMs;
}
public setup = (core: UsageCountersServiceSetupDeps): UsageCountersServiceSetup => {
public setup = ({ savedObjects }: UsageCountersServiceSetupDeps): UsageCountersServiceSetup => {
const cache$ = new Rx.ReplaySubject<UsageCounters.v1.CounterMetric>();
const storingCache$ = new Rx.BehaviorSubject<boolean>(false);
// flush cache data from cache -> source
this.flushCache$
.pipe(
rxOp.exhaustMap(() => cache$),
rxOp.takeUntil(this.stop$)
Rx.exhaustMap(() => cache$),
Rx.takeUntil(this.stop$)
)
.subscribe((data) => {
storingCache$.next(true);
@ -86,16 +91,17 @@ export class UsageCountersService {
// store data into cache when not paused
storingCache$
.pipe(
rxOp.distinctUntilChanged(),
rxOp.switchMap((isStoring) => (isStoring ? Rx.EMPTY : this.source$)),
rxOp.takeUntil(Rx.merge(this.stopCaching$, this.stop$))
Rx.distinctUntilChanged(),
Rx.switchMap((isStoring) => (isStoring ? Rx.EMPTY : this.source$)),
Rx.takeUntil(Rx.merge(this.stopCaching$, this.stop$))
)
.subscribe((data) => {
cache$.next(data);
storingCache$.next(false);
});
registerUsageCountersSavedObjectTypes(core.savedObjects);
// register the usage-counter and usage-counters (deprecated) types
registerUsageCountersSavedObjectTypes(savedObjects);
return {
createUsageCounter: this.createUsageCounter,
@ -103,22 +109,22 @@ export class UsageCountersService {
};
};
public start = ({ savedObjects }: UsageCountersServiceStartDeps): void => {
public start = ({ savedObjects }: UsageCountersServiceStartDeps): UsageCountersServiceStart => {
this.stopCaching$.next();
const internalRepository = savedObjects.createInternalRepository();
this.repository = savedObjects.createInternalRepository();
this.counter$
.pipe(
/* buffer source events every ${bufferDurationMs} */
rxOp.bufferTime(this.bufferDurationMs),
Rx.bufferTime(this.bufferDurationMs),
/**
* bufferTime will trigger every ${bufferDurationMs}
* regardless if source emitted anything or not.
* using filter will stop cut the pipe short
*/
rxOp.filter((counters) => Array.isArray(counters) && counters.length > 0),
rxOp.map((counters) => Object.values(this.mergeCounters(counters))),
rxOp.takeUntil(this.stop$),
rxOp.concatMap((counters) => this.storeDate$(counters, internalRepository))
Rx.filter((counters) => Array.isArray(counters) && counters.length > 0),
Rx.map((counters) => Object.values(this.mergeCounters(counters))),
Rx.takeUntil(this.stop$),
Rx.concatMap((counters) => this.storeDate$(counters, this.repository!))
)
.subscribe((results) => {
this.logger.debug<UsageCountersLogMeta>('Store counters into savedObjects', {
@ -129,10 +135,27 @@ export class UsageCountersService {
});
this.flushCache$.next();
// we start a regular, timer-based cleanup
registerUsageCountersRollups({
logger: this.logger,
getRegisteredUsageCounters: () => Array.from(this.counterSets.values()),
internalRepository: this.repository,
pluginStop$: this.stop$,
});
return {
search: this.search,
};
};
public stop = () => {
public stop = (): UsageCountersServiceStart => {
this.stop$.next();
this.stop$.complete();
return {
search: this.search,
};
};
private storeDate$(
@ -142,8 +165,8 @@ export class UsageCountersService {
return Rx.forkJoin(
counters.map((metric) =>
Rx.defer(() => storeCounter({ metric, soRepository })).pipe(
rxOp.retry(this.retryCount),
rxOp.catchError((error) => {
Rx.retry(this.retryCount),
Rx.catchError((error) => {
this.logger.warn(error);
return Rx.of(error);
})
@ -152,18 +175,25 @@ export class UsageCountersService {
);
}
private createUsageCounter = (domainId: string): IUsageCounter => {
private createUsageCounter = (
domainId: string,
params: CreateUsageCounterParams = {}
): IUsageCounter => {
if (this.counterSets.get(domainId)) {
throw new Error(`Usage counter set "${domainId}" already exists.`);
}
const counterSet = new UsageCounter({ domainId, counter$: this.source$ });
const counterSet = new UsageCounter({
domainId,
counter$: this.source$,
retentionPeriodDays: params.retentionPeriodDays,
});
this.counterSets.set(domainId, counterSet);
return counterSet;
};
private getUsageCounterByDomainId = (type: string): IUsageCounter | undefined => {
return this.counterSets.get(type);
private getUsageCounterByDomainId = (domainId: string): IUsageCounter | undefined => {
return this.counterSets.get(domainId);
};
private mergeCounters = (
@ -193,4 +223,14 @@ export class UsageCountersService {
return acc;
}, {} as Record<string, UsageCounters.v1.CounterMetric>);
};
private search = async (
params: UsageCountersSearchParams
): Promise<UsageCountersSearchResult> => {
if (!this.repository) {
throw new Error('Cannot search before this service is started. Please call start() first.');
}
return await searchUsageCounters(this.repository, params);
};
}

View file

@ -23,6 +23,10 @@
"@kbn/analytics-collection-utils",
"@kbn/logging",
"@kbn/core-saved-objects-server",
"@kbn/core-test-helpers-kbn-server",
"@kbn/es-query",
"@kbn/core-saved-objects-utils-server",
"@kbn/core-saved-objects-api-server",
],
"exclude": [
"target/**/*",

View file

@ -5,7 +5,7 @@
* 2.0.
*/
import { UsageCollectionSetup } from '@kbn/usage-collection-plugin/server';
import type { UsageCollectionSetup } from '@kbn/usage-collection-plugin/server';
export interface CloudUsageCollectorConfig {
isCloudEnabled: boolean;

View file

@ -80,6 +80,7 @@ describe(`GET ${INTERNAL_ROUTES.MIGRATE.GET_ILM_POLICY_STATUS}`, () => {
it('increments the download api counter', async () => {
const core = await createReportingCore({});
const usageCounter = {
domainId: 'abc123',
incrementCounter: jest.fn(),
};
core.getUsageCounter = jest.fn().mockReturnValue(usageCounter);

View file

@ -69,6 +69,7 @@ describe(`POST ${INTERNAL_ROUTES.DIAGNOSE.BROWSER}`, () => {
);
usageCounter = {
domainId: 'abc123',
incrementCounter: jest.fn(),
};
core.getUsageCounter = jest.fn().mockReturnValue(usageCounter);

View file

@ -89,6 +89,7 @@ describe(`POST ${INTERNAL_ROUTES.GENERATE_PREFIX}`, () => {
reportingCore = await createMockReportingCore(mockConfigSchema, mockSetupDeps, mockStartDeps);
usageCounter = {
domainId: 'abc123',
incrementCounter: jest.fn(),
};
jest.spyOn(reportingCore, 'getUsageCounter').mockReturnValue(usageCounter);

View file

@ -110,6 +110,7 @@ describe(`Reporting Job Management Routes: Internal`, () => {
reportingCore = await createMockReportingCore(mockConfigSchema, mockSetupDeps, mockStartDeps);
usageCounter = {
domainId: 'abc123',
incrementCounter: jest.fn(),
};
jest.spyOn(reportingCore, 'getUsageCounter').mockReturnValue(usageCounter);

View file

@ -88,6 +88,7 @@ describe(`POST ${PUBLIC_ROUTES.GENERATE_PREFIX}`, () => {
reportingCore = await createMockReportingCore(mockConfigSchema, mockSetupDeps, mockStartDeps);
usageCounter = {
domainId: 'abc123',
incrementCounter: jest.fn(),
};
jest.spyOn(reportingCore, 'getUsageCounter').mockReturnValue(usageCounter);

View file

@ -107,6 +107,7 @@ describe(`Reporting Job Management Routes: Public`, () => {
reportingCore = await createMockReportingCore(mockConfigSchema, mockSetupDeps, mockStartDeps);
usageCounter = {
domainId: 'abc123',
incrementCounter: jest.fn(),
};
jest.spyOn(reportingCore, 'getUsageCounter').mockReturnValue(usageCounter);