[Telemetry] Report Kibana distro in local collectors + Usage Collectors in TS (#55859)

* [Telemetry] Report Kibana distro in local collectors + Usage Collectors in TS

* Ensure isReady is a function

* Move CollectorSet tests to TS + Jest

* Fix test

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
This commit is contained in:
Alejandro Fernández Haro 2020-02-05 14:08:30 +00:00 committed by GitHub
parent 3cb85d4070
commit 2bb46732e1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 239 additions and 153 deletions

View file

@ -136,7 +136,7 @@ describe('telemetry_usage_collector', () => {
const collectorOptions = createTelemetryUsageCollector(usageCollector, server);
expect(collectorOptions.type).toBe('static_telemetry');
expect(await collectorOptions.fetch()).toEqual(expectedObject);
expect(await collectorOptions.fetch({} as any)).toEqual(expectedObject); // Sending any as the callCluster client because it's not needed in this collector but TS requires it when calling it.
});
});
});

View file

@ -17,9 +17,27 @@
* under the License.
*/
import { get, omit } from 'lodash';
import { omit } from 'lodash';
import { UsageCollectionSetup } from 'src/plugins/usage_collection/server';
import { CallCluster } from 'src/legacy/core_plugins/elasticsearch';
export function handleKibanaStats(server, response) {
export interface KibanaUsageStats {
kibana: {
index: string;
};
kibana_stats: {
os: {
platform: string;
platformRelease: string;
distro?: string;
distroRelease?: string;
};
};
[plugin: string]: any;
}
export function handleKibanaStats(server: any, response?: KibanaUsageStats) {
if (!response) {
server.log(
['warning', 'telemetry', 'local-stats'],
@ -30,8 +48,17 @@ export function handleKibanaStats(server, response) {
const { kibana, kibana_stats: kibanaStats, ...plugins } = response;
const platform = get(kibanaStats, 'os.platform', 'unknown');
const platformRelease = get(kibanaStats, 'os.platformRelease', 'unknown');
const os = {
platform: 'unknown',
platformRelease: 'unknown',
...kibanaStats.os,
};
const formattedOsStats = Object.entries(os).reduce((acc, [key, value]) => {
return {
...acc,
[`${key}s`]: [{ [key]: value, count: 1 }],
};
}, {});
const version = server
.config()
@ -44,16 +71,16 @@ export function handleKibanaStats(server, response) {
...omit(kibana, 'index'), // discard index
count: 1,
indices: 1,
os: {
platforms: [{ platform, count: 1 }],
platformReleases: [{ platformRelease, count: 1 }],
},
os: formattedOsStats,
versions: [{ version, count: 1 }],
plugins,
};
}
export async function getKibana(usageCollection, callWithInternalUser) {
export async function getKibana(
usageCollection: UsageCollectionSetup,
callWithInternalUser: CallCluster
): Promise<KibanaUsageStats> {
const usage = await usageCollection.bulkFetch(callWithInternalUser);
return usageCollection.toObject(usage);
}

View file

@ -22,18 +22,25 @@ import { get, omit } from 'lodash';
import { getClusterInfo } from './get_cluster_info';
import { getClusterStats } from './get_cluster_stats';
// @ts-ignore
import { getKibana, handleKibanaStats } from './get_kibana';
import { getKibana, handleKibanaStats, KibanaUsageStats } from './get_kibana';
import { StatsGetter } from '../collection_manager';
/**
* Handle the separate local calls by combining them into a single object response that looks like the
* "cluster_stats" document from X-Pack monitoring.
*
* @param {Object} server ??
* @param {Object} clusterInfo Cluster info (GET /)
* @param {Object} clusterStats Cluster stats (GET /_cluster/stats)
* @param {Object} kibana The Kibana Usage stats
* @return {Object} A combined object containing the different responses.
*/
export function handleLocalStats(server: any, clusterInfo: any, clusterStats: any, kibana: any) {
export function handleLocalStats(
server: any,
clusterInfo: any,
clusterStats: any,
kibana: KibanaUsageStats
) {
return {
timestamp: new Date().toISOString(),
cluster_uuid: get(clusterInfo, 'cluster_uuid'),

View file

@ -17,7 +17,30 @@
* under the License.
*/
export class Collector {
import { Logger } from 'kibana/server';
import { CallCluster } from 'src/legacy/core_plugins/elasticsearch';
export type CollectorFormatForBulkUpload<T, U> = (result: T) => { type: string; payload: U };
export interface CollectorOptions<T = unknown, U = T> {
type: string;
init?: Function;
fetch: (callCluster: CallCluster) => Promise<T> | T;
/*
* A hook for allowing the fetched data payload to be organized into a typed
* data model for internal bulk upload. See defaultFormatterForBulkUpload for
* a generic example.
*/
formatForBulkUpload?: CollectorFormatForBulkUpload<T, U>;
isReady: () => Promise<boolean> | boolean;
}
export class Collector<T = unknown, U = T> {
public readonly type: CollectorOptions<T, U>['type'];
public readonly init?: CollectorOptions<T, U>['init'];
public readonly fetch: CollectorOptions<T, U>['fetch'];
private readonly _formatForBulkUpload?: CollectorFormatForBulkUpload<T, U>;
public readonly isReady: CollectorOptions<T, U>['isReady'];
/*
* @param {Object} logger - logger object
* @param {String} options.type - property name as the key for the data
@ -27,8 +50,8 @@ export class Collector {
* @param {Function} options.rest - optional other properties
*/
constructor(
logger,
{ type, init, fetch, formatForBulkUpload = null, isReady = null, ...options } = {}
protected readonly log: Logger,
{ type, init, fetch, formatForBulkUpload, isReady, ...options }: CollectorOptions<T, U>
) {
if (type === undefined) {
throw new Error('Collector must be instantiated with a options.type string property');
@ -42,41 +65,27 @@ export class Collector {
throw new Error('Collector must be instantiated with a options.fetch function property');
}
this.log = logger;
Object.assign(this, options); // spread in other properties and mutate "this"
this.type = type;
this.init = init;
this.fetch = fetch;
this.isReady = typeof isReady === 'function' ? isReady : () => true;
this._formatForBulkUpload = formatForBulkUpload;
}
const defaultFormatterForBulkUpload = result => ({ type, payload: result });
this._formatForBulkUpload = formatForBulkUpload || defaultFormatterForBulkUpload;
if (typeof isReady === 'function') {
this.isReady = isReady;
public formatForBulkUpload(result: T) {
if (this._formatForBulkUpload) {
return this._formatForBulkUpload(result);
} else {
return this.defaultFormatterForBulkUpload(result);
}
}
/*
* @param {Function} callCluster - callCluster function
*/
fetchInternal(callCluster) {
if (typeof callCluster !== 'function') {
throw new Error('A `callCluster` function must be passed to the fetch methods of collectors');
}
return this.fetch(callCluster);
}
/*
* A hook for allowing the fetched data payload to be organized into a typed
* data model for internal bulk upload. See defaultFormatterForBulkUpload for
* a generic example.
*/
formatForBulkUpload(result) {
return this._formatForBulkUpload(result);
}
isReady() {
throw `isReady() must be implemented in ${this.type} collector`;
protected defaultFormatterForBulkUpload(result: T) {
return {
type: this.type,
payload: result,
};
}
}

View file

@ -18,58 +18,62 @@
*/
import { noop } from 'lodash';
import sinon from 'sinon';
import expect from '@kbn/expect';
import { Collector } from '../collector';
import { CollectorSet } from '../collector_set';
import { UsageCollector } from '../usage_collector';
import { Collector } from './collector';
import { CollectorSet } from './collector_set';
import { UsageCollector } from './usage_collector';
import { loggingServiceMock } from '../../../../core/server/mocks';
const mockLogger = () => ({
debug: sinon.spy(),
warn: sinon.spy(),
});
const logger = loggingServiceMock.createLogger();
const loggerSpies = {
debug: jest.spyOn(logger, 'debug'),
warn: jest.spyOn(logger, 'warn'),
};
describe('CollectorSet', () => {
describe('registers a collector set and runs lifecycle events', () => {
let init;
let fetch;
let init: Function;
let fetch: Function;
beforeEach(() => {
init = noop;
fetch = noop;
loggerSpies.debug.mockRestore();
loggerSpies.warn.mockRestore();
});
const mockCallCluster = () => Promise.resolve({ passTest: 1000 });
it('should throw an error if non-Collector type of object is registered', () => {
const logger = mockLogger();
const collectors = new CollectorSet({ logger });
const registerPojo = () => {
collectors.registerCollector({
type: 'type_collector_test',
init,
fetch,
});
} as any); // We are intentionally sending it wrong.
};
expect(registerPojo).to.throwException(({ message }) => {
expect(message).to.be('CollectorSet can only have Collector instances registered');
});
expect(registerPojo).toThrowError(
'CollectorSet can only have Collector instances registered'
);
});
it('should log debug status of fetching from the collector', async () => {
const mockCallCluster = () => Promise.resolve({ passTest: 1000 });
const logger = mockLogger();
const collectors = new CollectorSet({ logger });
collectors.registerCollector(
new Collector(logger, {
type: 'MY_TEST_COLLECTOR',
fetch: caller => caller(),
fetch: (caller: any) => caller(),
isReady: () => true,
})
);
const result = await collectors.bulkFetch(mockCallCluster);
const calls = logger.debug.getCalls();
expect(calls.length).to.be(1);
expect(calls[0].args).to.eql(['Fetching data from MY_TEST_COLLECTOR collector']);
expect(result).to.eql([
const result = await collectors.bulkFetch(mockCallCluster as any);
expect(loggerSpies.debug).toHaveBeenCalledTimes(1);
expect(loggerSpies.debug).toHaveBeenCalledWith(
'Fetching data from MY_TEST_COLLECTOR collector'
);
expect(result).toStrictEqual([
{
type: 'MY_TEST_COLLECTOR',
result: { passTest: 1000 },
@ -78,32 +82,90 @@ describe('CollectorSet', () => {
});
it('should gracefully handle a collector fetch method throwing an error', async () => {
const mockCallCluster = () => Promise.resolve({ passTest: 1000 });
const logger = mockLogger();
const collectors = new CollectorSet({ logger });
collectors.registerCollector(
new Collector(logger, {
type: 'MY_TEST_COLLECTOR',
fetch: () => new Promise((_resolve, reject) => reject()),
isReady: () => true,
})
);
let result;
try {
result = await collectors.bulkFetch(mockCallCluster);
result = await collectors.bulkFetch(mockCallCluster as any);
} catch (err) {
// Do nothing
}
// This must return an empty object instead of null/undefined
expect(result).to.eql([]);
expect(result).toStrictEqual([]);
});
it('should not break if isReady is not a function', async () => {
const collectors = new CollectorSet({ logger });
collectors.registerCollector(
new Collector(logger, {
type: 'MY_TEST_COLLECTOR',
fetch: () => ({ test: 1 }),
isReady: true as any,
})
);
const result = await collectors.bulkFetch(mockCallCluster as any);
expect(result).toStrictEqual([
{
type: 'MY_TEST_COLLECTOR',
result: { test: 1 },
},
]);
});
it('should not break if isReady is not provided', async () => {
const collectors = new CollectorSet({ logger });
collectors.registerCollector(
new Collector(logger, {
type: 'MY_TEST_COLLECTOR',
fetch: () => ({ test: 1 }),
} as any)
);
const result = await collectors.bulkFetch(mockCallCluster as any);
expect(result).toStrictEqual([
{
type: 'MY_TEST_COLLECTOR',
result: { test: 1 },
},
]);
});
it('should infer the types from the implementations of fetch and formatForBulkUpload', async () => {
const collectors = new CollectorSet({ logger });
collectors.registerCollector(
new Collector(logger, {
type: 'MY_TEST_COLLECTOR',
fetch: () => ({ test: 1 }),
formatForBulkUpload: result => ({
type: 'MY_TEST_COLLECTOR',
payload: { test: result.test * 2 },
}),
isReady: () => true,
})
);
const result = await collectors.bulkFetch(mockCallCluster as any);
expect(result).toStrictEqual([
{
type: 'MY_TEST_COLLECTOR',
result: { test: 1 }, // It matches the return of `fetch`. `formatForBulkUpload` is used later on
},
]);
});
});
describe('toApiFieldNames', () => {
let collectorSet;
let collectorSet: CollectorSet;
beforeEach(() => {
const logger = mockLogger();
collectorSet = new CollectorSet({ logger });
});
@ -126,7 +188,7 @@ describe('CollectorSet', () => {
};
const result = collectorSet.toApiFieldNames(apiData);
expect(result).to.eql({
expect(result).toStrictEqual({
os: {
load: { '15m': 2.3525390625, '1m': 2.22412109375, '5m': 2.4462890625 },
memory: { free_bytes: 458280960, total_bytes: 17179869184, used_bytes: 16721588224 },
@ -155,7 +217,7 @@ describe('CollectorSet', () => {
};
const result = collectorSet.toApiFieldNames(apiData);
expect(result).to.eql({
expect(result).toStrictEqual({
days_of_the_week: [
{ day_index: 1, day_name: 'monday' },
{ day_index: 2, day_name: 'tuesday' },
@ -166,21 +228,20 @@ describe('CollectorSet', () => {
});
describe('isUsageCollector', () => {
const collectorOptions = { type: 'MY_TEST_COLLECTOR', fetch: () => {} };
const collectorOptions = { type: 'MY_TEST_COLLECTOR', fetch: () => {}, isReady: () => true };
it('returns true only for UsageCollector instances', () => {
const logger = mockLogger();
const collectors = new CollectorSet({ logger });
const usageCollector = new UsageCollector(logger, collectorOptions);
const collector = new Collector(logger, collectorOptions);
const randomClass = new (class Random {})();
expect(collectors.isUsageCollector(usageCollector)).to.be(true);
expect(collectors.isUsageCollector(collector)).to.be(false);
expect(collectors.isUsageCollector(randomClass)).to.be(false);
expect(collectors.isUsageCollector({})).to.be(false);
expect(collectors.isUsageCollector(null)).to.be(false);
expect(collectors.isUsageCollector('')).to.be(false);
expect(collectors.isUsageCollector()).to.be(false);
expect(collectors.isUsageCollector(usageCollector)).toEqual(true);
expect(collectors.isUsageCollector(collector)).toEqual(false);
expect(collectors.isUsageCollector(randomClass)).toEqual(false);
expect(collectors.isUsageCollector({})).toEqual(false);
expect(collectors.isUsageCollector(null)).toEqual(false);
expect(collectors.isUsageCollector('')).toEqual(false);
expect(collectors.isUsageCollector(void 0)).toEqual(false);
});
});
});

View file

@ -20,39 +20,37 @@
import { snakeCase } from 'lodash';
import { Logger } from 'kibana/server';
import { CallCluster } from 'src/legacy/core_plugins/elasticsearch';
// @ts-ignore
import { Collector } from './collector';
// @ts-ignore
import { Collector, CollectorOptions } from './collector';
import { UsageCollector } from './usage_collector';
interface CollectorSetConfig {
logger: Logger;
maximumWaitTimeForAllCollectorsInS: number;
collectors?: Collector[];
maximumWaitTimeForAllCollectorsInS?: number;
collectors?: Array<Collector<any, any>>;
}
export class CollectorSet {
private _waitingForAllCollectorsTimestamp?: number;
private logger: Logger;
private readonly maximumWaitTimeForAllCollectorsInS: number;
private collectors: Collector[] = [];
private collectors: Array<Collector<any, any>> = [];
constructor({ logger, maximumWaitTimeForAllCollectorsInS, collectors = [] }: CollectorSetConfig) {
this.logger = logger;
this.collectors = collectors;
this.maximumWaitTimeForAllCollectorsInS = maximumWaitTimeForAllCollectorsInS || 60;
}
public makeStatsCollector = (options: any) => {
public makeStatsCollector = <T, U>(options: CollectorOptions<T, U>) => {
return new Collector(this.logger, options);
};
public makeUsageCollector = (options: any) => {
public makeUsageCollector = <T, U>(options: CollectorOptions<T, U>) => {
return new UsageCollector(this.logger, options);
};
/*
* @param collector {Collector} collector object
*/
public registerCollector = (collector: Collector) => {
public registerCollector = <T, U>(collector: Collector<T, U>) => {
// check instanceof
if (!(collector instanceof Collector)) {
throw new Error('CollectorSet can only have Collector instances registered');
@ -115,7 +113,7 @@ export class CollectorSet {
public bulkFetch = async (
callCluster: CallCluster,
collectors: Collector[] = this.collectors
collectors: Array<Collector<any, any>> = this.collectors
) => {
const responses = [];
for (const collector of collectors) {
@ -123,7 +121,7 @@ export class CollectorSet {
try {
responses.push({
type: collector.type,
result: await collector.fetchInternal(callCluster),
result: await collector.fetch(callCluster),
});
} catch (err) {
this.logger.warn(err);
@ -148,14 +146,13 @@ export class CollectorSet {
};
// convert an array of fetched stats results into key/object
public toObject = (statsData: any) => {
if (!statsData) return {};
return statsData.reduce((accumulatedStats: any, { type, result }: any) => {
public toObject = <Result, T>(statsData: Array<{ type: string; result: T }> = []) => {
return statsData.reduce<Result>((accumulatedStats, { type, result }) => {
return {
...accumulatedStats,
[type]: result,
};
}, {});
}, {} as Result);
};
// rename fields to use api conventions

View file

@ -18,7 +18,5 @@
*/
export { CollectorSet } from './collector_set';
// @ts-ignore
export { Collector } from './collector';
// @ts-ignore
export { UsageCollector } from './usage_collector';

View file

@ -1,51 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { KIBANA_STATS_TYPE } from '../../common/constants';
import { Collector } from './collector';
export class UsageCollector extends Collector {
/*
* @param {Object} logger - logger object
* @param {String} options.type - property name as the key for the data
* @param {Function} options.init (optional) - initialization function
* @param {Function} options.fetch - function to query data
* @param {Function} options.formatForBulkUpload - optional
* @param {Function} options.rest - optional other properties
*/
constructor(logger, { type, init, fetch, formatForBulkUpload = null, ...options } = {}) {
super(logger, { type, init, fetch, formatForBulkUpload, ...options });
/*
* Currently, for internal bulk uploading, usage stats are part of
* `kibana_stats` type, under the `usage` namespace in the document.
*/
const defaultUsageFormatterForBulkUpload = result => {
return {
type: KIBANA_STATS_TYPE,
payload: {
usage: {
[type]: result,
},
},
};
};
this._formatForBulkUpload = formatForBulkUpload || defaultUsageFormatterForBulkUpload;
}
}

View file

@ -0,0 +1,37 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { KIBANA_STATS_TYPE } from '../../common/constants';
import { Collector } from './collector';
export class UsageCollector<T = unknown, U = { usage: { [key: string]: T } }> extends Collector<
T,
U
> {
protected defaultUsageFormatterForBulkUpload(result: T) {
return {
type: KIBANA_STATS_TYPE,
payload: {
usage: {
[this.type]: result,
},
},
};
}
}

View file

@ -21,12 +21,13 @@ describe('createCloudUsageCollector', () => {
});
describe('Fetched Usage data', () => {
it('return isCloudEnabled boolean', () => {
it('return isCloudEnabled boolean', async () => {
const mockConfigs = getMockConfigs(true);
const usageCollection = mockUsageCollection() as any;
const collector = createCloudUsageCollector(usageCollection, mockConfigs);
const callCluster = {} as any; // Sending any as the callCluster client because it's not needed in this collector but TS requires it when calling it.
expect(collector.fetch().isCloudEnabled).toBe(true);
expect((await collector.fetch(callCluster)).isCloudEnabled).toBe(true); // Adding the await because the fetch can be a Promise or a synchronous method and TS complains in the test if not awaited
});
});
});