track promise chain in fetch, abort es request in _stopRequest, proxy abort to resolve promise

This commit is contained in:
Joe Fleming 2014-08-25 15:53:42 -07:00
parent c62d6bcb67
commit 2989a3ee6b

View file

@ -17,6 +17,7 @@ define(function (require) {
this.requestHandlers = {};
this.activeRequest = null;
this.notifyEvent = null;
this.lastRequestPromise = Promise.resolve();
}
/**
@ -42,45 +43,38 @@ define(function (require) {
var req;
opts = opts || {};
// keep an internal record of the attached handlers
self._setRequestHandlers(opts);
self._stopRequest();
return Promise.try(function () {
return self._startRequest();
})
.then(function () {
return self._extractQueue(opts.direction);
})
.then(function () {
req = self._createRequest();
return req;
})
.then(function (req) {
return self._setRequest(req);
})
.then(function () {
return self._executeRequest(req, opts);
})
.then(function () {
return self._stopRequest();
});
return (self.lastRequestPromise = self.lastRequestPromise.then(function () {
// keep an internal record of the attached handlers
self._setRequestHandlers(opts);
return Promise.try(function () {
return self._extractQueue(opts.direction);
})
.then(function () {
req = self._createRequest();
return req;
})
.then(function (req) {
return self._startRequest(req);
})
.then(function () {
return self._executeRequest(req, opts);
})
.then(function () {
return self._stopRequest();
});
}));
};
segmentedFetch.prototype.abort = function () {
var self = this;
return new Promise(function (resolve) {
self._setRequest();
if (self.searchPromise && 'abort' in self.searchPromise) {
self.searchPromise.abort();
}
resolve();
});
return self._stopRequest();
};
segmentedFetch.prototype._startRequest = function () {
segmentedFetch.prototype._startRequest = function (req) {
var self = this;
self.requestStats = {
took: 0,
@ -91,23 +85,10 @@ define(function (require) {
}
};
function initRequest() {
self._processDeferred = Promise.defer();
self.notifyEvent = notify.event(eventName);
}
return new Promise(function (resolve) {
// cause existing request to exit
if (self._processDeferred) {
self._setRequest();
self._processDeferred.promise.then(function () {
initRequest();
resolve();
});
} else {
initRequest();
resolve();
}
self._setRequest(req);
self.notifyEvent = notify.event(eventName);
resolve();
});
};
@ -117,7 +98,9 @@ define(function (require) {
return new Promise(function (resolve) {
self._setRequest();
self._clearNotification();
self._processDeferred.resolve();
if (self.searchPromise && 'abort' in self.searchPromise) {
self.searchPromise.abort();
}
resolve();
});
};
@ -183,19 +166,31 @@ define(function (require) {
};
segmentedFetch.prototype._executeSearch = function (index, state) {
this.searchPromise = es.search({
var resolve, reject;
this.searchPromise = new Promise(function (res, rej) {
resolve = res;
reject = rej;
});
var clientPromise = es.search({
index: index,
type: state.type,
ignoreUnavailable: true,
body: state.body
});
this.searchPromise.abort = function () {
clientPromise.abort();
resolve(false);
};
// don't throw ClusterBlockException errors
this.searchPromise.catch(function (err) {
clientPromise.then(resolve).catch(function (err) {
if (err.status === 403 && err.message.match(/ClusterBlockException.+index closed/)) {
return false;
resolve(false);
} else {
throw err;
reject(err);
}
});
@ -232,6 +227,11 @@ define(function (require) {
var self = this;
var index = self.queue.shift();
// abort if request changed (fetch is called twice quickly)
if (req !== self.activeRequest) {
return;
}
if (remainingSize !== false) {
state.body.size = remainingSize;
}
@ -243,11 +243,6 @@ define(function (require) {
return self._executeSearch(index, state)
.then(function (resp) {
// abort if request changed (fetch is called twice quickly)
if (req !== self.activeRequest) {
return;
}
// a response was swallowed intentionally. Try the next one
if (!resp) {
if (self.queue.length) return self._processQueue(req, state, remainingSize, loopCount);