Mirror of https://github.com/elastic/kibana.git (synced 2025-04-23 09:19:04 -04:00)

Merge pull request #262 from w33ble/segmented-fetch-tests
Segmented fetch refactor and tests ヽ(;^o^ヽ)

Commit 8e1492494a
7 changed files with 772 additions and 186 deletions
@@ -44,7 +44,8 @@
    "FileSaver": "*",
    "elasticsearch": "*",
    "bluebird": "~2.1.3",
    "lesshat": "~3.0.2"
    "lesshat": "~3.0.2",
    "Faker": "~1.1.0"
  },
  "devDependencies": {}
}
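For context, the only consumer of the new Faker dependency in this PR is the spec added at the end of the diff, which uses it to fabricate hit lists. A minimal sketch of that pattern (the 'faker' module id comes from the require.config change further down; everything else here is illustrative, not part of the diff):

    // illustrative sketch only -- mirrors how the new segmented_fetch spec builds fake ES hits
    define(function (require) {
      var faker = require('faker');
      var _ = require('lodash');

      // a pseudo-random "hits" array, like getESResponse() in the new spec
      var hits = faker.Lorem.sentence().split(' ');
      var took = _.random(20, 60);

      return { took: took, hits: { hits: hits, total: hits.length } };
    });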
@@ -1,23 +1,24 @@
define(function (require) {
  return function DiscoverSegmentedFetch(es, Private, Promise, Notifier) {
    var activeReq = null;
    var notifyEvent;
    var searchPromise;
    var getStateFromRequest = Private(require('components/courier/fetch/strategy/search')).getSourceStateFromRequest;
    var _ = require('lodash');
    var moment = require('moment');

    var segmentedFetch = {};
    var searchStrategy = Private(require('components/courier/fetch/strategy/search'));
    var eventName = 'segmented fetch';

    var notify = new Notifier({
      location: 'Segmented Fetch'
    });

    segmentedFetch.abort = function () {
      activeReq = null;
      searchPromise.abort();
      clearNotifyEvent();
    };
    // var segmentedFetch = {};
    function segmentedFetch(searchSource) {
      this.searchSource = searchSource;
      this.queue = [];
      this.completedQueue = [];
      this.requestHandlers = {};
      this.activeRequest = null;
      this.notifyEvent = null;
      this.lastRequestPromise = Promise.resolve();
    }

    /**
     * Fetch search results, but segment by index name.
@@ -29,39 +30,114 @@ define(function (require) {
     * in descending order, this should be set to descending so that the data comes in its
     * proper order, otherwise indices will be fetched ascending
     *
     * // all callbacks can return a promise to delay furthur processing
     * // all callbacks can return a promise to delay further processing
     * @param {function} opts.first - a function that will be called for the first segment
     * @param {function} opts.each - a function that will be called for each segment
     * @param {function} opts.eachMerged - a function that will be called with the merged result on each segment
     * @param {function} opts.status - a function that will be called for each segment and given the process status
     *
     * @return {Promise}
     */
    segmentedFetch.fetch = function (opts) {
    segmentedFetch.prototype.fetch = function (opts) {
      var self = this;
      var req;
      opts = opts || {};
      var searchSource = opts.searchSource;
      var direction = opts.direction;
      var limitSize = false;
      var remainingSize = false;

      notifyEvent = notify.event('segmented fetch');
      self._stopRequest();

      if (opts.totalSize) {
        limitSize = true;
        remainingSize = opts.totalSize;
      return (self.lastRequestPromise = self.lastRequestPromise.then(function () {
        // keep an internal record of the attached handlers
        self._setRequestHandlers(opts);

        return Promise.try(function () {
          return self._extractQueue(opts.direction);
        })
        .then(function () {
          req = self._createRequest();
          return req;
        })
        .then(function (req) {
          return self._startRequest(req);
        })
        .then(function () {
          return self._executeRequest(req, opts);
        })
        .then(function () {
          return self._stopRequest();
        });
      }));
    };

    segmentedFetch.prototype.abort = function () {
      this._stopRequest();
      return this.lastRequestPromise;
    };

    segmentedFetch.prototype._startRequest = function (req) {
      var self = this;
      self.requestStats = {
        took: 0,
        hits: {
          hits: [],
          total: 0,
          max_score: 0
        }
      };

      self._setRequest(req);
      self.notifyEvent = notify.event(eventName);
    };

    segmentedFetch.prototype._stopRequest = function () {
      var self = this;

      self._setRequest();
      self._clearNotification();
      if (self.searchPromise && 'abort' in self.searchPromise) {
        self.searchPromise.abort();
      }
    };

      var req = searchSource._createRequest();
      req.moment = moment();
      req.source.activeFetchCount += 1;
    segmentedFetch.prototype._setRequest = function (req) {
      req = req || null;
      this.activeRequest = req;
    };

      // track the req out of scope so that while we are iterating we can
      // ensure we are still relevant
      activeReq = req;
    segmentedFetch.prototype._clearNotification = function () {
      var self = this;
      if (_.isFunction(self.notifyEvent)) {
        self.notifyEvent();
      }
    };

      var queue = searchSource.get('index').toIndexList();
      var total = queue.length;
      var active = null;
      var complete = [];
    segmentedFetch.prototype._setRequestHandlers = function (handlers) {
      this.requestHandlers = {
        first: handlers.first,
        each: handlers.each,
        eachMerged: handlers.eachMerged,
        status: handlers.status,
      };
    };

    segmentedFetch.prototype._statusReport = function (active) {
      var self = this;

      if (!self.requestHandlers.status) return;

      var status = {
        total: self.queue.length,
        complete: self.completedQueue.length,
        remaining: self.queue.length,
        active: active
      };
      self.requestHandlers.status(status);

      return status;
    };

    segmentedFetch.prototype._extractQueue = function (direction) {
      var self = this;
      var queue = self.searchSource.get('index').toIndexList();

      if (!_.isArray(queue)) {
        queue = [queue];
@@ -71,109 +147,156 @@ define(function (require) {
        queue = queue.reverse();
      }

      var i = -1;
      var merged = {
        took: 0,
        hits: {
          hits: [],
          total: 0,
          max_score: 0
        }
      };

      function reportStatus() {
        if (!opts.status) return;
        opts.status({
          total: total,
          complete: complete.length,
          remaining: queue.length,
          active: active
        });
      }

      reportStatus();
      getStateFromRequest(req)
      .then(function (state) {
        return (function recurse() {
          var index = queue.shift();
          active = index;

          reportStatus();

          if (limitSize) {
            state.body.size = remainingSize;
          }
          req.state = state;

          return execSearch(index, state)
          .then(function (resp) {
            // abort if fetch is called twice quickly
            if (req !== activeReq) return;

            // a response was swallowed intentionally. Try the next one
            if (!resp) {
              if (queue.length) return recurse();
              else return done();
            }

            // increment i after we are sure that we have a valid response
            // so that we always call opts.first()
            i++;

            var start; // promise that starts the chain
            if (i === 0 && _.isFunction(opts.first)) {
              start = Promise.try(opts.first, [resp, req]);
            } else {
              start = Promise.resolve();
            }

            if (limitSize) {
              remainingSize -= resp.hits.hits.length;
            }

            return start.then(function () {
              var prom = each(merged, resp);
              return prom;
            })
            .then(function () {
              if (_.isFunction(opts.each)) return opts.each(resp, req);
            })
            .then(function () {
              var mergedCopy = _.omit(merged, '_bucketIndex');
              req.resp = mergedCopy;

              if (_.isFunction(opts.eachMerged)) {
                // resolve with a "shallow clone" that omits the _aggIndex
                // which helps with watchers and protects the index
                return opts.eachMerged(mergedCopy, req);
              }
            })
            .then(function () {
              complete.push(index);
              if (queue.length) return recurse();
              return done();
            });
          });
        }());
      })
      .then(req.defer.resolve, req.defer.reject);

      function done() {
        clearNotifyEvent();
        req.complete = true;
        req.ms = req.moment.diff() * -1;
        req.source.activeFetchCount -= 1;
        return (i + 1);
      }

      return req.defer.promise;
      return self.queue = queue;
    };

    function each(merged, resp) {
      merged.took += resp.took;
      merged.hits.total = Math.max(merged.hits.total, resp.hits.total);
      merged.hits.max_score = Math.max(merged.hits.max_score, resp.hits.max_score);
      [].push.apply(merged.hits.hits, resp.hits.hits);
    segmentedFetch.prototype._createRequest = function () {
      var self = this;
      var req = self.searchSource._createRequest();
      req.moment = moment();
      req.source.activeFetchCount += 1;
      return req;
    };

    segmentedFetch.prototype._executeSearch = function (index, state) {
      var resolve, reject;

      this.searchPromise = new Promise(function () {
        resolve = arguments[0];
        reject = arguments[1];
      });

      var clientPromise = es.search({
        index: index,
        type: state.type,
        ignoreUnavailable: true,
        body: state.body
      });

      this.searchPromise.abort = function () {
        clientPromise.abort();
        resolve(false);
      };

      clientPromise.then(resolve)
      .catch(function (err) {
        // don't throw ClusterBlockException errors
        if (err.status === 403 && err.message.match(/ClusterBlockException.+index closed/)) {
          resolve(false);
        } else {
          reject(err);
        }
      });

      return this.searchPromise;
    };

    segmentedFetch.prototype._executeRequest = function (req, opts) {
      var self = this;
      var complete = [];
      var remainingSize = false;

      if (opts.totalSize) {
        remainingSize = opts.totalSize;
      }

      // initial status report
      self._statusReport(null);

      return searchStrategy.getSourceStateFromRequest(req)
      .then(function (state) {
        var loopCount = -1;
        return self._processQueue(req, state, remainingSize, loopCount);
      })
      .then(function (count) {
        return req.defer.resolve(count);
      })
      .catch(function (err) {
        req.defer.reject(err);
        return err;
      });
    };

    segmentedFetch.prototype._processQueue = function (req, state, remainingSize, loopCount) {
      var self = this;
      var index = self.queue.shift();

      // abort if request changed (fetch is called twice quickly)
      if (req !== self.activeRequest) {
        return;
      }

      if (remainingSize !== false) {
        state.body.size = remainingSize;
      }

      req.state = state;

      // update the status on every iteration
      self._statusReport(index);

      return self._executeSearch(index, state)
      .then(function (resp) {
        // a response was swallowed intentionally. Try the next one
        if (!resp) {
          if (self.queue.length) return self._processQueue(req, state, remainingSize, loopCount);
          else return self._processQueueComplete(req, loopCount);
        }

        // increment loopCount after we are sure that we have a valid response
        // so that we always call self.requestHandlers.first()
        loopCount++;

        var start; // promise that starts the chain
        if (loopCount === 0 && _.isFunction(self.requestHandlers.first)) {
          start = Promise.try(self.requestHandlers.first, [resp, req]);
        } else {
          start = Promise.resolve();
        }

        if (remainingSize !== false) {
          remainingSize -= resp.hits.hits.length;
        }

        return start.then(function () {
          var prom = mergeRequestStats(self.requestStats, resp);
          return prom;
        })
        .then(function () {
          if (_.isFunction(self.requestHandlers.each)) {
            return self.requestHandlers.each(resp, req);
          }
        })
        .then(function () {
          var mergedCopy = _.omit(self.requestStats, '_bucketIndex');
          req.resp = mergedCopy;

          if (_.isFunction(self.requestHandlers.eachMerged)) {
            // resolve with a "shallow clone" that omits the _aggIndex
            // which helps with watchers and protects the index
            return self.requestHandlers.eachMerged(mergedCopy, req);
          }
        })
        .then(function () {
          self.completedQueue.push(index);
          if (self.queue.length) return self._processQueue(req, state, remainingSize, loopCount);
          return self._processQueueComplete(req, loopCount);
        });
      });
    };

    segmentedFetch.prototype._processQueueComplete = function (req, loopCount) {
      req.complete = true;
      req.ms = req.moment.diff() * -1;
      req.source.activeFetchCount -= 1;
      return (loopCount + 1);
    };

    function mergeRequestStats(requestStats, resp) {
      requestStats.took += resp.took;
      requestStats.hits.total = Math.max(requestStats.hits.total, resp.hits.total);
      requestStats.hits.max_score = Math.max(requestStats.hits.max_score, resp.hits.max_score);
      [].push.apply(requestStats.hits.hits, resp.hits.hits);

      if (!resp.aggregations) return;

@@ -181,53 +304,29 @@
        return key.substr(0, 4) === 'agg_';
      });

      if (!aggKey) throw new Error('aggKey not found in response: ' + Object.keys(resp.aggregations));

      // start merging aggregations
      if (!merged.aggregations) {
        merged.aggregations = {};
        merged.aggregations[aggKey] = {
      if (!requestStats.aggregations) {
        requestStats.aggregations = {};
        requestStats.aggregations[aggKey] = {
          buckets: []
        };
        merged._bucketIndex = {};
        requestStats._bucketIndex = {};
      }

      resp.aggregations[aggKey].buckets.forEach(function (bucket) {
        var mbucket = merged._bucketIndex[bucket.key];
        var mbucket = requestStats._bucketIndex[bucket.key];
        if (mbucket) {
          mbucket.doc_count += bucket.doc_count;
          return;
        }

        mbucket = merged._bucketIndex[bucket.key] = bucket;
        merged.aggregations[aggKey].buckets.push(mbucket);
        mbucket = requestStats._bucketIndex[bucket.key] = bucket;
        requestStats.aggregations[aggKey].buckets.push(mbucket);
      });
    }

    function execSearch(index, state) {
      searchPromise = es.search({
        index: index,
        type: state.type,
        ignoreUnavailable: true,
        body: state.body
      });

      // don't throw ClusterBlockException errors
      searchPromise.catch(function (err) {
        if (err.status === 403 && err.message.match(/ClusterBlockException.+index closed/)) {
          return false;
        } else {
          throw err;
        }
      });

      return searchPromise;
    }

    function clearNotifyEvent() {
      if (_.isFunction(notifyEvent)) {
        notifyEvent();
      }
    }

    return segmentedFetch;
  };
});
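Taken together, the hunks above replace the old singleton-style segmentedFetch with a constructor whose instances own their queue, handlers, and notification state. A minimal usage sketch of the refactored API, based on the JSDoc and the controller changes below (the searchSource variable and handler bodies are illustrative placeholders, not part of the diff):

    // illustrative sketch only
    var SegmentedFetch = Private(require('apps/discover/_segmented_fetch'));
    var segmented = new SegmentedFetch(searchSource);

    segmented.fetch({
      totalSize: 500,                    // optional cap on hits across all segments
      direction: 'desc',                 // order in which the index list is walked
      first: function (resp, req) {},    // called once, for the first segment
      each: function (resp, req) {},     // called for every segment's response
      eachMerged: function (merged) {},  // called with the running merged result
      status: function (status) {}       // { total, complete, remaining, active }
    })
    .then(function () { /* all segments done */ });

    // a later fetch() waits on lastRequestPromise; abort() stops the current run
    segmented.abort();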
@@ -45,12 +45,12 @@ define(function (require) {
    }
  });


  app.controller('discover', function ($scope, config, courier, $route, $window, savedSearches, savedVisualizations,
    Notifier, $location, globalState, appStateFactory, timefilter, Promise, Private) {

    var Vis = Private(require('components/vis/vis'));
    var segmentedFetch = $scope.segmentedFetch = Private(require('apps/discover/_segmented_fetch'));
    var SegmentedFetch = Private(require('apps/discover/_segmented_fetch'));

    var HitSortFn = Private(require('apps/discover/_hit_sort_fn'));

    var notify = new Notifier({
@@ -61,16 +61,17 @@
    var savedSearch = $route.current.locals.savedSearch;
    $scope.$on('$destroy', savedSearch.destroy);

    // abort any segmented query requests when leaving discover
    $scope.$on('$routeChangeStart', function () {
      segmentedFetch.abort();
    });

    // list of indexPattern id's
    var indexPatternList = $route.current.locals.indexList;

    // the actual courier.SearchSource
    $scope.searchSource = savedSearch.searchSource;
    var segmentedFetch = $scope.segmentedFetch = new SegmentedFetch($scope.searchSource);

    // abort any segmented query requests when leaving discover
    $scope.$on('$routeChangeStart', function () {
      segmentedFetch.abort();
    });

    // Manage state & url state
    var initialQuery = $scope.searchSource.get('query');

@@ -238,7 +239,11 @@
      if (!init.complete) return;

      $scope.updateTime();
      if (_.isEmpty($state.columns)) refreshColumns();

      if (_.isEmpty($state.columns)) {
        refreshColumns();
      }

      $scope.updateDataSource()
      .then(setupVisualization)
      .then(function () {

@@ -270,7 +275,6 @@
      }

      return segmentedFetch.fetch({
        searchSource: $scope.searchSource,
        totalSize: sortBy === 'non-time' ? false : totalSize,
        direction: sortBy === 'time' ? sort[1] : 'desc',
        status: function (status) {
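Collapsed into one place, the controller hunks above amount to the following wiring (names exactly as they appear in the diff; the rest of the controller body is elided):

    // illustrative summary of the controller changes above, not a new snippet in the PR
    var SegmentedFetch = Private(require('apps/discover/_segmented_fetch'));

    $scope.searchSource = savedSearch.searchSource;
    var segmentedFetch = $scope.segmentedFetch = new SegmentedFetch($scope.searchSource);

    // abort any segmented query requests when leaving discover
    $scope.$on('$routeChangeStart', function () {
      segmentedFetch.abort();
    });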
@@ -4,7 +4,8 @@
  <div class="typeahead" kbn-typeahead="discover">
    <div class="input-group"
      ng-class="discoverSearch.$invalid ? 'has-error' : ''">
      <input query-input="searchSource" input-focus
      <input query-input="searchSource"
        input-focus
        kbn-typeahead-input
        ng-model="state.query"
        placeholder="Search..."
@@ -11,26 +11,27 @@ require.config({
    lodash: 'utils/_mixins',

    // bower_components
    angular: '../bower_components/angular/angular',
    'angular-route': '../bower_components/angular-route/angular-route',
    'angular-bootstrap': '../bower_components/angular-bootstrap/ui-bootstrap-tpls',
    'angular-bindonce': '../bower_components/angular-bindonce/bindonce',
    'angular-ui-ace': '../bower_components/angular-ui-ace/ui-ace',
    'angular-bootstrap': '../bower_components/angular-bootstrap/ui-bootstrap-tpls',
    'angular-elastic': '../bower_components/angular-elastic/elastic',
    'angular-route': '../bower_components/angular-route/angular-route',
    'angular-ui-ace': '../bower_components/angular-ui-ace/ui-ace',
    ace: '../bower_components/ace-builds/src-noconflict/ace',
    angular: '../bower_components/angular/angular',
    async: '../bower_components/async/lib/async',
    bower_components: '../bower_components',
    css: '../bower_components/require-css/css',
    d3: '../bower_components/d3/d3',
    text: '../bower_components/requirejs-text/text',
    elasticsearch: '../bower_components/elasticsearch/elasticsearch.angular',
    faker: '../bower_components/Faker/faker',
    file_saver: '../bower_components/FileSaver/FileSaver',
    gridster: '../bower_components/gridster/dist/jquery.gridster',
    inflection: '../bower_components/inflection/lib/inflection',
    jquery: '../bower_components/jquery/dist/jquery',
    jsonpath: '../bower_components/jsonpath/lib/jsonpath',
    lodash_src: '../bower_components/lodash/dist/lodash',
    moment: '../bower_components/moment/moment',
    gridster: '../bower_components/gridster/dist/jquery.gridster',
    jsonpath: '../bower_components/jsonpath/lib/jsonpath',
    inflection: '../bower_components/inflection/lib/inflection',
    file_saver: '../bower_components/FileSaver/FileSaver',
    bower_components: '../bower_components'
    text: '../bower_components/requirejs-text/text'
  },
  shim: {
    angular: {
@@ -61,6 +61,7 @@
    'kibana',
    'sinon/sinon',
    'specs/apps/discover/hit_sort_fn',
    'specs/apps/discover/segmented_fetch',
    'specs/directives/confirm-click',
    'specs/directives/timepicker',
    'specs/directives/truncate',
test/unit/specs/apps/discover/segmented_fetch.js (new file, 479 lines)
@@ -0,0 +1,479 @@
define(function (require) {
  var sinon = require('test_utils/auto_release_sinon');
  var faker = require('faker');
  var Promise = require('bluebird');
  var _ = require('lodash');

  var SegmentedFetch;
  var segmentedFetch;
  var searchStrategy;
  var searchSource;
  var mockSearchSource;
  var searchSourceStubs;
  var es;
  var notify;

  function init() {
    module('kibana', function ($provide) {
      // mock notifier
      $provide.factory('Notifier', function () {
        function NotifierMock(opts) {
          this.opts = opts;
        }

        var stopEventSpy = sinon.spy();
        NotifierMock.prototype.event = sinon.stub().returns(stopEventSpy);

        return NotifierMock;
      });

      // mock es client
      $provide.factory('es', function () {
        return {};
      });
    });

    inject(function ($injector, Private) {
      es = $injector.get('es');
      var Notifier = $injector.get('Notifier');
      notify = new Notifier();

      SegmentedFetch = Private(require('apps/discover/_segmented_fetch'));

      // mock the searchSource
      searchSourceStubs = {
        get: sinon.stub(),
        toIndexList: sinon.stub().returns([]),
        createRequest: sinon.spy(function () {
          return {
            defer: Promise.defer(),
            source: {
              activeFetchCount: 0
            }
          };
        })
      };

      mockSearchSource = {
        get: searchSourceStubs.get.returns({
          toIndexList: searchSourceStubs.toIndexList.returns([])
        }),
        _createRequest: searchSourceStubs.createRequest
      };

      // create segmentedFetch instance with mocked searchSource
      segmentedFetch = new SegmentedFetch(mockSearchSource);

      // stub the searchStrategy
      searchStrategy = Private(require('components/courier/fetch/strategy/search'));
      sinon.stub(searchStrategy, 'getSourceStateFromRequest');
    });
  }

  describe('segmented fetch', function () {
    require('test_utils/no_digest_promises').activateForSuite();

    beforeEach(init);

    describe('_executeSearch', function () {
      it('should attach abort method to searchPromise', function () {
        es.search = function () { return Promise.resolve(); };
        segmentedFetch._executeSearch('test-index', {body: '', type: ''});

        expect(segmentedFetch.searchPromise).to.have.property('abort');
      });

      it('should abort client promise', function () {
        var clientAbortSpy = sinon.spy();
        es.search = function () {
          function MockClass() {
          }

          // mock the search client promise
          MockClass.prototype.then = function () {
            return this;
          };
          MockClass.prototype.catch = function () {
            return this;
          };
          MockClass.prototype.abort = clientAbortSpy;

          return new MockClass();
        };

        segmentedFetch._executeSearch(1, {body: '', type: ''});
        segmentedFetch.abort();

        return segmentedFetch.searchPromise.then(function (resolve) {
          expect(clientAbortSpy.callCount).to.be(1);
          expect(resolve).to.be(false);
        });
      });

      it('should resolve on ClusterBlockException', function () {
        es.search = Promise.method(function () {
          throw {
            status: 403,
            message: 'ClusterBlockException mock error test, index closed'
          };
        });

        segmentedFetch._executeSearch('test-index', {body: '', type: ''});

        return segmentedFetch.searchPromise.then(function (resolve) {
          expect(resolve).to.be(false);
        });
      });

      it('should reject on es client errors', function () {
        es.search = Promise.method(function () {
          throw new Error('es client error of some kind');
        });

        segmentedFetch._executeSearch('test-index', {body: '', type: ''});

        return segmentedFetch.searchPromise.catch(function (err) {
          expect(err.message).to.be('es client error of some kind');
        });
      });
    });

    describe('_processQueue', function () {
      var queueSpy;
      var completeSpy;
      var queue = [];

      // mock es client response trackers
      var totalTime;
      var totalHits;
      var maxHits;
      var maxScore;
      var aggregationKeys;

      var getESResponse = function (index, state) {
        var took = _.random(20, 60);
        var score = _.random(20, 90) / 100;
        var hits = faker.Lorem.sentence().split(' ');
        var aggKey = 'key' + _.random(1, 100);
        totalTime += took;
        totalHits += hits.length;
        maxHits = Math.max(maxHits, hits.length);
        maxScore = Math.max(maxScore, score);
        aggregationKeys = _.union(aggregationKeys, [aggKey]);

        return Promise.resolve({
          took: took,
          hits: {
            hits: hits,
            total: maxHits,
            max_score: score
          },
          aggregations: {
            'agg_test': {
              buckets: [{
                doc_count: hits.length,
                key: aggKey
              }]
            }
          }
        });
      };

      beforeEach(function () {
        totalTime = 0;
        totalHits = 0;
        maxHits = 0;
        maxScore = 0;
        aggregationKeys = [];

        queueSpy = sinon.spy(SegmentedFetch.prototype, '_processQueue');
        completeSpy = sinon.spy(SegmentedFetch.prototype, '_processQueueComplete');

        for (var i = 0; i < _.random(3, 6); i++) {
          queue.push('test-' + i);
        }

        sinon.stub(SegmentedFetch.prototype, '_extractQueue', function () {
          this.queue = queue.slice(0);
        });

        searchStrategy.getSourceStateFromRequest.returns(Promise.resolve({
          body: {
            size: 10
          }
        }));
      });

      it('should merge stats and complete', function () {

        sinon.stub(SegmentedFetch.prototype, '_executeSearch', getESResponse);

        function eachHandler(resp, req) {
          // check results from mergeRequestStats
          expect(segmentedFetch.requestStats).to.have.property('aggregations');
          expect(segmentedFetch.requestStats.aggregations['agg_test'].buckets.length).to.be(aggregationKeys.length);
          expect(segmentedFetch.requestStats.took).to.be(totalTime);
          expect(segmentedFetch.requestStats.hits.hits.length).to.be(totalHits);
          expect(segmentedFetch.requestStats.hits.total).to.be(maxHits);
          expect(segmentedFetch.requestStats.hits.max_score).to.be(maxScore);

          // check aggregation stats
          aggregationKeys.forEach(function (key) {
            expect(segmentedFetch.requestStats._bucketIndex).to.have.property(key);
          });
        }

        return segmentedFetch.fetch({ each: eachHandler }).then(function () {
          expect(completeSpy.callCount).to.be(1);
          expect(queueSpy.callCount).to.be(queue.length);
        });
      });

      it('should complete on falsey response', function () {
        sinon.stub(SegmentedFetch.prototype, '_executeSearch', function (index, state) {
          return Promise.resolve(false);
        });

        return segmentedFetch.fetch().then(function () {
          expect(completeSpy.callCount).to.be(1);
          expect(queueSpy.callCount).to.be(queue.length);
        });
      });
    });

    describe('fetch', function () {
      it('should return a promise', function () {
        sinon.stub(SegmentedFetch.prototype, '_executeRequest', Promise.resolve);

        var fetch = segmentedFetch.fetch();
        expect('then' in fetch).to.be(true);
        return fetch;
      });

      it('should stop the request', function () {
        var stopSpy = sinon.spy(SegmentedFetch.prototype, '_stopRequest');
        sinon.stub(SegmentedFetch.prototype, '_executeRequest', Promise.resolve);

        return segmentedFetch.fetch().then(function () {
          // always called on fetch, called again at resolution
          expect(stopSpy.callCount).to.be(2);
        });
      });

      it('should stop multiple requests', function () {
        var stopSpy = sinon.spy(SegmentedFetch.prototype, '_stopRequest');
        sinon.stub(SegmentedFetch.prototype, '_executeRequest').returns(Promise.delay(5));

        segmentedFetch.fetch();

        return Promise.delay(1).then(function () {
          return segmentedFetch.fetch().then(function () {
            // 1 for fetch
            // 1 for second fetch
            // 1 for stopping the first request early
            // 1 for resolving the second request
            expect(stopSpy.callCount).to.be(4);
          });
        });
      });

      it('should wait before starting new requests', function () {
        var startSpy = sinon.spy(SegmentedFetch.prototype, '_startRequest');
        var stopSpy = sinon.spy(SegmentedFetch.prototype, '_stopRequest');
        var fetchCount = _.random(3, 6);
        var resolveCount = 0;
        var resolvedPromises = [];

        sinon.stub(SegmentedFetch.prototype, '_executeRequest', function () {
          // keep resolving faster as we move along
          return Promise.delay(fetchCount - resolveCount);
        });

        _.times(fetchCount, function (idx) {
          resolvedPromises.push(segmentedFetch.fetch().then(function () {
            var resolveOrder = idx + 1;
            ++resolveCount;

            expect(resolveCount).to.be(resolveOrder);
            expect(startSpy.callCount).to.be(resolveOrder);
            // called once for every fetch, and again for each resolution
            expect(stopSpy.callCount).to.be(fetchCount + resolveOrder);
          }));
        });

        return Promise.all(resolvedPromises);
      });

      it('should perform actions on searchSource', function () {
        sinon.stub(SegmentedFetch.prototype, '_executeRequest', Promise.resolve);

        return segmentedFetch.fetch().then(function () {
          // read the searchSource queue
          expect(searchSourceStubs.get.callCount).to.be(1);
          expect(searchSourceStubs.toIndexList.callCount).to.be(1);
          // create the searchSource request
          expect(searchSourceStubs.createRequest.callCount).to.be(1);
        });
      });

      it('should create a notification event', function () {
        sinon.stub(SegmentedFetch.prototype, '_executeRequest', Promise.resolve);

        return segmentedFetch.fetch().then(function () {
          expect(notify.event.callCount).to.be(1);
        });
      });

      it('should report initial status', function () {
        var statusStub = sinon.stub();
        sinon.stub(SegmentedFetch.prototype, '_processQueue', function () {
          return new Promise(function (res) { return res(); });
        });
        searchStrategy.getSourceStateFromRequest.returns(Promise.resolve());

        return segmentedFetch.fetch({
          status: statusStub
        }).then(function () {
          expect(statusStub.callCount).to.be(1);

          var status = statusStub.getCall(0).args[0];
          expect(status.active).to.be(null);
          expect(status.total).to.be(searchSourceStubs.toIndexList.length);
        });
      });
    });

    describe('abort', function () {
      it('should return a promise', function () {
        var abort = segmentedFetch.abort();
        expect('then' in abort).to.be(true);
        return abort;
      });

      it('should abort the existing fetch', function () {
        var loopCount = 3;
        var queue = [];
        for (var i = 0; i <= loopCount; i++) {
          queue.push('queue-index-' + i);
        }

        sinon.stub(SegmentedFetch.prototype, '_extractQueue', function () {
          this.queue = queue;
        });

        sinon.stub(SegmentedFetch.prototype, '_executeSearch', function () {
          return new Promise(function (resolve) {
            resolve({
              took: 10,
              hits: {
                total: 10,
                max_score: 1,
                hits: []
              }
            });
          });
        });

        searchStrategy.getSourceStateFromRequest.returns(Promise.resolve({
          body: {
            size: 10
          }
        }));

        var eachHandler = sinon.spy(function () {
          if (eachHandler.callCount === loopCount) {
            segmentedFetch.abort();
          }
        });

        return segmentedFetch.fetch({ each: eachHandler }).then(function () {
          expect(eachHandler.callCount).to.be(loopCount);
        });
      });

      it('should abort the searchPromise', function () {
        var searchPromiseAbortStub = sinon.spy();

        sinon.stub(SegmentedFetch.prototype, '_extractQueue', function () {
          this.queue = ['one', 'two', 'three'];
        });

        sinon.stub(SegmentedFetch.prototype, '_executeSearch', function () {
          this.searchPromise = { abort: searchPromiseAbortStub };
          return Promise.resolve();
        });

        sinon.stub(SegmentedFetch.prototype, '_executeRequest', function () {
          var self = this;
          return self._executeSearch()
          .then(function () {
            if (typeof self.requestHandlers.each === 'function') {
              return self.requestHandlers.each();
            }
          });
        });

        searchStrategy.getSourceStateFromRequest.returns(Promise.resolve({
          body: {
            size: 10
          }
        }));

        var eachHandler = sinon.spy(function () {
          segmentedFetch.abort();
        });

        return segmentedFetch.fetch({ each: eachHandler }).then(function () {
          expect(eachHandler.callCount).to.be(1);
          // 1 for fetch, 1 for actual abort call
          expect(searchPromiseAbortStub.callCount).to.be(2);
        });
      });

      it('should clear the notification', function () {
        segmentedFetch.notifyEvent = sinon.spy();

        sinon.stub(SegmentedFetch.prototype, 'fetch', function (opts) {
          var SegmentedFetchSelf = this;
          var fakeRequest = {};

          return Promise.try(function () {
            return SegmentedFetchSelf._startRequest();
          })
          .then(function () {
            SegmentedFetchSelf._setRequest(fakeRequest);
          })
          .then(function () {
            // dumb mock of the fetch lifecycle
            // loop, running each
            while (SegmentedFetchSelf.activeRequest !== null) {
              if (typeof opts.each === 'function') {
                opts.each();
              }
            }

            // return when activeRequest is null
            return;
          })
          .then(function () {
            SegmentedFetchSelf._stopRequest();
          });
        });

        var eachHandler = sinon.spy(function () {
          // will set activeRequest to null
          segmentedFetch.abort();
        });

        return segmentedFetch.fetch({ each: eachHandler }).then(function () {
          expect(eachHandler.callCount).to.be(1);
          // 1 for stop from fetch, 1 from abort
          expect(segmentedFetch.notifyEvent.callCount).to.be(2);
        });
      });
    });
  });
});
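The '_executeSearch' specs above hinge on the abortable-promise shape the refactor gives searchPromise: the resolver is captured so that abort() can cancel the client request and resolve the chain with false instead of rejecting, which the queue loop then treats as "skip this segment". A stripped-down sketch of that pattern (the es client, index, and state here are stand-ins, not part of the diff):

    // illustrative sketch of the abortable search promise, assuming a Bluebird-style Promise
    function makeAbortableSearch(es, index, state) {
      var resolve, reject;
      var searchPromise = new Promise(function (res, rej) {
        resolve = res;
        reject = rej;
      });

      var clientPromise = es.search({ index: index, body: state.body });

      // aborting cancels the client call and resolves with false,
      // so callers see a swallowed response rather than an error
      searchPromise.abort = function () {
        clientPromise.abort();
        resolve(false);
      };

      clientPromise.then(resolve).catch(reject);
      return searchPromise;
    }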