Individual DataSource objects can be fetched, which follows the standard courier.fetch code path but only fetches that one doc.

Added the ability to specify a Courier ref that should be fetched.

DataSources now convert Client errors into RequestFailure errors that
provide more user friendly error messages.

Several places where _.each was used were converted to use .forEach for simpler debugging
This commit is contained in:
Spencer Alger 2014-03-12 10:10:01 -07:00
parent 7e8ce4a35c
commit 0f186e6dea
5 changed files with 101 additions and 60 deletions

View file

@ -204,9 +204,25 @@ define(function (require) {
}
};
// force a fetch of all datasources right now, optionally filter by type
Courier.prototype.fetch = function (onlyType) {
// force a fetch of all datasources right now
// optionally filter by type or pass a specific ref
Courier.prototype.fetch = function (refOrType) {
var courier = this;
var onlyType = (typeof refOrType === 'string' && refOrType);
var onlyRef = (typeof refOrType === 'object' && refOrType);
if (onlyRef) {
// only want to fetch one ref
sourceTypes[onlyRef.source._getType()].fetch(
courier,
[onlyRef],
function (err) {
if (err) return courier._error(err);
}
);
return;
}
_.forOwn(onFetch, function (fn, type) {
if (onlyType && onlyType !== type) return;
fn(courier);
@ -247,11 +263,16 @@ define(function (require) {
return this._client;
};
Courier.prototype._getRefFor = function (source) {
return _.find(this._refs[source._getType()], function (ref) {
return (ref.source === source);
});
};
// start using a DocSource in fetches/updates
Courier.prototype._openDataSource = function (source) {
var refs = this._refs[source._getType()];
if (!_.find(refs, { source: source })) {
refs.push({
if (!this._getRefFor(source)) {
this._refs[source._getType()].push({
source: source,
fetchCount: 0
});
@ -285,7 +306,7 @@ define(function (require) {
Courier.prototype._docUpdated = function (source) {
var updated = source._state;
_.each(this._refs.doc, function (ref) {
this._refs.doc.forEach(function (ref) {
var state = ref.source._state;
if (
state.id === updated.id

View file

@ -163,6 +163,14 @@ define(function (require) {
return JSON.stringify(this.toJSON());
};
/**
* Fetch just this source
* @param {Function} cb - callback
*/
DataSource.prototype.fetch = function (cb) {
this._courier.fetch(this._courier._getRefFor(this), cb);
};
/**
* Custom on method wraps event listeners before
* adding them so that $digest is properly setup

View file

@ -4,6 +4,7 @@ define(function (require) {
var nextTick = require('utils/next_tick');
var VersionConflict = require('courier/errors').VersionConflict;
var FetchFailure = require('courier/errors').FetchFailure;
var RequestFailure = require('courier/errors').RequestFailure;
var listenerCount = require('utils/event_emitter').listenerCount;
var _ = require('lodash');
@ -29,7 +30,7 @@ define(function (require) {
docs: []
};
_.each(refs, function (ref) {
refs.forEach(function (ref) {
var source = ref.source;
var state = source._flatten();
@ -45,7 +46,7 @@ define(function (require) {
return client.mget({ body: body }, function (err, resp) {
if (err) return cb(err);
_.each(resp.docs, function (resp, i) {
resp.docs.forEach(function (resp, i) {
var ref = allRefs[i];
var source = ref.source;
@ -125,41 +126,6 @@ define(function (require) {
return this._sendToEs('index', false, body, cb);
};
DocSource.prototype._sendToEs = function (method, validateVersion, body, cb) {
var source = this;
var courier = this._courier;
var client = courier._getClient();
var params = {
id: this._state.id,
type: this._state.type,
index: this._state.index,
body: body,
ignore: [409]
};
if (validateVersion) {
params.version = source._getVersion();
}
client[method](params, function (err, resp) {
if (err) return cb(err);
if (resp && resp.status === 409) {
err = new VersionConflict(resp);
if (listenerCount(source, 'conflict')) {
return source.emit('conflict', err);
} else {
return cb(err);
}
}
source._storeVersion(resp._version);
courier._docUpdated(source);
return cb(void 0, resp._id);
});
};
/*****
* PRIVATE API
*****/
@ -235,5 +201,47 @@ define(function (require) {
localStorage.removeItem(id);
};
/**
* Backend for doUpdate and doIndex
* @param {String} method - the client method to call
* @param {Boolean} validateVersion - should our knowledge
* of the the docs current version be sent to es?
* @param {String} body - HTTP request body
* @param {Function} cb - callback
*/
DocSource.prototype._sendToEs = function (method, validateVersion, body, cb) {
var source = this;
var courier = this._courier;
var client = courier._getClient();
var params = {
id: this._state.id,
type: this._state.type,
index: this._state.index,
body: body,
ignore: [409]
};
if (validateVersion) {
params.version = source._getVersion();
}
client[method](params, function (err, resp) {
if (err) return cb(new RequestFailure(err, resp));
if (resp && resp.status === 409) {
err = new VersionConflict(resp);
if (listenerCount(source, 'conflict')) {
return source.emit('conflict', err);
} else {
return cb(err);
}
}
source._storeVersion(resp._version);
courier._docUpdated(source);
if (typeof cb === 'function') return cb(void 0, resp._id);
});
};
return DocSource;
});

View file

@ -4,6 +4,7 @@ define(function (require) {
var nextTick = require('utils/next_tick');
var errors = require('courier/errors');
var FetchFailure = require('courier/errors').FetchFailure;
var RequestFailure = require('courier/errors').RequestFailure;
var _ = require('lodash');
function SearchSource(courier, initialState) {
@ -30,7 +31,7 @@ define(function (require) {
// so copy them here in the right order
var allRefs = [];
_.each(refs, function (ref) {
refs.forEach(function (ref) {
var source = ref.source;
var state = source._flatten();
if (!state) return;
@ -47,7 +48,7 @@ define(function (require) {
if (!allRefs.length) return nextTick(cb);
return client.msearch({ body: body }, function (err, resp) {
if (err) return cb(err);
if (err) return cb(new RequestFailure(err, resp));
_.each(resp.responses, function (resp, i) {
var source = allRefs[i].source;
@ -93,18 +94,6 @@ define(function (require) {
return this;
};
/**
* Fetch just this source
* @param {Function} cb - callback
*/
SearchSource.prototype.fetch = function (cb) {
var source = this;
var refs = this._courier._refs.search.filter(function (ref) {
return (ref.source === source);
});
SearchSource.fetch(this._courier, refs, cb);
};
/******
* PRIVATE APIS
******/

View file

@ -36,9 +36,24 @@ define(function (require) {
};
inherits(errors.HastyRefresh, CourierError);
/**
* Request Failure - When an entire mutli request fails
* @param {Error} err - the Error that came back
* @param {Object} resp - optional HTTP response
*/
errors.RequestFailure = function RequestFailure(err, resp) {
CourierError.call(this,
'Request to Elasticsearch failed: ' + JSON.stringify(resp || err.message),
errors.RequestFailure);
this.origError = err;
this.resp = resp;
};
inherits(errors.RequestFailure, CourierError);
/**
* FetchFailure Error - where there is an error getting a doc
* FetchFailure Error - when there is an error getting a doc or search within
* a multi-response response body
* @param {String} [msg] - An error message that will probably end up in a log.
*/
errors.FetchFailure = function FetchFailure(resp) {
@ -52,8 +67,8 @@ define(function (require) {
/**
* Connection Error
* @param {String} [msg] - An error message that will probably end up in a log.
* A doc was re-indexed but it was out of date.
* @param {Object} resp - The response from es (one of the multi-response responses).
*/
errors.VersionConflict = function VersionConflict(resp) {
CourierError.call(this,