Refactor SearchLooper and rename it SearchPoll (#20315)

* Refactor SearchLooper by removing unused interface methods and internal logic.
* Rename SearchLooper to SearchPoll.
* Remove use of Angular $timeout.
* Make courier responsible for stopping the search poller when there's a fatal error.
* Integrate _search promises with the digest cycle.
This commit is contained in:
CJ Cenizal 2018-06-28 16:34:15 -07:00 committed by GitHub
parent 8f996d3c85
commit d1dce9b831
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 139 additions and 206 deletions

View file

@ -160,9 +160,9 @@ app.directive('dashboardApp', function ($injector) {
updateState();
$scope.refresh = (...args) => {
$scope.refresh = () => {
$rootScope.$broadcast('fetch');
courier.fetch(...args);
courier.fetch();
};
dashboardStateManager.handleTimeChange(timefilter.getTime());

View file

@ -29,33 +29,38 @@ import '../promises';
import { requestQueue } from './_request_queue';
import { FetchSoonProvider } from './fetch';
import { SearchLooperProvider } from './search_looper';
import { SearchPollProvider } from './search_poll';
uiModules.get('kibana/courier').service('courier', ($rootScope, Private) => {
const fetchSoon = Private(FetchSoonProvider);
// This manages the doc fetch interval.
const searchLooper = Private(SearchLooperProvider);
const searchPoll = Private(SearchPollProvider);
class Courier {
constructor() {
// Listen for refreshInterval changes
$rootScope.$listen(timefilter, 'refreshIntervalUpdate', function () {
const refreshValue = _.get(timefilter.getRefreshInterval(), 'value');
const refreshPause = _.get(timefilter.getRefreshInterval(), 'pause');
const refreshIntervalMs = _.get(timefilter.getRefreshInterval(), 'value');
const isRefreshPaused = _.get(timefilter.getRefreshInterval(), 'pause');
// Update the time between automatic search requests.
if (_.isNumber(refreshValue) && !refreshPause) {
searchLooper.setIntervalInMs(refreshValue);
searchPoll.setIntervalInMs(refreshIntervalMs);
if (isRefreshPaused) {
searchPoll.pause();
} else {
searchLooper.setIntervalInMs(0);
searchPoll.resume();
}
});
// Abort all pending requests if there's a fatal error.
const closeOnFatal = _.once(() => {
searchLooper.stop();
// If there was a fatal error, then stop future searches. We want to use pause instead of
// clearTimer because if the search results come back after the fatal error then we'll
// resume polling.
searchPoll.pause();
// And abort all pending requests.
_.invoke(requestQueue, 'abort');
if (requestQueue.length) {
@ -67,13 +72,12 @@ uiModules.get('kibana/courier').service('courier', ($rootScope, Private) => {
}
/**
* Process the pending request queue right now, returns
* a promise that resembles the success of the fetch completing,
* individual errors are routed to their respective requests.
* Fetch the pending requests.
*/
fetch = () => {
fetchSoon.fetchQueued().then(() => {
searchLooper.restart();
// Reset the timer using the time that we get this response as the starting point.
searchPoll.resetTimer();
});
};
}

View file

@ -53,6 +53,10 @@ export function FetchSoonProvider(Private, Promise) {
return Promise.all(requests.map(req => req.getCompletePromise()));
};
/**
* Return a promise that resembles the success of the fetch completing so we can execute
* logic based on this state change. Individual errors are routed to their respective requests.
*/
this.fetchQueued = () => {
return this.fetchSearchRequests(requestQueue.getStartable());
};

View file

@ -1,190 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import _ from 'lodash';
import { fatalError } from '../../notify';
import '../../promises';
import { requestQueue } from '../_request_queue';
import { FetchSoonProvider } from '../fetch';
export function SearchLooperProvider(Private, Promise, $timeout, $rootScope) {
const fetchSoon = Private(FetchSoonProvider);
class SearchLooper {
constructor() {
this._intervalInMs = undefined;
this._timer = null;
this._started = false;
}
/**
* Set the number of milliseconds between
* each loop
*
* @param {integer} intervalInMs
*/
setIntervalInMs = intervalInMs => {
this._intervalInMs = _.parseInt(intervalInMs) || 0;
if (!this._started) {
return;
}
if (this._intervalInMs) {
this.start(false);
} else {
this._unscheduleLoop();
}
};
start = loopOver => {
if (loopOver == null) {
loopOver = true;
}
if (!this._started) {
this._started = true;
} else {
this._unscheduleLoop();
}
if (loopOver) {
this._executeLoop();
} else {
this._scheduleLoop();
}
};
stop = () => {
this._unscheduleLoop();
this._started = false;
};
/**
* Restart the looper only if it is already started.
* Called automatically when ms is changed
*/
restart = () => {
this.start(false);
};
/**
* Is the looper currently started/running/scheduled/going to execute
*
* @return {boolean}
*/
started = () => {
return !!this._started;
};
/**
* Called when the loop is executed before the previous
* run has completed.
*
* @override
* @return {undefined}
*/
_onHastyLoop = () => {
if (this.afterHastyQueued) {
return;
}
this.afterHastyQueued = Promise.resolve(this.active)
.then(() => {
return this._executeLoop();
})
.finally(() => {
this.afterHastyQueued = null;
});
};
/**
* Wraps this._fn so that this._fn can be changed
* without rescheduling and schedules
* the next iteration
*
* @private
* @return {undefined}
*/
_executeLoop = () => {
if (this.active) {
this._onHastyLoop();
return;
}
this.active = Promise.try(this._executeLoopAction)
.then(() => {
this._scheduleLoop();
})
.catch(err => {
this.stop();
fatalError(err);
})
.finally(() => {
this.active = null;
});
};
_executeLoopAction = () => {
$rootScope.$broadcast('courier:searchRefresh');
const requests = requestQueue.getInactive();
// promise returned from fetch.fetchSearchRequests() only resolves when
// the requests complete, but we want to continue even if
// the requests abort so we make our own
fetchSoon.fetchSearchRequests(requests);
return Promise.all(
requests.map(request => request.getCompleteOrAbortedPromise())
);
};
/**
* Schedule the next iteration of the loop
*
* @private
* @return {number} - the timer promise
*/
_scheduleLoop = () => {
this._unscheduleLoop();
this._timer = this._intervalInMs
? $timeout(this._executeLoop, this._intervalInMs)
: null;
return this._timer;
};
/**
* Cancel the next iteration of the loop
*
* @private
* @return {number} - the timer promise
*/
_unscheduleLoop = () => {
if (this._timer) {
$timeout.cancel(this._timer);
this._timer = null;
}
};
}
return new SearchLooper();
}

View file

@ -17,4 +17,4 @@
* under the License.
*/
export { SearchLooperProvider } from './search_looper';
export { SearchPollProvider } from './search_poll';

View file

@ -0,0 +1,115 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import _ from 'lodash';
import { fatalError } from '../../notify';
import '../../promises';
import { requestQueue } from '../_request_queue';
import { FetchSoonProvider } from '../fetch';
export function SearchPollProvider(Private, Promise, $rootScope) {
const fetchSoon = Private(FetchSoonProvider);
class SearchPoll {
constructor() {
this._isPolling = false;
this._intervalInMs = undefined;
this._timerId = null;
this._searchPromise = null;
this._isIntervalFasterThanSearch = false;
}
setIntervalInMs = intervalInMs => {
this._intervalInMs = _.parseInt(intervalInMs);
};
resume = () => {
this._isPolling = true;
this.resetTimer();
};
pause = () => {
this._isPolling = false;
this.clearTimer();
};
resetTimer = () => {
// Cancel the pending search and schedule a new one.
this.clearTimer();
if (this._isPolling) {
this._timerId = setTimeout(this._search, this._intervalInMs);
}
};
clearTimer = () => {
// Cancel the pending search, if there is one.
if (this._timerId) {
clearTimeout(this._timerId);
this._timerId = null;
}
};
_search = () => {
// If our interval is faster than the rate at which searches return results, then trigger
// a new search as soon as the results come back.
if (this._searchPromise) {
this._isIntervalFasterThanSearch = true;
return;
}
// Schedule another search.
this.resetTimer();
// We use resolve() here instead of try() because the latter won't trigger a $digest
// when the promise resolves.
this._searchPromise = Promise.resolve().then(() => {
$rootScope.$broadcast('courier:searchRefresh');
const requests = requestQueue.getInactive();
// The promise returned from fetchSearchRequests() only resolves when the requests complete.
// We want to continue even if the requests abort so we return a different promise.
fetchSoon.fetchSearchRequests(requests);
return Promise.all(
requests.map(request => request.getCompleteOrAbortedPromise())
);
})
.then(() => {
this._searchPromise = null;
// If the search response comes back before the interval fires, then we'll wait
// for the interval and let it kick off the next search. But if the interval fires before
// the search returns results, then we'll need to wait for the search to return results
// and then kick off another search again. A new search will also reset the interval.
if (this._isIntervalFasterThanSearch) {
this._isIntervalFasterThanSearch = false;
this._search();
}
})
.catch(err => {
// If there was a problem, then kill Kibana.
fatalError(err);
});
};
}
return new SearchPoll();
}