[7.x] [Telemetry] [Monitoring] Only retry fetching usage once… (#54323)
* fix interval and add tests

* Update x-pack/legacy/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js

Co-authored-by: Christiane (Tina) Heiligers <christiane.heiligers@elastic.co>
This commit is contained in:
parent 70870e26dd
commit 7c4f6cc2da
2 changed files with 85 additions and 27 deletions
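
At a glance: after a failed bulk upload, the uploader now holds off on refetching usage collectors and retries only once an upload succeeds again, instead of refetching on every tick. A minimal sketch of that back-off behavior; everything here except the `_holdSendingUsage` flag is illustrative and not taken from the patch:

// Illustrative back-off sketch, not the real BulkUploader.
class Uploader {
  constructor() {
    this._holdSendingUsage = false; // set after a failed upload, cleared after a success
  }

  async tick(uploadOk) {
    const fetchedUsage = !this._holdSendingUsage;
    if (fetchedUsage) {
      // ...fetch the expensive usage collectors here...
    }
    // ...always fetch the cheap stats collectors and attempt the bulk upload...
    this._holdSendingUsage = !uploadOk; // simplified: the real code also checks hasUsageCollectors
    return fetchedUsage;
  }
}

(async () => {
  const u = new Uploader();
  console.log(await u.tick(false)); // true  - first tick fetches usage, upload fails
  console.log(await u.tick(false)); // false - held while uploads keep failing
  console.log(await u.tick(true));  // false - still held on the tick that succeeds...
  console.log(await u.tick(true));  // true  - ...and fetched again on the next one
})();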
x-pack/legacy/plugins/monitoring/server/kibana_monitoring/__tests__/bulk_uploader.js

@@ -209,7 +209,7 @@ describe('BulkUploader', () => {
       }, CHECK_DELAY);
     });

-    it('refetches UsageCollectors if uploading to local cluster was not successful', done => {
+    it('stops refetching UsageCollectors if uploading to local cluster was not successful', async () => {
       const usageCollectorFetch = sinon
         .stub()
         .returns({ type: 'type_usage_collector_test', result: { testData: 12345 } });
@@ -227,12 +227,52 @@ describe('BulkUploader', () => {

       uploader._onPayload = async () => ({ took: 0, ignored: true, errors: false });

-      uploader.start(collectors);
-      setTimeout(() => {
-        uploader.stop();
-        expect(usageCollectorFetch.callCount).to.be.greaterThan(1);
-        done();
-      }, CHECK_DELAY);
+      await uploader._fetchAndUpload(uploader.filterCollectorSet(collectors));
+      await uploader._fetchAndUpload(uploader.filterCollectorSet(collectors));
+      await uploader._fetchAndUpload(uploader.filterCollectorSet(collectors));
+
+      expect(uploader._holdSendingUsage).to.eql(true);
+      expect(usageCollectorFetch.callCount).to.eql(1);
     });

+    it('fetches UsageCollectors once uploading to local cluster is successful again', async () => {
+      const usageCollectorFetch = sinon
+        .stub()
+        .returns({ type: 'type_usage_collector_test', result: { usageData: 12345 } });
+
+      const statsCollectorFetch = sinon
+        .stub()
+        .returns({ type: 'type_stats_collector_test', result: { statsData: 12345 } });
+
+      const collectors = new MockCollectorSet(server, [
+        {
+          fetch: statsCollectorFetch,
+          isReady: () => true,
+          formatForBulkUpload: result => result,
+          isUsageCollector: false,
+        },
+        {
+          fetch: usageCollectorFetch,
+          isReady: () => true,
+          formatForBulkUpload: result => result,
+          isUsageCollector: true,
+        },
+      ]);
+
+      const uploader = new BulkUploader({ ...server, interval: FETCH_INTERVAL });
+      let bulkIgnored = true;
+      uploader._onPayload = async () => ({ took: 0, ignored: bulkIgnored, errors: false });
+
+      await uploader._fetchAndUpload(uploader.filterCollectorSet(collectors));
+      expect(uploader._holdSendingUsage).to.eql(true);
+
+      bulkIgnored = false;
+      await uploader._fetchAndUpload(uploader.filterCollectorSet(collectors));
+      await uploader._fetchAndUpload(uploader.filterCollectorSet(collectors));
+
+      expect(uploader._holdSendingUsage).to.eql(false);
+      expect(usageCollectorFetch.callCount).to.eql(2);
+      expect(statsCollectorFetch.callCount).to.eql(3);
+    });
+
     it('calls UsageCollectors if last reported exceeds during a _usageInterval', done => {
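
The rewritten tests drive `_fetchAndUpload` directly rather than waiting on `start()`'s interval timer, which removes the `setTimeout`/`done()` timing dependency of the old version. They build collectors through the suite's `MockCollectorSet` fixture; a hypothetical, reduced shape of that helper, inferred only from how the tests call it (the real fixture lives in the test setup and may differ):

// Hypothetical stand-in for the MockCollectorSet fixture, inferred from usage.
class MockCollectorSet {
  constructor(server, collectors) {
    this.collectors = collectors;
  }
  isUsageCollector(collector) {
    return Boolean(collector.isUsageCollector);
  }
  getFilteredCollectorSet(predicate) {
    return new MockCollectorSet(null, this.collectors.filter(predicate));
  }
}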
x-pack/legacy/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js

@@ -40,8 +40,14 @@ export class BulkUploader {
     }

     this._timer = null;
+    // Hold sending and fetching usage until monitoring.bulk is successful. This means that we
+    // send usage data on the second tick. But would save a lot of bandwidth fetching usage on
+    // every tick when ES is failing or monitoring is disabled.
+    this._holdSendingUsage = false;
     this._interval = interval;
     this._lastFetchUsageTime = null;
+    // Limit sending and fetching usage to once per day once usage is successfully stored
+    // into the monitoring indices.
     this._usageInterval = TELEMETRY_COLLECTION_INTERVAL;

     this._log = {
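
The once-per-day limit described in the new comments is plain timestamp arithmetic against `TELEMETRY_COLLECTION_INTERVAL`. A small sketch, assuming that constant is 24 hours in milliseconds (its value is not shown in this diff):

// Once-per-day gate, assuming TELEMETRY_COLLECTION_INTERVAL = 24h in ms.
const TELEMETRY_COLLECTION_INTERVAL = 24 * 60 * 60 * 1000;

const successfulUploadInLastDay = (lastFetchUsageTime, now = Date.now()) =>
  Boolean(lastFetchUsageTime) && lastFetchUsageTime + TELEMETRY_COLLECTION_INTERVAL > now;

console.log(successfulUploadInLastDay(null));                          // false - usage never stored
console.log(successfulUploadInLastDay(Date.now() - 60 * 1000));        // true  - stored a minute ago
console.log(successfulUploadInLastDay(Date.now() - 25 * 3600 * 1000)); // false - stored over a day ago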
@@ -65,6 +71,29 @@ export class BulkUploader {
     });
   }

+  filterCollectorSet(usageCollection) {
+    const successfulUploadInLastDay =
+      this._lastFetchUsageTime && this._lastFetchUsageTime + this._usageInterval > Date.now();
+
+    return usageCollection.getFilteredCollectorSet(c => {
+      // this is internal bulk upload, so filter out API-only collectors
+      if (c.ignoreForInternalUploader) {
+        return false;
+      }
+      // Only collect usage data at the same interval as telemetry would (default to once a day)
+      if (usageCollection.isUsageCollector(c)) {
+        if (this._holdSendingUsage) {
+          return false;
+        }
+        if (successfulUploadInLastDay) {
+          return false;
+        }
+      }
+
+      return true;
+    });
+  }
+
   /*
    * Start the interval timer
    * @param {usageCollection} usageCollection object to use for initial the fetch/upload and fetch/uploading on interval
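
In short, a usage collector is skipped when either gate is closed: the hold set after a failed upload, or a successful upload within the last interval. Non-usage collectors always pass unless marked API-only. A free-standing restatement of the predicate for reference, with the usage check flattened to a property the way the test mocks model it:

// Restatement of the filterCollectorSet predicate (isUsageCollector flattened to a property).
const shouldCollect = (collector, holdSendingUsage, successfulUploadInLastDay) => {
  if (collector.ignoreForInternalUploader) return false; // API-only collectors never bulk-upload
  if (collector.isUsageCollector && (holdSendingUsage || successfulUploadInLastDay)) return false;
  return true;
};

console.log(shouldCollect({ isUsageCollector: true }, true, false));  // false - held after a failure
console.log(shouldCollect({ isUsageCollector: true }, false, true));  // false - already stored today
console.log(shouldCollect({ isUsageCollector: false }, true, true));  // true  - stats always collected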
@@ -72,31 +101,15 @@ export class BulkUploader {
    */
   start(usageCollection) {
     this._log.info('Starting monitoring stats collection');

-    const filterCollectorSet = _usageCollection => {
-      const successfulUploadInLastDay =
-        this._lastFetchUsageTime && this._lastFetchUsageTime + this._usageInterval > Date.now();
-
-      return _usageCollection.getFilteredCollectorSet(c => {
-        // this is internal bulk upload, so filter out API-only collectors
-        if (c.ignoreForInternalUploader) {
-          return false;
-        }
-        // Only collect usage data at the same interval as telemetry would (default to once a day)
-        if (successfulUploadInLastDay && _usageCollection.isUsageCollector(c)) {
-          return false;
-        }
-        return true;
-      });
-    };
     if (this._timer) {
       clearInterval(this._timer);
     } else {
-      this._fetchAndUpload(filterCollectorSet(usageCollection)); // initial fetch
+      this._fetchAndUpload(this.filterCollectorSet(usageCollection)); // initial fetch
     }

     this._timer = setInterval(() => {
-      this._fetchAndUpload(filterCollectorSet(usageCollection));
+      this._fetchAndUpload(this.filterCollectorSet(usageCollection));
     }, this._interval);
   }

@@ -146,12 +159,17 @@ export class BulkUploader {
       const sendSuccessful = !result.ignored && !result.errors;
       if (!sendSuccessful && hasUsageCollectors) {
         this._lastFetchUsageTime = null;
+        this._holdSendingUsage = true;
         this._log.debug(
           'Resetting lastFetchWithUsage because uploading to the cluster was not successful.'
         );
       }
-      if (sendSuccessful && hasUsageCollectors) {
-        this._lastFetchUsageTime = Date.now();
+
+      if (sendSuccessful) {
+        this._holdSendingUsage = false;
+        if (hasUsageCollectors) {
+          this._lastFetchUsageTime = Date.now();
+        }
       }
       this._log.debug(`Uploaded bulk stats payload to the local cluster`);
     } catch (err) {
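
The success check hinges on the bulk response shape the tests stub: an upload counts as successful only when the result is neither `ignored` nor `errors`. A tiny sketch of the resulting state transitions, mirroring the hunk above:

// State transitions after an upload, mirroring the patched _onPayload handling.
const afterUpload = (state, result, hasUsageCollectors) => {
  const sendSuccessful = !result.ignored && !result.errors;
  if (!sendSuccessful && hasUsageCollectors) {
    return { holdSendingUsage: true, lastFetchUsageTime: null }; // back off and refetch later
  }
  if (sendSuccessful) {
    return {
      holdSendingUsage: false, // release the hold on any successful upload
      lastFetchUsageTime: hasUsageCollectors ? Date.now() : state.lastFetchUsageTime,
    };
  }
  return state; // failed upload without usage collectors changes nothing
};

console.log(afterUpload({}, { took: 0, ignored: true, errors: false }, true));
// -> { holdSendingUsage: true, lastFetchUsageTime: null }
console.log(afterUpload({ lastFetchUsageTime: null }, { took: 0, ignored: false, errors: false }, true));
// -> { holdSendingUsage: false, lastFetchUsageTime: <timestamp> }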