[Usage collection] Update bulkFetch logic (#121437)

This commit is contained in:
Ahmad Bamieh 2021-12-20 18:47:21 +02:00 committed by GitHub
parent 64f0e391f0
commit 46a0999f34
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 231 additions and 134 deletions

View file

@ -35,38 +35,6 @@ describe('FetcherTask', () => {
);
});
it('stops when all collectors are not ready', async () => {
const initializerContext = coreMock.createPluginInitializerContext({});
const fetcherTask = new FetcherTask(initializerContext);
const getCurrentConfigs = jest.fn().mockResolvedValue({});
const areAllCollectorsReady = jest.fn().mockResolvedValue(false);
const shouldSendReport = jest.fn().mockReturnValue(true);
const fetchTelemetry = jest.fn();
const sendTelemetry = jest.fn();
const updateReportFailure = jest.fn();
Object.assign(fetcherTask, {
getCurrentConfigs,
areAllCollectorsReady,
shouldSendReport,
fetchTelemetry,
updateReportFailure,
sendTelemetry,
});
await fetcherTask['sendIfDue']();
expect(fetchTelemetry).toBeCalledTimes(0);
expect(sendTelemetry).toBeCalledTimes(0);
expect(areAllCollectorsReady).toBeCalledTimes(1);
expect(updateReportFailure).toBeCalledTimes(0);
expect(fetcherTask['logger'].warn).toBeCalledTimes(1);
expect(fetcherTask['logger'].warn).toHaveBeenCalledWith(
`Error fetching usage. (Error: Not all collectors are ready.)`
);
});
it('fetches usage and send telemetry', async () => {
const initializerContext = coreMock.createPluginInitializerContext({});
const fetcherTask = new FetcherTask(initializerContext);
@ -79,7 +47,6 @@ describe('FetcherTask', () => {
const getCurrentConfigs = jest.fn().mockResolvedValue({
telemetryUrl: mockTelemetryUrl,
});
const areAllCollectorsReady = jest.fn().mockResolvedValue(true);
const shouldSendReport = jest.fn().mockReturnValue(true);
const fetchTelemetry = jest.fn().mockResolvedValue(mockClusters);
@ -88,7 +55,6 @@ describe('FetcherTask', () => {
Object.assign(fetcherTask, {
getCurrentConfigs,
areAllCollectorsReady,
shouldSendReport,
fetchTelemetry,
updateReportFailure,
@ -97,7 +63,6 @@ describe('FetcherTask', () => {
await fetcherTask['sendIfDue']();
expect(areAllCollectorsReady).toBeCalledTimes(1);
expect(fetchTelemetry).toBeCalledTimes(1);
expect(sendTelemetry).toBeCalledTimes(1);
expect(sendTelemetry).toHaveBeenNthCalledWith(1, mockTelemetryUrl, mockClusters);

View file

@ -77,10 +77,6 @@ export class FetcherTask {
}
}
private async areAllCollectorsReady() {
return (await this.telemetryCollectionManager?.areAllCollectorsReady()) ?? false;
}
private async sendIfDue() {
if (this.isSending) {
return;
@ -102,10 +98,6 @@ export class FetcherTask {
this.isSending = true;
try {
const allCollectorsReady = await this.areAllCollectorsReady();
if (!allCollectorsReady) {
throw new Error('Not all collectors are ready.');
}
clusters = await this.fetchTelemetry();
} catch (err) {
this.logger.warn(`Error fetching usage. (${err})`);

View file

@ -28,9 +28,6 @@ describe('Telemetry Collection Manager', () => {
describe('everything works when no collection mechanisms are registered', () => {
const telemetryCollectionManager = new TelemetryCollectionManagerPlugin(initializerContext);
const setupApi = telemetryCollectionManager.setup(coreMock.createSetup(), { usageCollection });
test('All collectors are ready (there are none)', async () => {
await expect(setupApi.areAllCollectorsReady()).resolves.toBe(true);
});
test('getStats returns empty', async () => {
const config: StatsGetterConfig = { unencrypted: false };
await expect(setupApi.getStats(config)).resolves.toStrictEqual([]);

View file

@ -69,7 +69,6 @@ export class TelemetryCollectionManagerPlugin
setCollectionStrategy: this.setCollectionStrategy.bind(this),
getOptInStats: this.getOptInStats.bind(this),
getStats: this.getStats.bind(this),
areAllCollectorsReady: this.areAllCollectorsReady.bind(this),
};
}
@ -80,7 +79,6 @@ export class TelemetryCollectionManagerPlugin
return {
getOptInStats: this.getOptInStats.bind(this),
getStats: this.getStats.bind(this),
areAllCollectorsReady: this.areAllCollectorsReady.bind(this),
};
}
@ -221,10 +219,6 @@ export class TelemetryCollectionManagerPlugin
return [];
}
private async areAllCollectorsReady() {
return await this.usageCollection?.areAllCollectorsReady();
}
private getOptInStatsForCollection = async (
collection: CollectionStrategy,
optInStatus: boolean,

View file

@ -21,13 +21,11 @@ export interface TelemetryCollectionManagerPluginSetup {
) => void;
getOptInStats: TelemetryCollectionManagerPlugin['getOptInStats'];
getStats: TelemetryCollectionManagerPlugin['getStats'];
areAllCollectorsReady: TelemetryCollectionManagerPlugin['areAllCollectorsReady'];
}
export interface TelemetryCollectionManagerPluginStart {
getOptInStats: TelemetryCollectionManagerPlugin['getOptInStats'];
getStats: TelemetryCollectionManagerPlugin['getStats'];
areAllCollectorsReady: TelemetryCollectionManagerPlugin['areAllCollectorsReady'];
}
export interface TelemetryOptInStats {

View file

@ -7,5 +7,5 @@
*/
export const KIBANA_STATS_TYPE = 'kibana_stats';
export const DEFAULT_MAXIMUM_WAIT_TIME_FOR_ALL_COLLECTORS_IN_S = 60;
export const DEFAULT_MAXIMUM_WAIT_TIME_FOR_ALL_COLLECTORS_IN_S = 1;
export const MAIN_APP_DEFAULT_VIEW_ID = 'main';

View file

@ -10,20 +10,22 @@ import { noop } from 'lodash';
import { Collector } from './collector';
import { CollectorSet } from './collector_set';
import { UsageCollector } from './usage_collector';
import {
elasticsearchServiceMock,
loggingSystemMock,
savedObjectsClientMock,
httpServerMock,
} from '../../../../core/server/mocks';
const logger = loggingSystemMock.createLogger();
const loggerSpies = {
debug: jest.spyOn(logger, 'debug'),
warn: jest.spyOn(logger, 'warn'),
};
describe('CollectorSet', () => {
const logger = loggingSystemMock.createLogger();
const loggerSpies = {
debug: jest.spyOn(logger, 'debug'),
warn: jest.spyOn(logger, 'warn'),
};
describe('registers a collector set and runs lifecycle events', () => {
let fetch: Function;
beforeEach(() => {
@ -83,7 +85,8 @@ describe('CollectorSet', () => {
);
const result = await collectors.bulkFetch(mockEsClient, mockSoClient, req);
expect(loggerSpies.debug).toHaveBeenCalledTimes(1);
expect(loggerSpies.debug).toHaveBeenCalledTimes(2);
expect(loggerSpies.debug).toHaveBeenCalledWith('Getting ready collectors');
expect(loggerSpies.debug).toHaveBeenCalledWith(
'Fetching data from MY_TEST_COLLECTOR collector'
);
@ -487,4 +490,151 @@ describe('CollectorSet', () => {
).toStrictEqual({ test: 1 });
});
});
describe('bulkFetch', () => {
const collectorSetConfig = { logger, maximumWaitTimeForAllCollectorsInS: 1 };
let collectorSet = new CollectorSet(collectorSetConfig);
afterEach(() => {
collectorSet = new CollectorSet(collectorSetConfig);
});
it('skips collectors that are not ready', async () => {
const mockIsReady = jest.fn().mockReturnValue(true);
const mockIsNotReady = jest.fn().mockResolvedValue(false);
const mockNonReadyFetch = jest.fn().mockResolvedValue({});
const mockReadyFetch = jest.fn().mockResolvedValue({});
collectorSet.registerCollector(
collectorSet.makeUsageCollector({
type: 'ready_col',
isReady: mockIsReady,
schema: {},
fetch: mockReadyFetch,
})
);
collectorSet.registerCollector(
collectorSet.makeUsageCollector({
type: 'not_ready_col',
isReady: mockIsNotReady,
schema: {},
fetch: mockNonReadyFetch,
})
);
const mockEsClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
const mockSoClient = savedObjectsClientMock.create();
const results = await collectorSet.bulkFetch(mockEsClient, mockSoClient, undefined);
expect(mockIsReady).toBeCalledTimes(1);
expect(mockReadyFetch).toBeCalledTimes(1);
expect(mockIsNotReady).toBeCalledTimes(1);
expect(mockNonReadyFetch).toBeCalledTimes(0);
expect(results).toMatchInlineSnapshot(`
Array [
Object {
"result": Object {},
"type": "ready_col",
},
]
`);
});
it('skips collectors that have timed out', async () => {
const mockFastReady = jest.fn().mockImplementation(async () => {
return new Promise((res) => {
setTimeout(() => res(true), 0.5 * 1000);
});
});
const mockTimedOutReady = jest.fn().mockImplementation(async () => {
return new Promise((res) => {
setTimeout(() => res(true), 2 * 1000);
});
});
const mockNonReadyFetch = jest.fn().mockResolvedValue({});
const mockReadyFetch = jest.fn().mockResolvedValue({});
collectorSet.registerCollector(
collectorSet.makeUsageCollector({
type: 'ready_col',
isReady: mockFastReady,
schema: {},
fetch: mockReadyFetch,
})
);
collectorSet.registerCollector(
collectorSet.makeUsageCollector({
type: 'timeout_col',
isReady: mockTimedOutReady,
schema: {},
fetch: mockNonReadyFetch,
})
);
const mockEsClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
const mockSoClient = savedObjectsClientMock.create();
const results = await collectorSet.bulkFetch(mockEsClient, mockSoClient, undefined);
expect(mockFastReady).toBeCalledTimes(1);
expect(mockReadyFetch).toBeCalledTimes(1);
expect(mockTimedOutReady).toBeCalledTimes(1);
expect(mockNonReadyFetch).toBeCalledTimes(0);
expect(results).toMatchInlineSnapshot(`
Array [
Object {
"result": Object {},
"type": "ready_col",
},
]
`);
});
it('passes context to fetch', async () => {
const mockReadyFetch = jest.fn().mockResolvedValue({});
collectorSet.registerCollector(
collectorSet.makeUsageCollector({
type: 'ready_col',
isReady: () => true,
schema: {},
fetch: mockReadyFetch,
})
);
const mockEsClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
const mockSoClient = savedObjectsClientMock.create();
const results = await collectorSet.bulkFetch(mockEsClient, mockSoClient, undefined);
expect(mockReadyFetch).toBeCalledTimes(1);
expect(mockReadyFetch).toBeCalledWith({
esClient: mockEsClient,
soClient: mockSoClient,
});
expect(results).toHaveLength(1);
});
it('adds extra context to collectors with extendFetchContext config', async () => {
const mockReadyFetch = jest.fn().mockResolvedValue({});
collectorSet.registerCollector(
collectorSet.makeUsageCollector({
type: 'ready_col',
isReady: () => true,
schema: {},
fetch: mockReadyFetch,
extendFetchContext: { kibanaRequest: true },
})
);
const mockEsClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
const mockSoClient = savedObjectsClientMock.create();
const request = httpServerMock.createKibanaRequest();
const results = await collectorSet.bulkFetch(mockEsClient, mockSoClient, request);
expect(mockReadyFetch).toBeCalledTimes(1);
expect(mockReadyFetch).toBeCalledWith({
esClient: mockEsClient,
soClient: mockSoClient,
kibanaRequest: request,
});
expect(results).toHaveLength(1);
});
});
});

View file

@ -5,7 +5,7 @@
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { withTimeout } from '@kbn/std';
import { snakeCase } from 'lodash';
import type {
Logger,
@ -16,26 +16,33 @@ import type {
import { Collector } from './collector';
import type { ICollector, CollectorOptions } from './types';
import { UsageCollector, UsageCollectorOptions } from './usage_collector';
import { DEFAULT_MAXIMUM_WAIT_TIME_FOR_ALL_COLLECTORS_IN_S } from '../../common/constants';
// Needed for the general array containing all the collectors. We don't really care about their types here
// eslint-disable-next-line @typescript-eslint/no-explicit-any
type AnyCollector = ICollector<any, any>;
interface CollectorWithStatus {
isReadyWithTimeout: Awaited<ReturnType<typeof withTimeout>>;
collector: AnyCollector;
}
interface CollectorSetConfig {
logger: Logger;
maximumWaitTimeForAllCollectorsInS?: number;
collectors?: AnyCollector[];
}
export class CollectorSet {
private _waitingForAllCollectorsTimestamp?: number;
private readonly logger: Logger;
private readonly maximumWaitTimeForAllCollectorsInS: number;
private readonly collectors: Map<string, AnyCollector>;
constructor({ logger, maximumWaitTimeForAllCollectorsInS, collectors = [] }: CollectorSetConfig) {
constructor({
logger,
maximumWaitTimeForAllCollectorsInS = DEFAULT_MAXIMUM_WAIT_TIME_FOR_ALL_COLLECTORS_IN_S,
collectors = [],
}: CollectorSetConfig) {
this.logger = logger;
this.collectors = new Map(collectors.map((collector) => [collector.type, collector]));
this.maximumWaitTimeForAllCollectorsInS = maximumWaitTimeForAllCollectorsInS || 60;
this.maximumWaitTimeForAllCollectorsInS = maximumWaitTimeForAllCollectorsInS;
}
/**
@ -92,51 +99,70 @@ export class CollectorSet {
return [...this.collectors.values()].find((c) => c.type === type);
};
public areAllCollectorsReady = async (collectorSet: CollectorSet = this) => {
if (!(collectorSet instanceof CollectorSet)) {
private getReadyCollectors = async (
collectors: Map<string, AnyCollector> = this.collectors
): Promise<AnyCollector[]> => {
if (!(collectors instanceof Map)) {
throw new Error(
`areAllCollectorsReady method given bad collectorSet parameter: ` + typeof collectorSet
`getReadyCollectors method given bad Map of collectors: ` + typeof collectors
);
}
const collectors = [...collectorSet.collectors.values()];
const collectorsWithStatus = await Promise.all(
collectors.map(async (collector) => {
return {
isReady: await collector.isReady(),
collector,
};
const secondInMs = 1000;
const collectorsWithStatus: CollectorWithStatus[] = await Promise.all(
[...collectors.values()].map(async (collector) => {
const isReadyWithTimeout = await withTimeout<boolean>({
promise: (async (): Promise<boolean> => {
try {
return await collector.isReady();
} catch (err) {
this.logger.debug(`Collector ${collector.type} failed to get ready. ${err}`);
return false;
}
})(),
timeoutMs: this.maximumWaitTimeForAllCollectorsInS * secondInMs,
});
return { isReadyWithTimeout, collector };
})
);
const collectorsTypesNotReady = collectorsWithStatus
.filter((collectorWithStatus) => collectorWithStatus.isReady === false)
.map((collectorWithStatus) => collectorWithStatus.collector.type);
const timedOutCollectorsTypes = collectorsWithStatus
.filter((collectorWithStatus) => collectorWithStatus.isReadyWithTimeout.timedout)
.map(({ collector }) => collector.type);
const allReady = collectorsTypesNotReady.length === 0;
if (!allReady && this.maximumWaitTimeForAllCollectorsInS >= 0) {
const nowTimestamp = +new Date();
this._waitingForAllCollectorsTimestamp =
this._waitingForAllCollectorsTimestamp || nowTimestamp;
const timeWaitedInMS = nowTimestamp - this._waitingForAllCollectorsTimestamp;
const timeLeftInMS = this.maximumWaitTimeForAllCollectorsInS * 1000 - timeWaitedInMS;
if (timeLeftInMS <= 0) {
this.logger.debug(
`All collectors are not ready (waiting for ${collectorsTypesNotReady.join(',')}) ` +
`but we have waited the required ` +
`${this.maximumWaitTimeForAllCollectorsInS}s and will return data from all collectors that are ready.`
);
return true;
} else {
this.logger.debug(`All collectors are not ready. Waiting for ${timeLeftInMS}ms longer.`);
}
} else {
this._waitingForAllCollectorsTimestamp = undefined;
if (timedOutCollectorsTypes.length) {
this.logger.debug(
`Some collectors timedout getting ready (${timedOutCollectorsTypes.join(', ')}). ` +
`Waited for ${this.maximumWaitTimeForAllCollectorsInS}s and will return data from collectors that are ready.`
);
}
return allReady;
const nonTimedOutCollectors = collectorsWithStatus.filter(
(
collectorWithStatus
): collectorWithStatus is {
isReadyWithTimeout: { timedout: false; value: boolean };
collector: AnyCollector;
} => collectorWithStatus.isReadyWithTimeout.timedout === false
);
const collectorsTypesNotReady = nonTimedOutCollectors
.filter(({ isReadyWithTimeout }) => isReadyWithTimeout.value === false)
.map(({ collector }) => collector.type);
if (collectorsTypesNotReady.length) {
this.logger.debug(
`Some collectors are not ready (${collectorsTypesNotReady.join(',')}). ` +
`will return data from all collectors that are ready.`
);
}
const readyCollectors = nonTimedOutCollectors
.filter(({ isReadyWithTimeout }) => isReadyWithTimeout.value === true)
.map(({ collector }) => collector);
return readyCollectors;
};
public bulkFetch = async (
@ -145,8 +171,10 @@ export class CollectorSet {
kibanaRequest: KibanaRequest | undefined, // intentionally `| undefined` to enforce providing the parameter
collectors: Map<string, AnyCollector> = this.collectors
) => {
this.logger.debug(`Getting ready collectors`);
const readyCollectors = await this.getReadyCollectors(collectors);
const responses = await Promise.all(
[...collectors.values()].map(async (collector) => {
readyCollectors.map(async (collector) => {
this.logger.debug(`Fetching data from ${collector.type} collector`);
try {
const context = {

View file

@ -31,7 +31,6 @@ export const createUsageCollectionSetupMock = () => {
const usageCollectionSetupMock: jest.Mocked<UsageCollectionSetup> = {
createUsageCounter,
getUsageCounterByType,
areAllCollectorsReady: jest.fn().mockImplementation(collectorSet.areAllCollectorsReady),
bulkFetch: jest.fn().mockImplementation(collectorSet.bulkFetch),
getCollectorByType: jest.fn().mockImplementation(collectorSet.getCollectorByType),
toApiFieldNames: jest.fn().mockImplementation(collectorSet.toApiFieldNames),
@ -41,7 +40,6 @@ export const createUsageCollectionSetupMock = () => {
registerCollector: jest.fn().mockImplementation(collectorSet.registerCollector),
};
usageCollectionSetupMock.areAllCollectorsReady.mockResolvedValue(true);
return usageCollectionSetupMock;
};

View file

@ -59,11 +59,6 @@ export interface UsageCollectionSetup {
getCollectorByType: <TFetchReturn, ExtraOptions extends object>(
type: string
) => Collector<TFetchReturn, ExtraOptions> | undefined;
/**
* Returns if all the collectors are ready to fetch their reported usage.
* @internal: telemetry use
*/
areAllCollectorsReady: () => Promise<boolean>;
/**
* Fetches the collection from all the registered collectors
* @internal: telemetry use
@ -147,7 +142,6 @@ export class UsageCollectionPlugin implements Plugin<UsageCollectionSetup> {
});
return {
areAllCollectorsReady: collectorSet.areAllCollectorsReady,
bulkFetch: collectorSet.bulkFetch,
getCollectorByType: collectorSet.getCollectorByType,
makeStatsCollector: collectorSet.makeStatsCollector,

View file

@ -22,11 +22,6 @@ import {
ServiceStatusLevels,
} from '../../../../../core/server';
import { CollectorSet } from '../../collector';
const STATS_NOT_READY_MESSAGE = i18n.translate('usageCollection.stats.notReadyMessage', {
defaultMessage: 'Stats are not ready yet. Please try again later.',
});
const SNAPSHOT_REGEX = /-snapshot/i;
interface UsageObject {
@ -100,18 +95,10 @@ export function registerStatsRoute({
const { asCurrentUser } = context.core.elasticsearch.client;
const savedObjectsClient = context.core.savedObjects.client;
if (shouldGetUsage) {
const collectorsReady = await collectorSet.areAllCollectorsReady();
if (!collectorsReady) {
return res.customError({ statusCode: 503, body: { message: STATS_NOT_READY_MESSAGE } });
}
}
const usagePromise = shouldGetUsage
? getUsage(asCurrentUser, savedObjectsClient, req)
: Promise.resolve<UsageObject>({});
const [usage, clusterUuid] = await Promise.all([
usagePromise,
shouldGetUsage
? getUsage(asCurrentUser, savedObjectsClient, req)
: Promise.resolve<UsageObject>({}),
getClusterUuid(asCurrentUser),
]);

View file

@ -21,9 +21,6 @@ class MockCollectorSet {
isUsageCollector(x) {
return !!x.isUsageCollector;
}
areAllCollectorsReady() {
return this.mockCollectors.every((collector) => collector.isReady());
}
getCollectorByType(type) {
return (
this.mockCollectors.find((collector) => collector.type === type) || this.mockCollectors[0]

View file

@ -3993,7 +3993,6 @@
"uiActions.errors.incompatibleAction": "操作に互換性がありません",
"uiActions.triggers.rowClickkDescription": "テーブル行をクリック",
"uiActions.triggers.rowClickTitle": "テーブル行クリック",
"usageCollection.stats.notReadyMessage": "まだ統計が準備できていません。しばらくたってから再試行してください。",
"visDefaultEditor.advancedToggle.advancedLinkLabel": "高度な設定",
"visDefaultEditor.agg.disableAggButtonTooltip": "{schemaTitle} {aggTitle} アグリゲーションを無効にする",
"visDefaultEditor.agg.enableAggButtonTooltip": "{schemaTitle} {aggTitle} アグリゲーションを有効にする",

View file

@ -4025,7 +4025,6 @@
"uiActions.errors.incompatibleAction": "操作不兼容",
"uiActions.triggers.rowClickkDescription": "表格行的单击",
"uiActions.triggers.rowClickTitle": "表格行单击",
"usageCollection.stats.notReadyMessage": "统计信息尚未准备就绪。请稍后重试。",
"visDefaultEditor.advancedToggle.advancedLinkLabel": "高级",
"visDefaultEditor.agg.disableAggButtonTooltip": "禁用 {schemaTitle} {aggTitle} 聚合",
"visDefaultEditor.agg.enableAggButtonTooltip": "启用 {schemaTitle} {aggTitle} 聚合",

View file

@ -81,7 +81,6 @@ export default async function ({ readConfigFile }) {
'--server.uuid=5b2de169-2785-441b-ae8c-186a1936b17d',
'--xpack.maps.showMapsInspectorAdapter=true',
'--xpack.maps.preserveDrawingBuffer=true',
'--usageCollection.maximumWaitTimeForAllCollectorsInS=1',
'--xpack.security.encryptionKey="wuGNaIhoMpk5sO4UBxgr3NyW1sFcLgIf"', // server restarts should not invalidate active sessions
'--xpack.encryptedSavedObjects.encryptionKey="DkdXazszSCYexXqz4YktBGHCRkV6hyNK"',
'--xpack.discoverEnhanced.actions.exploreDataInContextMenu.enabled=true',