[Usage Collection] Add execution context to the fetch methods (#125873)

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Alejandro Fernández Haro 2022-02-22 11:58:43 +01:00 committed by GitHub
parent 2cabe1a68f
commit 6bf91d77ad
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 187 additions and 103 deletions

View file

@ -6,13 +6,9 @@
* Side Public License, v 1.
*/
import { CollectorSet } from '../../plugins/usage_collection/server/collector';
import { loggerMock } from '../../core/server/logging/logger.mock';
import { createUsageCollectionSetupMock } from '../../plugins/usage_collection/server/mocks';
const { makeUsageCollector } = new CollectorSet({
logger: loggerMock.create(),
maximumWaitTimeForAllCollectorsInS: 0,
});
const { makeUsageCollector } = createUsageCollectionSetupMock();
enum TELEMETRY_LAYER_TYPE {
ES_DOCS = 'es_docs',

View file

@ -6,16 +6,10 @@
* Side Public License, v 1.
*/
import {
CollectorSet,
UsageCollectorOptions,
} from '../../plugins/usage_collection/server/collector';
import { loggerMock } from '../../core/server/logging/logger.mock';
import type { UsageCollectorOptions } from 'src/plugins/usage_collection/server';
import { createUsageCollectionSetupMock } from '../../plugins/usage_collection/server/mocks';
const collectorSet = new CollectorSet({
logger: loggerMock.create(),
maximumWaitTimeForAllCollectorsInS: 0,
});
const collectorSet = createUsageCollectionSetupMock();
interface Usage {
locale: string;

View file

@ -6,14 +6,10 @@
* Side Public License, v 1.
*/
import { CollectorSet } from '../../../plugins/usage_collection/server/collector';
import { loggerMock } from '../../../core/server/logging/logger.mock';
import type { Usage } from './types';
import { createUsageCollectionSetupMock } from '../../../plugins/usage_collection/server/mocks';
const { makeUsageCollector } = new CollectorSet({
logger: loggerMock.create(),
maximumWaitTimeForAllCollectorsInS: 0,
});
const { makeUsageCollector } = createUsageCollectionSetupMock();
export const myCollector = makeUsageCollector<Usage, false>({
type: 'importing_from_export_collector',

View file

@ -6,14 +6,10 @@
* Side Public License, v 1.
*/
import { CollectorSet } from '../../plugins/usage_collection/server/collector';
import { loggerMock } from '../../core/server/logging/logger.mock';
import { createUsageCollectionSetupMock } from '../../plugins/usage_collection/server/mocks';
import { externallyDefinedSchema } from './constants';
const { makeUsageCollector } = new CollectorSet({
logger: loggerMock.create(),
maximumWaitTimeForAllCollectorsInS: 0,
});
const { makeUsageCollector } = createUsageCollectionSetupMock();
interface Usage {
locale?: string;

View file

@ -6,14 +6,10 @@
* Side Public License, v 1.
*/
import { CollectorSet } from '../../plugins/usage_collection/server/collector';
import { loggerMock } from '../../core/server/logging/logger.mock';
import { Usage } from './constants';
import { createUsageCollectionSetupMock } from '../../plugins/usage_collection/server/mocks';
const { makeUsageCollector } = new CollectorSet({
logger: loggerMock.create(),
maximumWaitTimeForAllCollectorsInS: 0,
});
const { makeUsageCollector } = createUsageCollectionSetupMock();
export const myCollector = makeUsageCollector<Usage>({
type: 'imported_usage_interface_collector',

View file

@ -6,13 +6,9 @@
* Side Public License, v 1.
*/
import { CollectorSet } from '../../plugins/usage_collection/server/collector';
import { loggerMock } from '../../core/server/logging/logger.mock';
import { createUsageCollectionSetupMock } from '../../plugins/usage_collection/server/mocks';
const { makeUsageCollector } = new CollectorSet({
logger: loggerMock.create(),
maximumWaitTimeForAllCollectorsInS: 0,
});
const { makeUsageCollector } = createUsageCollectionSetupMock();
interface Usage {
[key: string]: {

View file

@ -6,13 +6,10 @@
* Side Public License, v 1.
*/
import { CollectorSet, Collector } from '../../plugins/usage_collection/server/collector';
import { loggerMock } from '../../core/server/logging/logger.mock';
import type { Collector } from 'src/plugins/usage_collection/server';
import { createUsageCollectionSetupMock } from '../../plugins/usage_collection/server/mocks';
const collectorSet = new CollectorSet({
logger: loggerMock.create(),
maximumWaitTimeForAllCollectorsInS: 0,
});
const collectorSet = createUsageCollectionSetupMock();
interface Usage {
locale?: string;

View file

@ -6,13 +6,10 @@
* Side Public License, v 1.
*/
import { CollectorSet, MakeSchemaFrom } from '../../plugins/usage_collection/server/collector';
import { loggerMock } from '../../core/server/logging/logger.mock';
import type { MakeSchemaFrom } from 'src/plugins/usage_collection/server';
import { createUsageCollectionSetupMock } from '../../plugins/usage_collection/server/mocks';
const { makeUsageCollector } = new CollectorSet({
logger: loggerMock.create(),
maximumWaitTimeForAllCollectorsInS: 0,
});
const { makeUsageCollector } = createUsageCollectionSetupMock();
interface MyObject {
total: number;

View file

@ -6,13 +6,9 @@
* Side Public License, v 1.
*/
import { CollectorSet } from '../../plugins/usage_collection/server/collector';
import { loggerMock } from '../../core/server/logging/logger.mock';
import { createUsageCollectionSetupMock } from '../../plugins/usage_collection/server/mocks';
const { makeStatsCollector } = new CollectorSet({
logger: loggerMock.create(),
maximumWaitTimeForAllCollectorsInS: 0,
});
const { makeStatsCollector } = createUsageCollectionSetupMock();
interface Usage {
some_field: string;

View file

@ -6,13 +6,9 @@
* Side Public License, v 1.
*/
import { CollectorSet } from '../../plugins/usage_collection/server/collector';
import { loggerMock } from '../../core/server/logging/logger.mock';
import { createUsageCollectionSetupMock } from '../../plugins/usage_collection/server/mocks';
const { makeUsageCollector } = new CollectorSet({
logger: loggerMock.create(),
maximumWaitTimeForAllCollectorsInS: 0,
});
const { makeUsageCollector } = createUsageCollectionSetupMock();
interface Usage {
locale: string;

View file

@ -6,13 +6,9 @@
* Side Public License, v 1.
*/
import { CollectorSet } from '../../plugins/usage_collection/server/collector';
import { loggerMock } from '../../core/server/logging/logger.mock';
import { createUsageCollectionSetupMock } from '../../plugins/usage_collection/server/mocks';
const { makeUsageCollector } = new CollectorSet({
logger: loggerMock.create(),
maximumWaitTimeForAllCollectorsInS: 0,
});
const { makeUsageCollector } = createUsageCollectionSetupMock();
interface MyObject {
total: number;

View file

@ -6,13 +6,9 @@
* Side Public License, v 1.
*/
import { CollectorSet } from '../../plugins/usage_collection/server/collector';
import { loggerMock } from '../../core/server/logging/logger.mock';
import { createUsageCollectionSetupMock } from '../../plugins/usage_collection/server/mocks';
const { makeUsageCollector } = new CollectorSet({
logger: loggerMock.create(),
maximumWaitTimeForAllCollectorsInS: 0,
});
const { makeUsageCollector } = createUsageCollectionSetupMock();
interface MyObject {
total: number;

View file

@ -8,7 +8,7 @@
import { noop } from 'lodash';
import { Collector } from './collector';
import { CollectorSet } from './collector_set';
import { CollectorSet, CollectorSetConfig } from './collector_set';
import { UsageCollector } from './usage_collector';
import {
@ -16,29 +16,33 @@ import {
loggingSystemMock,
savedObjectsClientMock,
httpServerMock,
executionContextServiceMock,
} from '../../../../core/server/mocks';
import type { ExecutionContextSetup, Logger } from 'src/core/server';
describe('CollectorSet', () => {
const logger = loggingSystemMock.createLogger();
let logger: jest.Mocked<Logger>;
let executionContext: jest.Mocked<ExecutionContextSetup>;
const loggerSpies = {
debug: jest.spyOn(logger, 'debug'),
warn: jest.spyOn(logger, 'warn'),
};
let collectorSetConfig: CollectorSetConfig;
beforeEach(() => {
logger = loggingSystemMock.createLogger();
executionContext = executionContextServiceMock.createSetupContract();
collectorSetConfig = { logger, executionContext };
});
describe('registers a collector set and runs lifecycle events', () => {
let fetch: Function;
beforeEach(() => {
fetch = noop;
loggerSpies.debug.mockRestore();
loggerSpies.warn.mockRestore();
});
const mockEsClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
const mockSoClient = savedObjectsClientMock.create();
const req = void 0; // No need to instantiate any KibanaRequest in these tests
it('should throw an error if non-Collector type of object is registered', () => {
const collectors = new CollectorSet({ logger });
const collectors = new CollectorSet(collectorSetConfig);
const registerPojo = () => {
collectors.registerCollector({
type: 'type_collector_test',
@ -53,7 +57,7 @@ describe('CollectorSet', () => {
});
it('should throw when 2 collectors with the same type are registered', () => {
const collectorSet = new CollectorSet({ logger });
const collectorSet = new CollectorSet(collectorSetConfig);
collectorSet.registerCollector(
new Collector(logger, { type: 'test_duplicated', fetch: () => 1, isReady: () => true })
);
@ -73,7 +77,7 @@ describe('CollectorSet', () => {
it('should log debug status of fetching from the collector', async () => {
// @ts-expect-error we are just mocking the output of any call
mockEsClient.ping.mockResolvedValue({ passTest: 1000 });
const collectors = new CollectorSet({ logger });
const collectors = new CollectorSet(collectorSetConfig);
collectors.registerCollector(
new Collector(logger, {
type: 'MY_TEST_COLLECTOR',
@ -85,11 +89,9 @@ describe('CollectorSet', () => {
);
const result = await collectors.bulkFetch(mockEsClient, mockSoClient, req);
expect(loggerSpies.debug).toHaveBeenCalledTimes(2);
expect(loggerSpies.debug).toHaveBeenCalledWith('Getting ready collectors');
expect(loggerSpies.debug).toHaveBeenCalledWith(
'Fetching data from MY_TEST_COLLECTOR collector'
);
expect(logger.debug).toHaveBeenCalledTimes(2);
expect(logger.debug).toHaveBeenCalledWith('Getting ready collectors');
expect(logger.debug).toHaveBeenCalledWith('Fetching data from MY_TEST_COLLECTOR collector');
expect(result).toStrictEqual([
{
type: 'MY_TEST_COLLECTOR',
@ -108,7 +110,7 @@ describe('CollectorSet', () => {
});
it('should gracefully handle a collector fetch method throwing an error', async () => {
const collectors = new CollectorSet({ logger });
const collectors = new CollectorSet(collectorSetConfig);
collectors.registerCollector(
new Collector(logger, {
type: 'MY_TEST_COLLECTOR',
@ -138,7 +140,7 @@ describe('CollectorSet', () => {
});
it('should not break if isReady is not a function', async () => {
const collectors = new CollectorSet({ logger });
const collectors = new CollectorSet(collectorSetConfig);
collectors.registerCollector(
new Collector(logger, {
type: 'MY_TEST_COLLECTOR',
@ -167,7 +169,7 @@ describe('CollectorSet', () => {
});
it('should not break if isReady is not provided', async () => {
const collectors = new CollectorSet({ logger });
const collectors = new CollectorSet(collectorSetConfig);
collectors.registerCollector(
// @ts-expect-error we are intentionally sending it wrong.
new Collector(logger, {
@ -199,7 +201,7 @@ describe('CollectorSet', () => {
let collectorSet: CollectorSet;
beforeEach(() => {
collectorSet = new CollectorSet({ logger });
collectorSet = new CollectorSet(collectorSetConfig);
});
it('should snake_case and convert field names to api standards', () => {
@ -261,7 +263,12 @@ describe('CollectorSet', () => {
});
describe('makeStatsCollector', () => {
const collectorSet = new CollectorSet({ logger });
let collectorSet: CollectorSet;
beforeEach(() => {
collectorSet = new CollectorSet(collectorSetConfig);
});
test('TS should hide kibanaRequest when not opted-in', () => {
collectorSet.makeStatsCollector({
type: 'MY_TEST_COLLECTOR',
@ -326,7 +333,12 @@ describe('CollectorSet', () => {
});
describe('makeUsageCollector', () => {
const collectorSet = new CollectorSet({ logger });
let collectorSet: CollectorSet;
beforeEach(() => {
collectorSet = new CollectorSet(collectorSetConfig);
});
describe('TS validations', () => {
describe('when types are inferred', () => {
test('TS should hide kibanaRequest when not opted-in', () => {
@ -529,10 +541,14 @@ describe('CollectorSet', () => {
});
describe('bulkFetch', () => {
const collectorSetConfig = { logger, maximumWaitTimeForAllCollectorsInS: 1 };
let collectorSet = new CollectorSet(collectorSetConfig);
afterEach(() => {
collectorSet = new CollectorSet(collectorSetConfig);
let collectorSet: CollectorSet;
beforeEach(() => {
const collectorSetConfigWithMaxTime: CollectorSetConfig = {
...collectorSetConfig,
maximumWaitTimeForAllCollectorsInS: 1,
};
collectorSet = new CollectorSet(collectorSetConfigWithMaxTime);
});
it('skips collectors that are not ready', async () => {
@ -698,6 +714,70 @@ describe('CollectorSet', () => {
expect(results).toHaveLength(2);
});
it('calls fetch with execution context', async () => {
collectorSet.registerCollector(
collectorSet.makeUsageCollector({
type: 'ready_col',
isReady: () => true,
schema: { test: { type: 'long' } },
fetch: () => ({ test: 1000 }),
})
);
const mockEsClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
const mockSoClient = savedObjectsClientMock.create();
await collectorSet.bulkFetch(mockEsClient, mockSoClient, undefined);
expect(executionContext.withContext).toHaveBeenCalledTimes(1);
expect(executionContext.withContext).toHaveBeenCalledWith(
{
type: 'usage_collection',
name: 'collector.fetch',
id: 'ready_col',
description: `Fetch method in the Collector "ready_col"`,
},
expect.any(Function)
);
});
it('calls fetch with execution context for every collector', async () => {
['ready_col_1', 'ready_col_2'].forEach((type) =>
collectorSet.registerCollector(
collectorSet.makeUsageCollector({
type,
isReady: () => true,
schema: { test: { type: 'long' } },
fetch: () => ({ test: 1000 }),
})
)
);
const mockEsClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
const mockSoClient = savedObjectsClientMock.create();
await collectorSet.bulkFetch(mockEsClient, mockSoClient, undefined);
expect(executionContext.withContext).toHaveBeenCalledTimes(2);
expect(executionContext.withContext).toHaveBeenCalledWith(
{
type: 'usage_collection',
name: 'collector.fetch',
id: 'ready_col_1',
description: `Fetch method in the Collector "ready_col_1"`,
},
expect.any(Function)
);
expect(executionContext.withContext).toHaveBeenCalledWith(
{
type: 'usage_collection',
name: 'collector.fetch',
id: 'ready_col_2',
description: `Fetch method in the Collector "ready_col_2"`,
},
expect.any(Function)
);
});
it('adds extra context to collectors with extendFetchContext config', async () => {
const mockReadyFetch = jest.fn().mockResolvedValue({});
collectorSet.registerCollector(

View file

@ -12,6 +12,8 @@ import type {
ElasticsearchClient,
SavedObjectsClientContract,
KibanaRequest,
KibanaExecutionContext,
ExecutionContextSetup,
} from 'src/core/server';
import { Collector } from './collector';
import type { ICollector, CollectorOptions } from './types';
@ -26,8 +28,9 @@ interface CollectorWithStatus {
collector: AnyCollector;
}
interface CollectorSetConfig {
export interface CollectorSetConfig {
logger: Logger;
executionContext: ExecutionContextSetup;
maximumWaitTimeForAllCollectorsInS?: number;
collectors?: AnyCollector[];
}
@ -42,14 +45,17 @@ interface CollectorStats {
export class CollectorSet {
private readonly logger: Logger;
private readonly executionContext: ExecutionContextSetup;
private readonly maximumWaitTimeForAllCollectorsInS: number;
private readonly collectors: Map<string, AnyCollector>;
constructor({
logger,
executionContext,
maximumWaitTimeForAllCollectorsInS = DEFAULT_MAXIMUM_WAIT_TIME_FOR_ALL_COLLECTORS_IN_S,
collectors = [],
}: CollectorSetConfig) {
this.logger = logger;
this.executionContext = executionContext;
this.collectors = new Map(collectors.map((collector) => [collector.type, collector]));
this.maximumWaitTimeForAllCollectorsInS = maximumWaitTimeForAllCollectorsInS;
}
@ -208,7 +214,15 @@ export class CollectorSet {
soClient,
...(collector.extendFetchContext.kibanaRequest && { kibanaRequest }),
};
const result = await collector.fetch(context);
const executionContext: KibanaExecutionContext = {
type: 'usage_collection',
name: 'collector.fetch',
id: collector.type,
description: `Fetch method in the Collector "${collector.type}"`,
};
const result = await this.executionContext.withContext(executionContext, () =>
collector.fetch(context)
);
collectorStats.succeeded.names.push(collector.type);
return { type: collector.type, result };
} catch (err) {
@ -297,6 +311,7 @@ export class CollectorSet {
private makeCollectorSetFromArray = (collectors: AnyCollector[]) => {
return new CollectorSet({
logger: this.logger,
executionContext: this.executionContext,
maximumWaitTimeForAllCollectorsInS: this.maximumWaitTimeForAllCollectorsInS,
collectors,
});

View file

@ -8,6 +8,7 @@
import {
elasticsearchServiceMock,
executionContextServiceMock,
httpServerMock,
loggingSystemMock,
savedObjectsClientMock,
@ -23,6 +24,7 @@ export { Collector };
export const createUsageCollectionSetupMock = () => {
const collectorSet = new CollectorSet({
logger: loggingSystemMock.createLogger(),
executionContext: executionContextServiceMock.createSetupContract(),
maximumWaitTimeForAllCollectorsInS: 1,
});
const { createUsageCounter, getUsageCounterByType } =

View file

@ -112,6 +112,7 @@ export class UsageCollectionPlugin implements Plugin<UsageCollectionSetup> {
const collectorSet = new CollectorSet({
logger: this.logger.get('usage-collection', 'collector-set'),
executionContext: core.executionContext,
maximumWaitTimeForAllCollectorsInS: config.maximumWaitTimeForAllCollectorsInS,
});

View file

@ -51,6 +51,7 @@ describe('/api/stats', () => {
router,
collectorSet: new CollectorSet({
logger: loggingSystemMock.create().asLoggerFactory().get(),
executionContext: executionContextServiceMock.createSetupContract(),
}),
config: {
allowAnonymous: true,

View file

@ -108,5 +108,38 @@ export default function ({ getService }: FtrProviderContext) {
retry,
});
});
it('propagates context for Telemetry collection', async () => {
await supertest
.post('/api/telemetry/v2/clusters/_stats')
.set('kbn-xsrf', 'true')
.send({ unencrypted: false })
.expect(200);
await assertLogContains({
description:
'usage_collection execution context propagates to Elasticsearch via "x-opaque-id" header',
predicate: (record) =>
Boolean(
// exclude part with collector types
record.http?.request?.id?.includes(
`kibana:usage_collection:collector.fetch:application_usage`
)
),
retry,
});
await assertLogContains({
description: 'execution context propagates to Kibana logs',
predicate: (record) =>
isExecutionContextLog(record?.message, {
type: 'usage_collection',
name: 'collector.fetch',
id: 'application_usage',
description: 'Fetch method in the Collector "application_usage"',
}),
retry,
});
});
});
}