major, hopefully final, refactor of segmentedFetch and updated tests to match

This commit is contained in:
Joe Fleming 2014-08-21 14:07:33 -07:00
parent f99b5f613a
commit 915071002b
2 changed files with 232 additions and 227 deletions

View file

@ -20,216 +20,6 @@ define(function (require) {
this.notifyEvent = null;
}
// segmentedFetch.prototype.setSearchSource(searchSource) {
// this.searchSource = searchSource;
// };
segmentedFetch.prototype._startRequest = function (req) {
var self = this;
var p = Promise.resolve();
// stop any existing segmentedFetches
if (self.running) {
p = p.then(self._stopRequest);
}
return p.then(function () {
self.activeRequest = req;
self.running = true;
self.notifyEvent = notify.event(eventName);
});
};
segmentedFetch.prototype._stopRequest = function () {
var self = this;
var p = Promise.resolve();
return p.then(function () {
self.activeRequest = null;
self.running = false;
if (_.isFunction(self.notifyEvent)) {
self.notifyEvent();
self.notifyEvent = null;
}
});
};
segmentedFetch.prototype._execSearch = function (index, state) {
this.searchPromise = es.search({
index: index,
type: state.type,
ignoreUnavailable: true,
body: state.body
});
// don't throw ClusterBlockException errors
this.searchPromise.catch(function (err) {
if (err.status === 403 && err.message.match(/ClusterBlockException.+index closed/)) {
return false;
} else {
throw err;
}
});
return this.searchPromise;
};
segmentedFetch.prototype._extractQueue = function (direction) {
var self = this;
var queue = self.searchSource.get('index').toIndexList();
if (!_.isArray(queue)) {
queue = [queue];
}
if (direction === 'desc') {
queue = queue.reverse();
}
self.queue = queue;
};
segmentedFetch.prototype._createRequest = function () {
var self = this;
var req = self.searchSource._createRequest();
req.moment = moment();
req.source.activeFetchCount += 1;
return req;
};
segmentedFetch.prototype._statusReport = function (active) {
var self = this;
if (!self.requestHandlers.status) return;
var status = {
total: self.queue.length,
complete: self.completedQueue.length,
remaining: self.queue.length,
active: active
};
self.requestHandlers.status(status);
return status;
};
segmentedFetch.prototype._processQueue = function (req, opts) {
var self = this;
var complete = [];
var limitSize = false;
var remainingSize = false;
if (opts.totalSize) {
limitSize = true;
remainingSize = opts.totalSize;
}
var i = -1;
var merged = {
took: 0,
hits: {
hits: [],
total: 0,
max_score: 0
}
};
// initial status report
self._statusReport(null);
searchStrategy.getSourceStateFromRequest(req)
.then(function (state) {
return (function recurse() {
var index = self.queue.shift();
self._statusReport(index);
if (limitSize) {
state.body.size = remainingSize;
}
req.state = state;
return self._execSearch(index, state)
.then(function (resp) {
// abort if fetch is called twice quickly
if (!self.running || req !== self.activeRequest) return;
// a response was swallowed intentionally. Try the next one
if (!resp) {
if (self.queue.length) return recurse();
else return done();
}
// increment i after we are sure that we have a valid response
// so that we always call opts.first()
i++;
var start; // promise that starts the chain
if (i === 0 && _.isFunction(opts.first)) {
start = Promise.try(opts.first, [resp, req]);
} else {
start = Promise.resolve();
}
if (limitSize) {
remainingSize -= resp.hits.hits.length;
}
return start.then(function () {
var prom = each(merged, resp);
return prom;
})
.then(function () {
if (_.isFunction(opts.each)) {
return opts.each(resp, req);
}
})
.then(function () {
var mergedCopy = _.omit(merged, '_bucketIndex');
req.resp = mergedCopy;
if (_.isFunction(opts.eachMerged)) {
// resolve with a "shallow clone" that omits the _aggIndex
// which helps with watchers and protects the index
return opts.eachMerged(mergedCopy, req);
}
})
.then(function () {
complete.push(index);
if (self.queue.length) return recurse();
return done();
});
});
}());
})
.then(req.defer.resolve, req.defer.reject);
return req.defer.promise;
function done() {
return self._stopRequest().then(function () {
req.complete = true;
req.ms = req.moment.diff() * -1;
req.source.activeFetchCount -= 1;
return (i + 1);
});
}
};
segmentedFetch.prototype.abort = function () {
var self = this;
var stop = self._stopRequest();
// if we have a searchPromise, abort it as well
if (self.searchPromise && 'abort' in self.searchPromise) {
return stop.then(self.searchPromise.abort);
}
return stop;
};
/**
* Fetch search results, but segment by index name.
*
@ -267,15 +57,230 @@ define(function (require) {
self._startRequest(req);
})
.then(function () {
self._processQueue(req, opts);
self._executeRequest(req, opts);
});
};
function each(merged, resp) {
merged.took += resp.took;
merged.hits.total = Math.max(merged.hits.total, resp.hits.total);
merged.hits.max_score = Math.max(merged.hits.max_score, resp.hits.max_score);
[].push.apply(merged.hits.hits, resp.hits.hits);
segmentedFetch.prototype.abort = function () {
var self = this;
var stop = self._stopRequest();
// if we have a searchPromise, abort it as well
if (self.searchPromise && 'abort' in self.searchPromise) {
return stop.then(self.searchPromise.abort);
}
return stop;
};
segmentedFetch.prototype._startRequest = function (req) {
var self = this;
self.requestStats = {
took: 0,
hits: {
hits: [],
total: 0,
max_score: 0
}
};
var p = Promise.resolve();
// stop any existing segmentedFetches
if (self.running) {
p = p.then(self._stopRequest);
}
return p.then(function () {
self.activeRequest = req;
self.running = true;
self.notifyEvent = notify.event(eventName);
});
};
segmentedFetch.prototype._stopRequest = function () {
var self = this;
var p = Promise.resolve();
return p.then(function () {
self.activeRequest = null;
self.running = false;
if (_.isFunction(self.notifyEvent)) {
self.notifyEvent();
self.notifyEvent = null;
}
});
};
segmentedFetch.prototype._setRequestHandlers = function (handlers) {
this.requestHandlers = {
first: handlers.first,
each: handlers.each,
eachMerged: handlers.eachMerged,
status: handlers.status,
};
};
segmentedFetch.prototype._statusReport = function (active) {
var self = this;
if (!self.requestHandlers.status) return;
var status = {
total: self.queue.length,
complete: self.completedQueue.length,
remaining: self.queue.length,
active: active
};
self.requestHandlers.status(status);
return status;
};
segmentedFetch.prototype._extractQueue = function (direction) {
var self = this;
var queue = self.searchSource.get('index').toIndexList();
if (!_.isArray(queue)) {
queue = [queue];
}
if (direction === 'desc') {
queue = queue.reverse();
}
self.queue = queue;
};
segmentedFetch.prototype._createRequest = function () {
var self = this;
var req = self.searchSource._createRequest();
req.moment = moment();
req.source.activeFetchCount += 1;
return req;
};
segmentedFetch.prototype._executeSearch = function (index, state) {
this.searchPromise = es.search({
index: index,
type: state.type,
ignoreUnavailable: true,
body: state.body
});
// don't throw ClusterBlockException errors
this.searchPromise.catch(function (err) {
if (err.status === 403 && err.message.match(/ClusterBlockException.+index closed/)) {
return false;
} else {
throw err;
}
});
return this.searchPromise;
};
segmentedFetch.prototype._executeRequest = function (req, opts) {
var self = this;
var complete = [];
var remainingSize = false;
if (opts.totalSize) {
remainingSize = opts.totalSize;
}
// initial status report
self._statusReport(null);
searchStrategy.getSourceStateFromRequest(req)
.then(function (state) {
var loopCount = -1;
return self._processQueue(req, state, remainingSize, loopCount);
})
.then(req.defer.resolve, req.defer.reject);
return req.defer.promise;
};
segmentedFetch.prototype._processQueue = function (req, state, remainingSize, loopCount) {
var self = this;
var index = self.queue.shift();
// update the status on every iteration
self._statusReport(index);
if (remainingSize !== false) {
state.body.size = remainingSize;
}
req.state = state;
return self._executeSearch(index, state)
.then(function (resp) {
// abort if not in running state, or fetch is called twice quickly
if (!self.running || req !== self.activeRequest) return;
// a response was swallowed intentionally. Try the next one
if (!resp) {
if (self.queue.length) return self._processQueue(req, state, remainingSize, loopCount);
else return self._processQueueComplete(req, loopCount);
}
// increment loopCount after we are sure that we have a valid response
// so that we always call self.requestHandlers.first()
loopCount++;
var start; // promise that starts the chain
if (loopCount === 0 && _.isFunction(self.requestHandlers.first)) {
start = Promise.try(self.requestHandlers.first, [resp, req]);
} else {
start = Promise.resolve();
}
if (remainingSize !== false) {
remainingSize -= resp.hits.hits.length;
}
return start.then(function () {
var prom = mergeRequestStats(self.requestStats, resp);
return prom;
})
.then(function () {
if (_.isFunction(self.requestHandlers.each)) {
return self.requestHandlers.each(resp, req);
}
})
.then(function () {
var mergedCopy = _.omit(self.requestStats, '_bucketIndex');
req.resp = mergedCopy;
if (_.isFunction(self.requestHandlers.eachMerged)) {
// resolve with a "shallow clone" that omits the _aggIndex
// which helps with watchers and protects the index
return self.requestHandlers.eachMerged(mergedCopy, req);
}
})
.then(function () {
self.completedQueue.push(index);
if (self.queue.length) return self._processQueue(req, state, remainingSize, loopCount);
return self._processQueueComplete(req, loopCount);
});
});
};
segmentedFetch.prototype._processQueueComplete = function (req, loopCount) {
return this._stopRequest().then(function () {
req.complete = true;
req.ms = req.moment.diff() * -1;
req.source.activeFetchCount -= 1;
return (loopCount + 1);
});
};
function mergeRequestStats(requestStats, resp) {
requestStats.took += resp.took;
requestStats.hits.total = Math.max(requestStats.hits.total, resp.hits.total);
requestStats.hits.max_score = Math.max(requestStats.hits.max_score, resp.hits.max_score);
[].push.apply(requestStats.hits.hits, resp.hits.hits);
if (!resp.aggregations) return;
@ -284,23 +289,23 @@ define(function (require) {
});
// start merging aggregations
if (!merged.aggregations) {
merged.aggregations = {};
merged.aggregations[aggKey] = {
if (!requestStats.aggregations) {
requestStats.aggregations = {};
requestStats.aggregations[aggKey] = {
buckets: []
};
merged._bucketIndex = {};
requestStats._bucketIndex = {};
}
resp.aggregations[aggKey].buckets.forEach(function (bucket) {
var mbucket = merged._bucketIndex[bucket.key];
var mbucket = requestStats._bucketIndex[bucket.key];
if (mbucket) {
mbucket.doc_count += bucket.doc_count;
return;
}
mbucket = merged._bucketIndex[bucket.key] = bucket;
merged.aggregations[aggKey].buckets.push(mbucket);
mbucket = requestStats._bucketIndex[bucket.key] = bucket;
requestStats.aggregations[aggKey].buckets.push(mbucket);
});
}

View file

@ -62,7 +62,7 @@ define(function (require) {
describe('fetch', function () {
it('should return a promise', function () {
SegmentedFetch.prototype._startRequest = Promise.resolve;
SegmentedFetch.prototype._processQueue = Promise.resolve;
SegmentedFetch.prototype._executeRequest = Promise.resolve;
var fetch = segmentedFetch.fetch();
expect('then' in fetch).to.be(true);
@ -70,7 +70,7 @@ define(function (require) {
});
it('should set the running state', function () {
SegmentedFetch.prototype._processQueue = Promise.resolve;
SegmentedFetch.prototype._executeRequest = Promise.resolve;
return segmentedFetch.fetch().then(function () {
expect(segmentedFetch.running).to.be(true);
@ -79,7 +79,7 @@ define(function (require) {
it('should stop existing requests', function (done) {
segmentedFetch.running = true;
SegmentedFetch.prototype._processQueue = Promise.resolve;
SegmentedFetch.prototype._executeRequest = Promise.resolve;
SegmentedFetch.prototype._stopProcess = sinon.stub().returns(Promise.resolve());
return segmentedFetch.fetch().then(function () {
@ -95,7 +95,7 @@ define(function (require) {
it('should perform actions on searchSource', function () {
SegmentedFetch.prototype._startRequest = Promise.resolve;
SegmentedFetch.prototype._processQueue = Promise.resolve;
SegmentedFetch.prototype._executeRequest = Promise.resolve;
return segmentedFetch.fetch().then(function () {
// read the searchSource queue