mirror of
https://github.com/elastic/kibana.git
synced 2025-04-23 01:13:23 -04:00
Individual DataSource objects can be fetched, which follows the standard courier.fetch code path but only fetches that one doc.
Added the ability to specify a Courier ref that should be fetched. DataSources now convert Client errors into RequestFailure errors that provide more user friendly error messages. Several places where _.each was used were converted to use .forEach for simpler debugging
This commit is contained in:
parent
7ddd2666a6
commit
1e89a28581
5 changed files with 101 additions and 60 deletions
|
@ -204,9 +204,25 @@ define(function (require) {
|
|||
}
|
||||
};
|
||||
|
||||
// force a fetch of all datasources right now, optionally filter by type
|
||||
Courier.prototype.fetch = function (onlyType) {
|
||||
// force a fetch of all datasources right now
|
||||
// optionally filter by type or pass a specific ref
|
||||
Courier.prototype.fetch = function (refOrType) {
|
||||
var courier = this;
|
||||
var onlyType = (typeof refOrType === 'string' && refOrType);
|
||||
var onlyRef = (typeof refOrType === 'object' && refOrType);
|
||||
|
||||
if (onlyRef) {
|
||||
// only want to fetch one ref
|
||||
sourceTypes[onlyRef.source._getType()].fetch(
|
||||
courier,
|
||||
[onlyRef],
|
||||
function (err) {
|
||||
if (err) return courier._error(err);
|
||||
}
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
_.forOwn(onFetch, function (fn, type) {
|
||||
if (onlyType && onlyType !== type) return;
|
||||
fn(courier);
|
||||
|
@ -247,11 +263,16 @@ define(function (require) {
|
|||
return this._client;
|
||||
};
|
||||
|
||||
Courier.prototype._getRefFor = function (source) {
|
||||
return _.find(this._refs[source._getType()], function (ref) {
|
||||
return (ref.source === source);
|
||||
});
|
||||
};
|
||||
|
||||
// start using a DocSource in fetches/updates
|
||||
Courier.prototype._openDataSource = function (source) {
|
||||
var refs = this._refs[source._getType()];
|
||||
if (!_.find(refs, { source: source })) {
|
||||
refs.push({
|
||||
if (!this._getRefFor(source)) {
|
||||
this._refs[source._getType()].push({
|
||||
source: source,
|
||||
fetchCount: 0
|
||||
});
|
||||
|
@ -285,7 +306,7 @@ define(function (require) {
|
|||
Courier.prototype._docUpdated = function (source) {
|
||||
var updated = source._state;
|
||||
|
||||
_.each(this._refs.doc, function (ref) {
|
||||
this._refs.doc.forEach(function (ref) {
|
||||
var state = ref.source._state;
|
||||
if (
|
||||
state.id === updated.id
|
||||
|
|
|
@ -163,6 +163,14 @@ define(function (require) {
|
|||
return JSON.stringify(this.toJSON());
|
||||
};
|
||||
|
||||
/**
|
||||
* Fetch just this source
|
||||
* @param {Function} cb - callback
|
||||
*/
|
||||
DataSource.prototype.fetch = function (cb) {
|
||||
this._courier.fetch(this._courier._getRefFor(this), cb);
|
||||
};
|
||||
|
||||
/**
|
||||
* Custom on method wraps event listeners before
|
||||
* adding them so that $digest is properly setup
|
||||
|
|
|
@ -4,6 +4,7 @@ define(function (require) {
|
|||
var nextTick = require('utils/next_tick');
|
||||
var VersionConflict = require('courier/errors').VersionConflict;
|
||||
var FetchFailure = require('courier/errors').FetchFailure;
|
||||
var RequestFailure = require('courier/errors').RequestFailure;
|
||||
var listenerCount = require('utils/event_emitter').listenerCount;
|
||||
var _ = require('lodash');
|
||||
|
||||
|
@ -29,7 +30,7 @@ define(function (require) {
|
|||
docs: []
|
||||
};
|
||||
|
||||
_.each(refs, function (ref) {
|
||||
refs.forEach(function (ref) {
|
||||
var source = ref.source;
|
||||
|
||||
var state = source._flatten();
|
||||
|
@ -45,7 +46,7 @@ define(function (require) {
|
|||
return client.mget({ body: body }, function (err, resp) {
|
||||
if (err) return cb(err);
|
||||
|
||||
_.each(resp.docs, function (resp, i) {
|
||||
resp.docs.forEach(function (resp, i) {
|
||||
var ref = allRefs[i];
|
||||
var source = ref.source;
|
||||
|
||||
|
@ -125,41 +126,6 @@ define(function (require) {
|
|||
return this._sendToEs('index', false, body, cb);
|
||||
};
|
||||
|
||||
DocSource.prototype._sendToEs = function (method, validateVersion, body, cb) {
|
||||
var source = this;
|
||||
var courier = this._courier;
|
||||
var client = courier._getClient();
|
||||
var params = {
|
||||
id: this._state.id,
|
||||
type: this._state.type,
|
||||
index: this._state.index,
|
||||
body: body,
|
||||
ignore: [409]
|
||||
};
|
||||
|
||||
if (validateVersion) {
|
||||
params.version = source._getVersion();
|
||||
}
|
||||
|
||||
client[method](params, function (err, resp) {
|
||||
if (err) return cb(err);
|
||||
|
||||
if (resp && resp.status === 409) {
|
||||
err = new VersionConflict(resp);
|
||||
if (listenerCount(source, 'conflict')) {
|
||||
return source.emit('conflict', err);
|
||||
} else {
|
||||
return cb(err);
|
||||
}
|
||||
}
|
||||
|
||||
source._storeVersion(resp._version);
|
||||
courier._docUpdated(source);
|
||||
return cb(void 0, resp._id);
|
||||
});
|
||||
};
|
||||
|
||||
|
||||
/*****
|
||||
* PRIVATE API
|
||||
*****/
|
||||
|
@ -235,5 +201,47 @@ define(function (require) {
|
|||
localStorage.removeItem(id);
|
||||
};
|
||||
|
||||
/**
|
||||
* Backend for doUpdate and doIndex
|
||||
* @param {String} method - the client method to call
|
||||
* @param {Boolean} validateVersion - should our knowledge
|
||||
* of the the docs current version be sent to es?
|
||||
* @param {String} body - HTTP request body
|
||||
* @param {Function} cb - callback
|
||||
*/
|
||||
DocSource.prototype._sendToEs = function (method, validateVersion, body, cb) {
|
||||
var source = this;
|
||||
var courier = this._courier;
|
||||
var client = courier._getClient();
|
||||
var params = {
|
||||
id: this._state.id,
|
||||
type: this._state.type,
|
||||
index: this._state.index,
|
||||
body: body,
|
||||
ignore: [409]
|
||||
};
|
||||
|
||||
if (validateVersion) {
|
||||
params.version = source._getVersion();
|
||||
}
|
||||
|
||||
client[method](params, function (err, resp) {
|
||||
if (err) return cb(new RequestFailure(err, resp));
|
||||
|
||||
if (resp && resp.status === 409) {
|
||||
err = new VersionConflict(resp);
|
||||
if (listenerCount(source, 'conflict')) {
|
||||
return source.emit('conflict', err);
|
||||
} else {
|
||||
return cb(err);
|
||||
}
|
||||
}
|
||||
|
||||
source._storeVersion(resp._version);
|
||||
courier._docUpdated(source);
|
||||
if (typeof cb === 'function') return cb(void 0, resp._id);
|
||||
});
|
||||
};
|
||||
|
||||
return DocSource;
|
||||
});
|
|
@ -4,6 +4,7 @@ define(function (require) {
|
|||
var nextTick = require('utils/next_tick');
|
||||
var errors = require('courier/errors');
|
||||
var FetchFailure = require('courier/errors').FetchFailure;
|
||||
var RequestFailure = require('courier/errors').RequestFailure;
|
||||
var _ = require('lodash');
|
||||
|
||||
function SearchSource(courier, initialState) {
|
||||
|
@ -30,7 +31,7 @@ define(function (require) {
|
|||
// so copy them here in the right order
|
||||
var allRefs = [];
|
||||
|
||||
_.each(refs, function (ref) {
|
||||
refs.forEach(function (ref) {
|
||||
var source = ref.source;
|
||||
var state = source._flatten();
|
||||
if (!state) return;
|
||||
|
@ -47,7 +48,7 @@ define(function (require) {
|
|||
if (!allRefs.length) return nextTick(cb);
|
||||
|
||||
return client.msearch({ body: body }, function (err, resp) {
|
||||
if (err) return cb(err);
|
||||
if (err) return cb(new RequestFailure(err, resp));
|
||||
|
||||
_.each(resp.responses, function (resp, i) {
|
||||
var source = allRefs[i].source;
|
||||
|
@ -93,18 +94,6 @@ define(function (require) {
|
|||
return this;
|
||||
};
|
||||
|
||||
/**
|
||||
* Fetch just this source
|
||||
* @param {Function} cb - callback
|
||||
*/
|
||||
SearchSource.prototype.fetch = function (cb) {
|
||||
var source = this;
|
||||
var refs = this._courier._refs.search.filter(function (ref) {
|
||||
return (ref.source === source);
|
||||
});
|
||||
SearchSource.fetch(this._courier, refs, cb);
|
||||
};
|
||||
|
||||
/******
|
||||
* PRIVATE APIS
|
||||
******/
|
||||
|
|
|
@ -36,9 +36,24 @@ define(function (require) {
|
|||
};
|
||||
inherits(errors.HastyRefresh, CourierError);
|
||||
|
||||
/**
|
||||
* Request Failure - When an entire mutli request fails
|
||||
* @param {Error} err - the Error that came back
|
||||
* @param {Object} resp - optional HTTP response
|
||||
*/
|
||||
errors.RequestFailure = function RequestFailure(err, resp) {
|
||||
CourierError.call(this,
|
||||
'Request to Elasticsearch failed: ' + JSON.stringify(resp || err.message),
|
||||
errors.RequestFailure);
|
||||
|
||||
this.origError = err;
|
||||
this.resp = resp;
|
||||
};
|
||||
inherits(errors.RequestFailure, CourierError);
|
||||
|
||||
/**
|
||||
* FetchFailure Error - where there is an error getting a doc
|
||||
* FetchFailure Error - when there is an error getting a doc or search within
|
||||
* a multi-response response body
|
||||
* @param {String} [msg] - An error message that will probably end up in a log.
|
||||
*/
|
||||
errors.FetchFailure = function FetchFailure(resp) {
|
||||
|
@ -52,8 +67,8 @@ define(function (require) {
|
|||
|
||||
|
||||
/**
|
||||
* Connection Error
|
||||
* @param {String} [msg] - An error message that will probably end up in a log.
|
||||
* A doc was re-indexed but it was out of date.
|
||||
* @param {Object} resp - The response from es (one of the multi-response responses).
|
||||
*/
|
||||
errors.VersionConflict = function VersionConflict(resp) {
|
||||
CourierError.call(this,
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue