several subtle adjustments made to the courier, and a couple to the mapper, as lessons were learned

This commit is contained in:
Spencer Alger 2014-02-27 12:01:29 -07:00
parent c4c14f15f2
commit 0dc5159cd8
6 changed files with 202 additions and 125 deletions

View file

@ -147,13 +147,16 @@ define(function (require) {
* PUBLIC API
*/
// start fetching results on an interval
// start fetching results on an interval, restart if already started
Courier.prototype.start = function () {
if (!this.running()) {
this._schedule('doc');
this._schedule('search');
this.fetch();
if (this.running()) {
this.stop();
}
this._schedule('doc');
this._schedule('search');
this.fetch();
return this;
};
@ -193,7 +196,7 @@ define(function (require) {
var courier = this;
_.forOwn(onFetch, function (fn, type) {
if (onlyType && onlyType !== type) return;
if (courier._refs[type].length) fn(courier);
fn(courier);
courier._refs[type].forEach(function (ref) {
ref.fetchCount ++;
});

View file

@ -3,6 +3,7 @@ define(function (require) {
var _ = require('lodash');
var EventEmitter = require('utils/event_emitter');
var Mapper = require('courier/mapper');
var nextTick = require('utils/next_tick');
function DataSource(courier, initialState) {
var state;
@ -23,25 +24,34 @@ define(function (require) {
this._state = state;
this._courier = courier;
var onNewListener = _.bind(function (name) {
// new newListener is emitted before it is added, count will be 0
if (name !== 'results' || EventEmitter.listenerCount(this, 'results') !== 0) return;
courier._openDataSource(this);
this.removeListener('newListener', onNewListener);
this.on('removeListener', onRemoveListener);
}, this);
this.on('newListener', function (name, listener) {
if (name !== 'results') return;
var onRemoveListener = _.bind(function () {
if (this._previousResult) {
// always call the listener async, to match normal "emit" behavior
var result = this._previousResult;
var self = this;
nextTick(function () {
listener.call(self, result);
});
}
// newListener is emitted before the listener is actually is added,
// so count should be 0 if this is the first
if (EventEmitter.listenerCount(this, 'results') === 0) {
courier._openDataSource(this);
}
});
this.on('removeListener', function onRemoveListener() {
if (EventEmitter.listenerCount(this, 'results') > 0) return;
courier._closeDataSource(this);
this.removeListener('removeListener', onRemoveListener);
this.on('newListener', onNewListener);
}, this);
this.on('newListener', onNewListener);
});
this.extend = function () {
return courier.createSource(this._getType()).inherits(this);
return courier
.createSource(this._getType())
.inherits(this);
};
this.courier = function (newCourier) {
@ -52,7 +62,12 @@ define(function (require) {
// get/set internal state values
this._methods.forEach(function (name) {
this[name] = function (val) {
state[name] = val;
if (val == null) {
delete state[name];
} else {
state[name] = val;
}
return this;
};
}, this);
@ -71,22 +86,72 @@ define(function (require) {
return this._state[name];
};
/**
* Change the entire state of a DataSource
* @param {object|string} state - The DataSource's new state, or a
* string of the state value to set
*/
DataSource.prototype.set = function (state, val) {
if (typeof state === 'string') {
return this[state](val);
}
this._state = state;
return this;
};
/**
* Clear the disabled flag, you do not need to call this unless you
* explicitly disabled the DataSource
*/
DataSource.prototype.enableFetch = function () {
delete this._fetchDisabled;
return this;
};
/**
* Disable the DataSource, preventing it or any of it's children from being searched
*/
DataSource.prototype.disableFetch = function () {
this._fetchDisabled = true;
return this;
};
/**
* Attach a scope to this DataSource so that callbacks and event listeners
* can properly trigger it's $digest cycles
* @param {AngularScope} $scope
* @return {this} - chainable
*/
DataSource.prototype.$scope = function ($scope) {
this._$scope = $scope;
return this;
};
/**
* fetch the field names for this DataSource
* @param {Function} cb
* @callback {Error, Array} - calls cb with a possible error or an array of field names
* @todo
*/
DataSource.prototype.getFields = function (cb) {
this._courier._mapper.getFields(this, this._wrapcb(cb));
};
/**
* flatten an object to a simple encodable object
* clear the field list cache
* @param {Function} cb
* @callback {Error, Array} - calls cb with a possible error
*/
DataSource.prototype.clearFieldCache = function (cb) {
this._courier._mapper.clearCache(this, this._wrapcb(cb));
};
/**
* return a simple, encodable object representing the state of the DataSource
* @return {[type]} [description]
*/
DataSource.prototype.toJSON = function () {
return _.omit(this._state, 'inherits');
return _.clone(this._state);
};
/**
@ -98,49 +163,22 @@ define(function (require) {
};
/**
* Set the $scope for a datasource, when a datasource is bound
* to a scope, it's event listeners will be wrapped in a call to that
* scope's $apply method (safely).
* Custom on method wraps event listeners before
* adding them so that $digest is properly setup
*
* This also binds the DataSource to the lifetime of the scope: when the scope
* is destroyed, the datasource is closed
*
* @param {AngularScope} $scope - the scope where the event emitter "occurs",
* helps angular determine where to start checking for changes
* @return {this} - chainable
* @param {string} event - the name of the event to listen for
* @param {function} listener - the function to call when the event is emitted
*/
DataSource.prototype.$scope = function ($scope) {
var courier = this;
DataSource.prototype.on = function (event, listener) {
var wrapped = this._wrapcb(listener);
if (courier._$scope) {
// simply change the scope that callbacks will point to
courier._$scope = $scope;
return this;
}
// set .listener so that this specific one can
// be removed by .removeListener()
wrapped.listener = listener;
courier._$scope = $scope;
// wrap the 'on' method so that all listeners
// can be wrapped in calls to $scope.$apply
var origOn = courier.on;
courier.on = function (event, listener) {
var wrapped = courier._wrapcb(listener);
// set .listener so that it can be removed by
// .removeListener() using the original function
wrapped.listener = listener;
return origOn.call(courier, event, wrapped);
};
// make sure the alias is still set
courier.addListener = courier.on;
courier.on.restore = function () {
delete courier._$scope;
courier.on = courier.addListener = origOn;
};
return this;
return EventEmitter.prototype.on.call(this, event, wrapped);
};
DataSource.prototype.addListener = DataSource.prototype.on;
/*****
* PRIVATE API
@ -174,12 +212,15 @@ define(function (require) {
// walk the chain and merge each property
var current = this;
var currentState;
while (current) {
currentState = current._state;
_.forOwn(currentState, collectProp);
current = currentState.inherits;
// stop processing if this or one of it's parents is disabled
if (current._fetchDisabled) return;
// merge the properties from the state into the flattened copy
_.forOwn(current._state, collectProp);
// move to this sources parent
current = current._parent;
}
current = null;
if (type === 'search') {
// defaults for the query
@ -210,19 +251,27 @@ define(function (require) {
return flatState;
};
/**
* Wrap a function in $scope.$apply or $scope.$eval for the Source's
* current $scope
*
* @param {Function} cb - the function to wrap
* @return {Function} - the wrapped function
*/
DataSource.prototype._wrapcb = function (cb) {
var courier = this;
var source = this;
cb = (typeof cb !== 'function') ? _.noop : cb;
var wrapped = function () {
var args = arguments;
// always use the stored ref so that it can be updated if needed
var $scope = courier._$scope;
var $scope = source._$scope;
// don't fall apart if we don't have a scope
if (!$scope) return cb.apply(courier, args);
if (!$scope) return cb.apply(source, args);
// use angular's $apply or $eval functions for the given scope
$scope[$scope.$$phase ? '$eval' : '$apply'](function () {
cb.apply(courier, args);
cb.apply(source, args);
});
};
return wrapped;

View file

@ -25,19 +25,24 @@ define(function (require) {
DocSource.fetch = function (courier, refs, cb) {
var client = courier._getClient();
var allRefs = [];
var getBody = {
var body = {
docs: []
};
_.each(refs, function (ref) {
var source = ref.source;
if (source._getType() !== 'doc') return;
var state = source._flatten();
if (!state || !state._id) return;
allRefs.push(ref);
getBody.docs.push(source._flatten());
body.docs.push(state);
});
return client.mget({ body: getBody })
// always callback asynchronously
if (!allRefs.length) return nextTick(cb);
return client.mget({ body: body })
.then(function (resp) {
_.each(resp.docs, function (resp, i) {
var ref = allRefs[i];
@ -52,6 +57,7 @@ define(function (require) {
ref.version = void 0;
source._clearVersion();
}
source._previousResult = resp;
source.emit('results', resp);
});
@ -105,25 +111,8 @@ define(function (require) {
* @return {undefined}
*/
DocSource.prototype.doUpdate = function (fields, cb) {
var source = this;
var courier = this._courier;
var client = courier._getClient();
var state = this._state;
client.update({
id: state.id,
type: state.type,
index: state.index,
version: source._getVersion(),
body: {
doc: fields
}
}, function (err, resp) {
if (err) return cb(err);
courier._docUpdated(source);
return cb();
});
if (!this._state.id) return this.doIndex(fields, cb);
return this._sendToEs('update', true, { doc: fields }, cb);
};
/**
@ -133,18 +122,26 @@ define(function (require) {
* @return {[type]} [description]
*/
DocSource.prototype.doIndex = function (body, cb) {
return this._sendToEs('index', false, body, cb);
};
DocSource.prototype._sendToEs = function (method, validateVersion, body, cb) {
var source = this;
var courier = this._courier;
var client = courier._getClient();
var state = this._state;
client.index({
id: state.id,
type: state.type,
index: state.index,
var params = {
id: this._state.id,
type: this._state.type,
index: this._state.index,
body: body,
ignore: [409]
}, function (err, resp) {
};
if (validateVersion) {
params.version = source._getVersion();
}
client[method](params, function (err, resp) {
if (err) return cb(err);
if (resp && resp.status === 409) {
@ -158,10 +155,11 @@ define(function (require) {
source._storeVersion(resp._version);
courier._docUpdated(source);
return cb();
return cb(void 0, resp._id);
});
};
/*****
* PRIVATE API
*****/
@ -223,18 +221,19 @@ define(function (require) {
* @return {undefined}
*/
DocSource.prototype._storeVersion = function (version) {
if (!version) return this._clearVersion();
var id = this._versionKey();
if (version) {
localStorage.setItem(id, version);
} else {
localStorage.removeItem(id);
}
localStorage.setItem(id, version);
};
/**
* Clears the stored version for a DocSource
*/
DocSource.prototype._clearVersion = DocSource.prototype._storeVersion;
DocSource.prototype._clearVersion = function () {
var id = this._versionKey();
localStorage.removeItem(id);
};
return DocSource;
});

View file

@ -1,6 +1,7 @@
define(function (require) {
var DataSource = require('courier/data_source/data_source');
var inherits = require('utils/inherits');
var nextTick = require('utils/next_tick');
var errors = require('courier/errors');
var FetchFailure = require('courier/errors').FetchFailure;
var _ = require('lodash');
@ -22,17 +23,19 @@ define(function (require) {
*/
SearchSource.fetch = function (courier, refs, cb) {
var client = courier._getClient();
var allRefs = [];
var body = '';
cb = cb || _.noop;
// we must have refs in the same order to process the response
// so copy them here in the right order
var allRefs = [];
_.each(refs, function (ref) {
var source = ref.source;
if (source._getType() !== 'search') {
return;
}
allRefs.push(source);
var state = source._flatten();
if (!state) return;
allRefs.push(ref);
body +=
JSON.stringify({ index: state.index, type: state.type })
+ '\n'
@ -40,11 +43,14 @@ define(function (require) {
+ '\n';
});
// always callback async
if (!allRefs.length) return nextTick(cb);
return client.msearch({ body: body }, function (err, resp) {
if (err) return cb(err);
_.each(resp.responses, function (resp, i) {
var source = allRefs[i];
var source = allRefs[i].source;
if (resp.error) return source._error(new FetchFailure(resp));
source.emit('results', resp);
});
@ -74,10 +80,31 @@ define(function (require) {
'aggs',
'from',
'size',
'source',
'inherits'
'source'
];
/**
* Set a searchSource that this source should inherit from
* @param {SearchSource} searchSource - the parent searchSource
* @return {this} - chainable
*/
SearchSource.prototype.inherits = function (parent) {
this._parent = parent;
return this;
};
/**
* Fetch just this source
* @param {Function} cb - callback
*/
SearchSource.prototype.fetch = function (cb) {
var source = this;
var refs = this._courier._refs.search.filter(function (ref) {
return (ref.source === source);
});
SearchSource.fetch(this._courier, refs, cb);
};
/******
* PRIVATE APIS
******/
@ -101,10 +128,6 @@ define(function (require) {
*/
SearchSource.prototype._mergeProp = function (state, val, key) {
switch (key) {
case 'inherits':
case '_type':
// ignore
return;
case 'filter':
state.filters = state.filters || [];
state.filters.push(val);

View file

@ -38,7 +38,7 @@ define(function (require) {
this.getFields = function (dataSource, callback) {
if (self.getFieldsFromObject(dataSource)) {
// If we already have the fields in our object, use that, but
// make sure we stay async
// make sure we stay async and
nextTick(callback, void 0, self.getFieldsFromObject(dataSource));
} else {
// Otherwise, try to get fields from Elasticsearch cache
@ -99,7 +99,8 @@ define(function (require) {
* @return {Object} An object containing fields with their mappings, or false if not found.
*/
this.getFieldsFromObject = function (dataSource) {
return !_.isUndefined(mappings[dataSource._state.index]) ? mappings[dataSource._state.index] : false;
// don't pass pack our reference to truth, clone it
return !_.isUndefined(mappings[dataSource._state.index]) ? _.clone(mappings[dataSource._state.index]) : false;
};
/**
@ -176,7 +177,7 @@ define(function (require) {
* @param {Function} callback A function to be executed with the results.
*/
var cacheFieldsToObject = function (dataSource, fields) {
mappings[dataSource._state.index] = _.clone(fields);
mappings[dataSource._state.index] = fields;
return !_.isUndefined(mappings[dataSource._state.index]) ? true : false;
};

View file

@ -10,7 +10,7 @@ define(function (require) {
var courier; // share the courier amoungst all of the apps
angular.module('kibana/services')
.service('courier', function (es, promises) {
.service('courier', function (es, $rootScope, promises) {
if (courier) return courier;
promises.playNice(DocSource.prototype, [
@ -26,7 +26,9 @@ define(function (require) {
courier.errors = errors;
courier.rootSearchSource = courier.createSource('search');
courier.rootSearchSource = courier
.createSource('search')
.$scope($rootScope);
return courier;
});