bootstrap is now a part of the route resolution process, dataSources use promises to respond to all requests, and the courier is completly within the angular ecosystem

This commit is contained in:
Spencer Alger 2014-04-07 12:01:19 -07:00
parent 3eb84d1866
commit 18f88b83c2
49 changed files with 1941 additions and 2212 deletions

View file

@ -1,336 +0,0 @@
define(function (require) {
var EventEmitter = require('utils/event_emitter');
var inherits = require('utils/inherits');
var errors = require('courier/errors');
var _ = require('lodash');
var angular = require('angular');
var DocSource = require('courier/data_source/doc');
var SearchSource = require('courier/data_source/search');
var HastyRefresh = require('courier/errors').HastyRefresh;
var nextTick = require('utils/next_tick');
var Mapper = require('courier/mapper');
// map constructors to type keywords
var sourceTypes = {
doc: DocSource,
search: SearchSource
};
// fetch process for the two source types
var onFetch = {
// execute a search right now
search: function (courier) {
if (courier._activeSearchRequest) {
// ensure that this happens async, otherwise listeners
// might miss error events
return nextTick(function () {
courier._error(new HastyRefresh());
});
}
var aborter = SearchSource.fetch(
courier,
courier._refs.search,
function (err) {
if (err) return courier._error(err);
courier._activeSearchRequest = null;
}
);
// if there was an error, this might not be set, but
// we should still pretend for now that there is an active
// request to make behavior predictable
courier._activeSearchRequest = aborter || true;
},
// validate that all of the DocSource objects are up to date
// then fetch the onces that are not
doc: function (courier) {
DocSource.validate(courier, courier._refs.doc, function (err, invalid) {
if (err) return courier._error(err);
// if all of the docs are up to date we don't need to do anything else
if (invalid.length === 0) return;
DocSource.fetch(courier, invalid, function (err) {
if (err) return courier._error(err);
});
});
}
};
// default config values
var defaults = {
fetchInterval: 30000,
docInterval: 1500,
internalIndex: 'kibana4-int',
mapperCacheType: 'mappings'
};
/**
* Federated query service, supports two data source types: doc and search.
*
* search:
* - inherits filters, and other query properties
* - automatically emit results on a set interval
*
* doc:
* - tracks doc versions
* - emits same results event when the doc is updated
* - helps seperate versions of kibana running on the same machine stay in sync
* - tracks version and uses it to verify that updates are safe to make
* - emits conflict event when that happens
*
* @param {object} config
* @param {Client} config.client - The elasticsearch.js client to use for querying. Should be
* setup and ready to go.
* @param {EsClient} [config.client] - The elasticsearch client that the courier should use
* (can be set at a later time with the `.client()` method)
* @param {integer} [config.fetchInterval=30000] - The amount in ms between each fetch (deafult
* is 30 seconds)
*/
function Courier(config) {
if (!(this instanceof Courier)) return new Courier(config);
config = _.defaults(config || {}, defaults);
this._client = config.client;
// array's to store references to individual sources of each type
// wrapped in some metadata
this._refs = _.transform(sourceTypes, function (refs, fn, type) {
refs[type] = [];
});
// stores all timer ids
this._timer = {};
// interval times for each type
this._interval = {};
// interval hook/fn for each type
this._onInterval = {};
// make the mapper accessable
this._mapper = new Mapper(this, {
cacheIndex: config.internalIndex,
cacheType: config.mapperCacheType
});
_.each(sourceTypes, function (fn, type) {
var courier = this;
// the name used outside of this module
var publicName;
if (type === 'search') {
publicName = 'fetchInterval';
} else {
publicName = type + 'Interval';
}
// store the config value passed in for this interval
this._interval[type] = config[publicName];
// store a quick "bound" method for triggering
this._onInterval[type] = function () {
courier.fetch(type);
courier._schedule(type);
};
// create a public getter/setter for this interval type
this[publicName] = function (val) {
// getter
if (val === void 0) return courier._interval[type];
// setter
courier._interval[type] = val;
if (courier.running()) courier._schedule(type);
return this;
};
}, this);
}
inherits(Courier, EventEmitter);
/**
* PUBLIC API
*/
// start fetching results on an interval, restart if already started
Courier.prototype.start = function () {
if (this.running()) {
this.stop();
}
this._schedule('doc');
this._schedule('search');
this.fetch();
return this;
};
// is the courier currently running?
Courier.prototype.running = function () {
return !!_.size(this._timer);
};
// stop the courier from fetching more results
Courier.prototype.stop = function () {
this._clearScheduled('search');
this._clearScheduled('doc');
};
// close the courier, stopping it from refreshing and
// closing all of the sources
Courier.prototype.close = function () {
_.each(sourceTypes, function (fn, type) {
this._refs[type].forEach(function (ref) {
this._closeDataSource(ref.source);
}, this);
}, this);
};
// be default, the courier will throw an error if a fetch
// occurs before a previous fetch finishes. To prevent this, you
// should call abort before calling .fetch()
Courier.prototype.abort = function () {
if (this._activeSearchRequest) {
// the .fetch method might not return a
if (typeof this._activeSearchRequest.abort === 'function') {
this._activeSearchRequest.abort();
}
this._activeSearchRequest = null;
}
};
// force a fetch of all datasources right now
// optionally filter by type or pass a specific ref
Courier.prototype.fetch = function (refOrType) {
var courier = this;
var onlyType = (typeof refOrType === 'string' && refOrType);
var onlyRef = (typeof refOrType === 'object' && refOrType);
if (onlyRef) {
// only want to fetch one ref
sourceTypes[onlyRef.source._getType()].fetch(
courier,
[onlyRef],
function (err) {
if (err) return courier._error(err);
}
);
return;
}
_.forOwn(onFetch, function (fn, type) {
if (onlyType && onlyType !== type) return;
fn(courier);
courier._refs[type].forEach(function (ref) {
ref.fetchCount ++;
});
});
};
// data source factory
Courier.prototype.createSource = function (type, initialState) {
type = type || 'search';
if ('function' !== typeof sourceTypes[type]) throw new TypeError(
'Invalid source type ' + type
);
var Constructor = sourceTypes[type];
return new Constructor(this, initialState);
};
/*****
* PRIVATE API
*****/
// handle errors in a standard way. The only errors that should make it here are
// - issues with msearch/mget syntax
// - unable to reach ES
// - HastyRefresh
Courier.prototype._error = function (err) {
this.stop();
return this.emit('error', err);
};
// every time a child object (DataSource, Mapper) needs the client, it should
// call _getClient
Courier.prototype._getClient = function () {
if (!this._client) throw new Error('Client is not set on the Courier yet.');
return this._client;
};
Courier.prototype._getRefFor = function (source) {
return _.find(this._refs[source._getType()], function (ref) {
return (ref.source === source);
});
};
// start using a DocSource in fetches/updates
Courier.prototype._openDataSource = function (source) {
if (!this._getRefFor(source)) {
this._refs[source._getType()].push({
source: source,
fetchCount: 0
});
}
};
// stop using a DataSource in fetches/updates
Courier.prototype._closeDataSource = function (source) {
var type = source._getType();
var refs = this._refs[type];
_(refs).where({ source: source }).each(_.partial(_.pull, refs));
if (refs.length === 0) this._clearScheduled(type);
};
// schedule a fetch after fetchInterval
Courier.prototype._schedule = function (type) {
this._clearScheduled(type);
if (this._interval[type]) {
this._timer[type] = setTimeout(this._onInterval[type], this._interval[type]);
}
};
// properly clear scheduled fetches
Courier.prototype._clearScheduled = function (type) {
clearTimeout(this._timer[type]);
delete this._timer[type];
};
// alert the courior that a doc has been updated
// and that it should update matching docs
Courier.prototype._docUpdated = function (source) {
var updated = source._state;
this._refs.doc.forEach(function (ref) {
var state = ref.source._state;
if (
state.id === updated.id
&& state.type === updated.type
&& state.index === updated.index
) {
delete ref.version;
}
});
this.fetch('doc');
};
// get the list of open data source objects
// primarily for testing purposes
Courier.prototype._openSources = function (type) {
if (!type) {
return _.transform(this._refs, function (open, refs) {
[].push.apply(open, refs);
}, []);
}
return this._refs[type] || [];
};
return Courier;
});

View file

@ -1,298 +0,0 @@
define(function (require) {
var inherits = require('utils/inherits');
var _ = require('lodash');
var EventEmitter = require('utils/event_emitter');
var Mapper = require('courier/mapper');
var nextTick = require('utils/next_tick');
function DataSource(courier, initialState) {
EventEmitter.call(this);
this._state = (function () {
// state can be serialized as JSON, and passed back in to restore
if (initialState) {
if (typeof initialState === 'string') {
return JSON.parse(initialState);
} else {
return _.cloneDeep(initialState);
}
} else {
return {};
}
}());
this._dynamicState = this._dynamicState || {};
this._courier = courier;
// before newListener to prevent unnecessary "emit" when added
this.on('removeListener', function onRemoveListener() {
if (EventEmitter.listenerCount(this, 'results') > 0) return;
courier._closeDataSource(this);
});
this.on('newListener', function (name, listener) {
if (name !== 'results') return;
if (this._previousResult) {
// always call the listener async, to match normal "emit" behavior
var result = this._previousResult;
var self = this;
nextTick(function () {
listener.call(self, result);
});
}
// newListener is emitted before the listener is actually is added,
// so count should be 0 if this is the first
if (EventEmitter.listenerCount(this, 'results') === 0) {
courier._openDataSource(this);
}
});
this.extend = function () {
return courier
.createSource(this._getType())
.inherits(this);
};
this.courier = function (newCourier) {
courier = this._courier = newCourier;
return this;
};
// get/set internal state values
this._methods.forEach(function (name) {
this[name] = function (val) {
if (val == null) {
delete this._state[name];
} else {
this._state[name] = val;
}
return this;
};
}, this);
}
inherits(DataSource, EventEmitter);
/*****
* PUBLIC API
*****/
/**
* Get values from the state
* @param {string} name - The name of the property desired
*/
DataSource.prototype.get = function (name) {
var current = this;
while (current) {
if (current._state[name] !== void 0) return current._state[name];
if (current._dynamicState[name] !== void 0) return current._dynamicState[name]();
current = current._parent;
}
};
/**
* Change the entire state of a DataSource
* @param {object|string} state - The DataSource's new state, or a
* string of the state value to set
*/
DataSource.prototype.set = function (state, val) {
if (typeof state === 'string') {
return this[state](val);
}
this._state = state;
return this;
};
/**
* Clear the disabled flag, you do not need to call this unless you
* explicitly disabled the DataSource
*/
DataSource.prototype.enableAuthFetch = function () {
delete this._fetchDisabled;
return this;
};
/**
* Disable the DataSource, preventing it or any of it's children from being searched
*/
DataSource.prototype.disableAutoFetch = function () {
this._fetchDisabled = true;
return this;
};
/**
* Attach a scope to this DataSource so that callbacks and event listeners
* can properly trigger it's $digest cycles
* @param {AngularScope} $scope
* @return {this} - chainable
*/
DataSource.prototype.$scope = function ($scope) {
this._$scope = $scope;
return this;
};
/**
* fetch the field names for this DataSource
* @param {Function} cb
* @callback {Error, Array} - calls cb with a possible error or an array of field names
*/
DataSource.prototype.getFields = function (cb) {
if (!this.get('index')) {
// Without an index there is nothing to do here.
return nextTick(cb);
}
this._courier._mapper.getFields(this, this._wrapcb(cb));
};
/**
* clear the field list cache
* @param {Function} cb
* @callback {Error, Array} - calls cb with a possible error
*/
DataSource.prototype.clearFieldCache = function (cb) {
this._courier._mapper.clearCache(this, this._wrapcb(cb));
};
/**
* return a simple, encodable object representing the state of the DataSource
* @return {[type]} [description]
*/
DataSource.prototype.toJSON = function () {
return _.clone(this._state);
};
/**
* Create a string representation of the object
* @return {[type]} [description]
*/
DataSource.prototype.toString = function () {
return JSON.stringify(this.toJSON());
};
/**
* Fetch just this source
* @param {Function} cb - callback
*/
DataSource.prototype.fetch = function (cb) {
this._courier.fetch(this._courier._getRefFor(this), cb);
};
/**
* Custom on method wraps event listeners before
* adding them so that $digest is properly setup
*
* @param {string} event - the name of the event to listen for
* @param {function} listener - the function to call when the event is emitted
*/
DataSource.prototype.on = function (event, listener) {
var wrapped = this._wrapcb(listener);
// set .listener so that this specific one can
// be removed by .removeListener()
wrapped.listener = listener;
return EventEmitter.prototype.on.call(this, event, wrapped);
};
DataSource.prototype.addListener = DataSource.prototype.on;
/*****
* PRIVATE API
*****/
/**
* Handle errors by allowing them to bubble from the DataSource
* to the courior. Maybe we should walk the inheritance chain too.
* @param {Error} err - The error that occured
* @return {undefined}
*/
DataSource.prototype._error = function (err) {
if (EventEmitter.listenerCount(this, 'error')) {
this.emit('error', err);
} else {
this._courier._error(err);
}
};
/**
* Walk the inheritance chain of a source and return it's
* flat representaion (taking into account merging rules)
* @return {object} - the flat state of the DataSource
*/
DataSource.prototype._flatten = function () {
var type = this._getType();
// the merged state of this dataSource and it's ancestors
var flatState = {};
var collectProp = _.partial(this._mergeProp, flatState);
// walk the chain and merge each property
var current = this;
while (current) {
// stop processing if this or one of it's parents is disabled
if (current._fetchDisabled) return;
// merge the properties from the state into the flattened copy
_.forOwn(current._state, collectProp);
// move to this sources parent
current = current._parent;
}
current = null;
if (type === 'search') {
// defaults for the query
_.forOwn({
query: {
'match_all': {}
}
}, collectProp);
// switch to filtered query if there are filters
if (flatState.filters) {
if (flatState.filters.length) {
flatState.body.query = {
filtered: {
query: flatState.body.query,
filter: {
bool: {
must: flatState.filters
}
}
}
};
}
delete flatState.filters;
}
}
return flatState;
};
/**
* Wrap a function in $scope.$apply or $scope.$eval for the Source's
* current $scope
*
* @param {Function} cb - the function to wrap
* @return {Function} - the wrapped function
*/
DataSource.prototype._wrapcb = function (cb) {
var source = this;
cb = (typeof cb !== 'function') ? _.noop : cb;
var wrapped = function () {
var args = arguments;
// always use the stored ref so that it can be updated if needed
var $scope = source._$scope;
// don't fall apart if we don't have a scope
if (!$scope) return cb.apply(source, args);
// use angular's $apply or $eval functions for the given scope
$scope[$scope.$root.$$phase ? '$eval' : '$apply'](function () {
cb.apply(source, args);
});
};
return wrapped;
};
return DataSource;
});

View file

@ -1,247 +0,0 @@
define(function (require) {
var DataSource = require('courier/data_source/data_source');
var inherits = require('utils/inherits');
var nextTick = require('utils/next_tick');
var VersionConflict = require('courier/errors').VersionConflict;
var FetchFailure = require('courier/errors').FetchFailure;
var RequestFailure = require('courier/errors').RequestFailure;
var listenerCount = require('utils/event_emitter').listenerCount;
var _ = require('lodash');
function DocSource(courier, initialState) {
DataSource.call(this, courier, initialState);
}
inherits(DocSource, DataSource);
/**
* Method used by the Courier to fetch multiple DocSource objects at one time.
* Those objects come in via the refs array, which is a list of objects containing
* at least `source` and `version` keys.
*
* @param {Courier} courier - The courier requesting the records
* @param {array} refs - The list of refs
* @param {Function} cb - Callback
* @return {undefined}
*/
DocSource.fetch = function (courier, refs, cb) {
var client = courier._getClient();
var allRefs = [];
var body = {
docs: []
};
refs.forEach(function (ref) {
var source = ref.source;
var state = source._flatten();
if (!state || !state._id) return;
allRefs.push(ref);
body.docs.push(state);
});
// always callback asynchronously
if (!allRefs.length) return nextTick(cb);
return client.mget({ body: body }, function (err, resp) {
if (err) return cb(err);
resp.docs.forEach(function (resp, i) {
var ref = allRefs[i];
var source = ref.source;
if (resp.error) return source._error(new FetchFailure(resp));
if (resp.found) {
if (ref.version === resp._version) return; // no change
ref.version = resp._version;
source._storeVersion(resp._version);
} else {
ref.version = void 0;
source._clearVersion();
}
source._previousResult = resp;
source.emit('results', resp);
});
cb(void 0, resp);
});
};
/**
* Method used to check if a list of refs is
* @param {[type]} courier [description]
* @param {[type]} refs [description]
* @param {Function} cb [description]
* @return {[type]} [description]
*/
DocSource.validate = function (courier, refs, cb) {
var invalid = _.filter(refs, function (ref) {
var storedVersion = ref.source._getVersion();
if (!storedVersion && ref.version) {
// stored version was cleared, we need to clear our cached
delete ref.version;
}
/* jshint eqeqeq: false */
return (!ref.fetchCount || !ref.version || ref.version != storedVersion);
});
// callbacks should always be called async
nextTick(cb, void 0, invalid);
};
/*****
* PUBLIC API
*****/
/**
* List of methods that is turned into a chainable API in the constructor
* @type {Array}
*/
DocSource.prototype._methods = [
'index',
'type',
'id',
'sourceInclude',
'sourceExclude'
];
/**
* Applies a partial update to the document
* @param {object} fields - The fields to change and their new values (es doc field)
* @param {Function} cb - Callback to know when the update is complete
* @return {undefined}
*/
DocSource.prototype.doUpdate = function (fields, cb) {
if (!this._state.id) return this.doIndex(fields, cb);
return this._sendToEs('update', true, { doc: fields }, cb);
};
/**
* Update the document stored
* @param {[type]} body [description]
* @param {Function} cb [description]
* @return {[type]} [description]
*/
DocSource.prototype.doIndex = function (body, cb) {
return this._sendToEs('index', false, body, cb);
};
/*****
* PRIVATE API
*****/
/**
* Get the type of this DataSource
* @return {string} - 'doc'
*/
DocSource.prototype._getType = function () {
return 'doc';
};
/**
* Used to merge properties into the state within ._flatten().
* The state is passed in and modified by the function
*
* @param {object} state - the current merged state
* @param {*} val - the value at `key`
* @param {*} key - The key of `val`
* @return {undefined}
*/
DocSource.prototype._mergeProp = function (state, val, key) {
key = '_' + key;
if (val != null && state[key] == null) {
state[key] = val;
}
};
/**
* Creates a key based on the doc's index/type/id
* @return {string}
*/
DocSource.prototype._versionKey = function () {
var state = this._state;
return 'DocVersion:' + (
[
state.index,
state.type,
state.id
]
.map(encodeURIComponent)
.join('/')
);
};
/**
* Fetches the stored version from localStorage
* @return {number} - the version number, or NaN
*/
DocSource.prototype._getVersion = function () {
var v = localStorage.getItem(this._versionKey());
return v ? _.parseInt(v) : void 0;
};
/**
* Stores the version into localStorage
* @param {number, NaN} version - the current version number, NaN works well forcing a refresh
* @return {undefined}
*/
DocSource.prototype._storeVersion = function (version) {
if (!version) return this._clearVersion();
var id = this._versionKey();
localStorage.setItem(id, version);
};
/**
* Clears the stored version for a DocSource
*/
DocSource.prototype._clearVersion = function () {
var id = this._versionKey();
localStorage.removeItem(id);
};
/**
* Backend for doUpdate and doIndex
* @param {String} method - the client method to call
* @param {Boolean} validateVersion - should our knowledge
* of the the docs current version be sent to es?
* @param {String} body - HTTP request body
* @param {Function} cb - callback
*/
DocSource.prototype._sendToEs = function (method, validateVersion, body, cb) {
cb = this._wrapcb(cb);
var source = this;
var courier = this._courier;
var client = courier._getClient();
// straight assignment will causes undefined values
var params = _.pick(this._state, 'id', 'type', 'index');
params.body = body;
params.ignore = [409];
if (validateVersion) {
params.version = source._getVersion();
}
client[method](params, function (err, resp) {
if (err) return cb(new RequestFailure(err, resp));
if (resp && resp.status === 409) {
err = new VersionConflict(resp);
if (listenerCount(source, 'conflict')) {
return source.emit('conflict', err);
} else {
return cb(err);
}
}
source._storeVersion(resp._version);
courier._docUpdated(source);
if (typeof cb === 'function') return cb(void 0, resp._id);
});
};
return DocSource;
});

View file

@ -1,143 +0,0 @@
define(function (require) {
var DataSource = require('courier/data_source/data_source');
var inherits = require('utils/inherits');
var nextTick = require('utils/next_tick');
var errors = require('courier/errors');
var FetchFailure = require('courier/errors').FetchFailure;
var RequestFailure = require('courier/errors').RequestFailure;
var _ = require('lodash');
function SearchSource(courier, initialState) {
DataSource.call(this, courier, initialState);
}
inherits(SearchSource, DataSource);
/**
* Method used by the Courier to fetch multiple SearchSource request at a time.
* Those objects come in via the refs array, which is a list of objects containing
* a `source` keys.
*
* @param {Courier} courier - The courier requesting the results
* @param {array} refs - The list of refs
* @param {Function} cb - Callback
* @return {undefined}
*/
SearchSource.fetch = function (courier, refs, cb) {
var client = courier._getClient();
var body = '';
cb = cb || _.noop;
// we must have refs in the same order to process the response
// so copy them here in the right order
var allRefs = [];
refs.forEach(function (ref) {
var source = ref.source;
var state = source._flatten();
if (!state) return;
allRefs.push(ref);
body +=
JSON.stringify({ index: state.index, type: state.type })
+ '\n'
+ JSON.stringify(state.body)
+ '\n';
});
// always callback async
if (!allRefs.length) return nextTick(cb);
return client.msearch({ body: body }, function (err, resp) {
if (err) return cb(new RequestFailure(err, resp));
_.each(resp.responses, function (resp, i) {
var source = allRefs[i].source;
if (resp.error) return source._error(new FetchFailure(resp));
source.emit('results', resp);
});
cb(void 0, resp);
});
};
/*****
* PUBLIC API
*****/
/**
* List of the editable state properties that turn into a
* chainable API
*
* @type {Array}
*/
SearchSource.prototype._methods = [
'index',
'indexInterval', // monthly, daily, etc
'type',
'query',
'filter',
'sort',
'highlight',
'aggs',
'from',
'size',
'source'
];
/**
* Set a searchSource that this source should inherit from
* @param {SearchSource} searchSource - the parent searchSource
* @return {this} - chainable
*/
SearchSource.prototype.inherits = function (parent) {
this._parent = parent;
return this;
};
/******
* PRIVATE APIS
******/
/**
* Gets the type of the DataSource
* @return {string}
*/
SearchSource.prototype._getType = function () {
return 'search';
};
/**
* Used to merge properties into the state within ._flatten().
* The state is passed in and modified by the function
*
* @param {object} state - the current merged state
* @param {*} val - the value at `key`
* @param {*} key - The key of `val`
* @return {undefined}
*/
SearchSource.prototype._mergeProp = function (state, val, key) {
switch (key) {
case 'filter':
state.filters = state.filters || [];
state.filters.push(val);
return;
case 'index':
case 'type':
case 'id':
if (key && state[key] == null) {
state[key] = val;
}
return;
case 'source':
key = '_source';
/* fall through */
default:
state.body = state.body || {};
if (key && state.body[key] == null) {
state.body[key] = val;
}
}
};
return SearchSource;
});

View file

@ -1,116 +0,0 @@
define(function (require) {
var listenerCount = require('utils/event_emitter').listenerCount;
var _ = require('lodash');
var errors = {};
var inherits = require('utils/inherits');
var canStack = (function () {
var err = new Error();
return !!err.stack;
}());
// abstract error class
function CourierError(msg, constructor) {
this.message = msg;
Error.call(this, this.message);
if (Error.captureStackTrace) {
Error.captureStackTrace(this, constructor || CourierError);
} else if (canStack) {
this.stack = (new Error()).stack;
} else {
this.stack = '';
}
}
errors.CourierError = CourierError;
inherits(CourierError, Error);
/**
* HastyRefresh error class
* @param {String} [msg] - An error message that will probably end up in a log.
*/
errors.HastyRefresh = function HastyRefresh() {
CourierError.call(this,
'Courier attempted to start a query before the previous had finished.',
errors.HastyRefresh);
};
inherits(errors.HastyRefresh, CourierError);
/**
* Request Failure - When an entire mutli request fails
* @param {Error} err - the Error that came back
* @param {Object} resp - optional HTTP response
*/
errors.RequestFailure = function RequestFailure(err, resp) {
CourierError.call(this,
'Request to Elasticsearch failed: ' + JSON.stringify(resp || err.message),
errors.RequestFailure);
this.origError = err;
this.resp = resp;
};
inherits(errors.RequestFailure, CourierError);
/**
* FetchFailure Error - when there is an error getting a doc or search within
* a multi-response response body
* @param {String} [msg] - An error message that will probably end up in a log.
*/
errors.FetchFailure = function FetchFailure(resp) {
CourierError.call(this,
'Failed to get the doc: ' + JSON.stringify(resp),
errors.FetchFailure);
this.resp = resp;
};
inherits(errors.FetchFailure, CourierError);
/**
* A doc was re-indexed but it was out of date.
* @param {Object} resp - The response from es (one of the multi-response responses).
*/
errors.VersionConflict = function VersionConflict(resp) {
CourierError.call(this,
'Failed to store document changes do to a version conflict.',
errors.VersionConflict);
this.resp = resp;
};
inherits(errors.VersionConflict, CourierError);
/**
* there was a conflict storing a doc
* @param {String} field - the fields which contains the conflict
*/
errors.MappingConflict = function MappingConflict(field) {
CourierError.call(this,
'Field ' + field + ' is defined as at least two different types in indices matching the pattern',
errors.MappingConflict);
};
inherits(errors.MappingConflict, CourierError);
/**
* a non-critical cache write to elasticseach failed
*/
errors.CacheWriteFailure = function CacheWriteFailure() {
CourierError.call(this,
'A Elasticsearch cache write has failed.',
errors.CacheWriteFailure);
};
inherits(errors.CacheWriteFailure, CourierError);
/**
* when a field mapping is requested for an unknown field
* @param {String} name - the field name
*/
errors.FieldNotFoundInCache = function FieldNotFoundInCache(name) {
CourierError.call(this,
'The ' + name + ' field was not found in the cached mappings',
errors.FieldNotFoundInCache);
};
inherits(errors.FieldNotFoundInCache, CourierError);
return errors;
});

View file

@ -1,273 +0,0 @@
define(function (require) {
var _ = require('lodash');
var Error = require('courier/errors');
var CacheWriteFailure = require('courier/errors').CacheWriteFailure;
var MappingConflict = require('courier/errors').MappingConflict;
var nextTick = require('utils/next_tick');
/**
* - Resolves index patterns
* - Fetches mappings from elasticsearch
* - casts result object fields using mappings
*
* @class Mapper
*/
function Mapper(courier, config) {
// Exclude anything wirh empty mapping except these
var reservedFields = {
'_id': { type: 'string' },
'_type': { type: 'string' },
'_index': { type: 'string' }
};
// Save a reference to this
var self = this;
config = _.defaults(config || {}, {
cacheIndex: 'kibana4-int',
cacheType: 'mappings'
});
// Store mappings we've already loaded from Elasticsearch
var mappings = {};
/**
* Gets an object containing all fields with their mappings
* @param {dataSource} dataSource
* @param {Function} callback A function to be executed with the results.
*/
this.getFields = function (dataSource, callback) {
if (!dataSource.get('index')) {
// always callback async
nextTick(callback, new TypeError('dataSource must have an index before it\'s fields can be fetched'));
return;
}
// If we already have the fields in our object, use that
var cached = self.getFieldsFromObject(dataSource);
if (cached) {
// always callback async
nextTick(callback, void 0, cached);
} else {
// Otherwise, try to get fields from Elasticsearch cache
self.getFieldsFromCache(dataSource, function (err, fields) {
if (err) {
// If we are unable to get the fields from cache, get them from mapping
self.getFieldsFromMapping(dataSource, function (err, fields) {
if (err) return callback(err);
// And then cache them
cacheFieldsToElasticsearch(config, dataSource.get('index'), fields, function (err, response) {
// non-critical error, should not interupt function completion
if (err) courier._error(new CacheWriteFailure());
});
cacheFieldsToObject(dataSource, fields);
callback(err, fields);
});
} else {
cacheFieldsToObject(dataSource, fields);
callback(err, self.getFieldsFromObject(dataSource));
}
});
}
};
/**
* Gets an object containing the mapping for a field
* @param {dataSource} dataSource
* @param {String} field The dot notated name of a field to get the mapping for
* @param {Function} callback A function to be executed with the results.
*/
this.getFieldMapping = function (dataSource, field, callback) {
self.getFields(dataSource, function (err, fields) {
// TODO: errors should probably be passed to the callback
if (_.isUndefined(fields[field])) return courier._error(new Error.FieldNotFoundInCache(field));
callback(err, fields[field]);
});
};
/**
* Gets an object containing the mapping for a field
* @param {dataSource} dataSource
* @param {Array} fields The dot notated names of a fields to get the mapping for
* @param {Function} callback A function to be executed with the results.
*/
this.getFieldsMapping = function (dataSource, fields, callback) {
self.getFields(dataSource, function (err, map) {
// TODO: errors should probably be handled before _mapping is created
var _mapping = _.object(_.map(fields, function (field) {
// TODO: errors should probably be passed to the callback
if (_.isUndefined(map[field])) return courier._error(new Error.FieldNotFoundInCache(field));
return [field, map[field]];
}));
callback(err, _mapping);
});
};
/**
* Gets an object containing all fields with their mappings from kibana's cache in Elasticsearch
* @param {dataSource} dataSource
* @return {Object} An object containing fields with their mappings, or false if not found.
*/
this.getFieldsFromObject = function (dataSource) {
// don't pass pack our reference to truth, cloneDeep it
return !_.isUndefined(mappings[dataSource.get('index')]) ? _.cloneDeep(mappings[dataSource.get('index')]) : false;
};
/**
* Gets an object containing all fields with their mappings from kibana's cache in Elasticsearch
* @param {dataSource} dataSource
* @param {Function} callback A function to be executed with the results.
*/
this.getFieldsFromCache = function (dataSource, callback) {
var client = courier._getClient();
var params = {
index: config.cacheIndex,
type: config.cacheType,
id: dataSource.get('index'),
};
client.getSource(params, callback);
};
/**
* Gets an object containing all fields with their mappings directly from Elasticsearch
* @param {dataSource} dataSource
* @param {Function} callback A function to be executed with the results.
*/
this.getFieldsFromMapping = function (dataSource, callback) {
var client = courier._getClient();
var params = {
// TODO: Change index to be newest resolved index. Change _state.index to id().
index: dataSource.get('index'),
field: '*',
};
// TODO: Add week/month check
client.indices.getFieldMapping(params, function (err, response, status) {
var fields = {};
_.each(response, function (index, indexName) {
if (indexName === config.cacheIndex) return;
_.each(index.mappings, function (type) {
_.each(type, function (field, name) {
if (_.size(field.mapping) === 0 || name[0] === '_') return;
var mapping = field.mapping[_.keys(field.mapping)[0]];
mapping.type = castMappingType(mapping.type);
if (fields[name]) {
if (fields[name].type === mapping.type) return;
// TODO: errors should probably be passed to the callback
return courier._error(new MappingConflict(name));
}
fields[name] = field.mapping[_.keys(field.mapping)[0]];
});
});
});
// TODO if these are mapped differently this might cause problems
_.assign(fields, reservedFields);
callback(err, fields);
});
};
/**
* Stores processed mappings in Elasticsearch
* @param {dataSource} dataSource
* @param {Function} callback A function to be executed with the results.
*/
var cacheFieldsToElasticsearch = function (config, index, fields, callback) {
var client = courier._getClient();
client.index({
index: config.cacheIndex,
type: config.cacheType,
id: index,
body: fields
}, callback);
};
/**
* Stores processed mappings in an object
* @param {dataSource} dataSource
* @param {Function} callback A function to be executed with the results.
*/
var cacheFieldsToObject = function (dataSource, fields) {
mappings[dataSource.get('index')] = _.cloneDeep(fields);
return !_.isUndefined(mappings[dataSource.get('index')]) ? true : false;
};
/**
* Accepts a mapping type, and converts it into it's js equivilent
* @param {String} type - the type from the mapping's 'type' field
* @return {String} - the most specific type that we care for
*/
var castMappingType = function (type) {
switch (type) {
case 'float':
case 'double':
case 'integer':
case 'long':
case 'short':
case 'byte':
case 'token_count':
return 'number';
case 'date':
case 'boolean':
case 'ip':
case 'attachment':
case 'geo_point':
case 'geo_shape':
return type;
default: // including 'string'
return 'string';
}
};
/**
* Clears mapping caches from elasticsearch and from local object
* @param {dataSource} dataSource
* @param {Function} callback A function to be executed with the results.
*/
this.clearCache = function (dataSource, callback) {
var client = courier._getClient();
if (!_.isUndefined(mappings[dataSource.get('index')])) {
delete mappings[dataSource.get('index')];
}
client.delete({
index: config.cacheIndex,
type: config.cacheType,
id: dataSource.get('index')
}, callback);
};
/**
* Sets a number of fields to be ignored in the mapping. Not sure this is a good idea?
* @param {dataSource} dataSource
* @param {Array} fields An array of fields to be ignored
* @param {Function} callback A function to be executed with the results.
*/
this.ignoreFields = function (dataSource, fields, callback) {
if (!_.isArray(fields)) fields = [fields];
var ignore = _.object(_.map(fields, function (field) {
return [field, {type: 'ignore'}];
}));
self.getFields(dataSource, function (err, mapping) {
_.assign(mapping, ignore);
callback(err, mapping);
});
};
}
return Mapper;
});

View file

@ -4,8 +4,8 @@ define(function (require) {
require('css!./styles/main.css');
require('directives/config');
require('services/courier');
require('services/config');
require('courier/courier');
require('config/config');
require('apps/dashboard/directives/grid');
require('apps/dashboard/directives/panel');
@ -15,20 +15,18 @@ define(function (require) {
'kibana/services'
]);
app.config(function ($routeProvider) {
$routeProvider
.when('/dashboard', {
templateUrl: 'kibana/apps/dashboard/index.html'
})
.when('/dashboard/:source', {
redirectTo: '/dashboard'
})
.when('/dashboard/:source/:path', {
templateUrl: 'kibana/apps/dashboard/index.html'
})
.when('/dashboard/:source/:path/:params', {
templateUrl: 'kibana/apps/dashboard/index.html'
});
require('routes')
.when('/dashboard', {
templateUrl: 'kibana/apps/dashboard/index.html'
})
.when('/dashboard/:source', {
redirectTo: '/dashboard'
})
.when('/dashboard/:source/:path', {
templateUrl: 'kibana/apps/dashboard/index.html'
})
.when('/dashboard/:source/:path/:params', {
templateUrl: 'kibana/apps/dashboard/index.html'
});
app.controller('dashboard', function ($scope, $routeParams, $rootScope, $location, courier, configFile) {
@ -44,37 +42,34 @@ define(function (require) {
dashboardSearch(newVal);
});
$scope.editingTitle = false;
var init = function () {
$scope.editingTitle = false;
// Passed in the grid attr to the directive so we can access the directive's function from
// the controller and view
$scope.gridControl = {};
// Passed in the grid attr to the directive so we can access the directive's function from
// the controller and view
$scope.gridControl = {};
// This must be setup to pass to $scope.configurable, even if we will overwrite it immediately
$scope.dashboard = {
title: 'New Dashboard',
panels: []
};
// All inputs go here.
$scope.input = {
search: ''
};
// Setup configurable values for config directive, after objects are initialized
$scope.configurable = {
dashboard: $scope.dashboard,
load: $scope.load,
input: {
search: $scope.input.search
}
};
$scope.$broadcast('application.load');
// This must be setup to pass to $scope.configurable, even if we will overwrite it immediately
$scope.dashboard = {
title: 'New Dashboard',
panels: []
};
// All inputs go here.
$scope.input = {
search: ''
};
// Setup configurable values for config directive, after objects are initialized
$scope.configurable = {
dashboard: $scope.dashboard,
load: $scope.load,
input: {
search: $scope.input.search
}
};
$rootScope.$broadcast('application.load');
var dashboardSearch = function (query) {
var search;
@ -89,14 +84,12 @@ define(function (require) {
.type('dashboard')
.size(10)
.query(query)
.$scope($scope)
.inherits(courier.rootSearchSource)
.on('results', function (res) {
$scope.configurable.searchResults = res.hits.hits;
});
search.fetch();
.inherits(courier.rootSearchSource);
search.onResults().then(function onResults(res) {
$scope.configurable.searchResults = res.hits.hits;
search.onResults().then(onResults);
});
};
var setConfigTemplate = function (template) {
@ -127,8 +120,7 @@ define(function (require) {
var doc = courier.createSource('doc')
.index(configFile.kibanaIndex)
.type('dashboard')
.id(title)
.$scope($scope);
.id(title);
doc.doIndex({title: title, panels: $scope.gridControl.serializeGrid()})
.then(function (res, err) {
@ -167,7 +159,5 @@ define(function (require) {
courier.fetch();
};
init();
});
});

View file

@ -8,6 +8,17 @@ define(function (require) {
require('services/state');
require('routes')
.when('/discover/:id?', {
templateUrl: 'kibana/apps/discover/index.html',
reloadOnSearch: false,
resolve: {
search: function (savedSearches, $route) {
return savedSearches.get($route.current.params.id);
}
}
});
var intervals = [
{ display: '', val: null },
{ display: 'Hourly', val: 'hourly' },
@ -21,13 +32,12 @@ define(function (require) {
var notify = createNotifier({
location: 'Discover'
});
var locals = $route.current.locals;
var search = locals.search;
var search = $route.current.locals.search;
if (!search) return notify.fatal('search failed to load');
/* Manage state & url state */
var initialQuery = search.get('query');
function loadState() {
@ -53,13 +63,13 @@ define(function (require) {
};
// track the initial state of the search
var searchIsPhantom = search.phantom;
var searchIsUnsaved = search.unsaved;
$scope.opts.saveDataSource = function () {
search.save()
.then(function () {
notify.info('Saved Data Source "' + search.details.name + '"');
if (searchIsPhantom) {
searchIsPhantom = false;
if (searchIsUnsaved) {
searchIsUnsaved = false;
$location.url('/discover/' + search.get('id'));
}
}, notify.error);
@ -81,14 +91,15 @@ define(function (require) {
}
});
search
.$scope($scope)
.inherits(courier.rootSearchSource)
.on('results', function (res) {
if (!$scope.fields) getFields();
search.inherits(courier.rootSearchSource);
$scope.rows = res.hits.hits;
});
search.onResults().then(function onResults(resp) {
if (!$scope.fields) getFields();
$scope.rows = resp.hits.hits;
return search.onResults(onResults);
});
$scope.$on('$destroy', _.bindKey(search, 'cancelPending'));
$scope.getSort = function () {

View file

@ -1,116 +0,0 @@
define(function (require) {
var app = require('modules').get('app/discover');
var bind = require('lodash').bind;
var assign = require('lodash').assign;
var nextTick = require('utils/next_tick');
app.factory('SavedSearch', function (configFile, courier, $q) {
function SavedSearch(id) {
var search = courier.createSource('search');
search._doc = courier.createSource('doc')
.index(configFile.kibanaIndex)
.type('saved_searches')
.id(id || void 0)
.on('results', function onResults(resp) {
if (!resp.found) {
search._doc.removeListener('results', onResults);
search.emit('noconfig', new Error('Unable to find that Saved Search...'));
}
search.set(resp._source.state);
search.details = resp._source.details;
assign(search.deatils, resp._source.details);
if (!id) {
id = resp._id;
// it's no longer a phantom
search.phantom = false;
}
if (!search.ready()) {
search.ready(true);
// allow the search to be fetched automatically
search.enableAuthFetch();
}
});
search._dynamicState.id = function () {
return search._doc.get('id');
};
search.phantom = true;
search.details = {
name: '',
hits: 0
};
search.ready = (function () {
var queue = id ? [] : false;
var err;
return function (cb) {
switch (typeof cb) {
// check if we are ready yet
case 'undefined':
return !queue;
// call or queue a function once ready
case 'function':
if (queue) {
// queue will be false once complete
queue.push(cb);
} else {
// always callback async
nextTick(cb, err);
}
return;
// indicate that we are ready, or there was a failure loading
default:
if (queue && cb) {
if (cb instanceof Error) {
err = cb;
}
// if queued functions are confused, and ask us if
// we are ready, we should tell them yes
var fns = queue;
queue = false;
// be sure to send out the error we got if there was one
fns.forEach(function (fn) { fn(err); });
}
}
};
}());
search.save = function () {
var defer = $q.defer();
search._doc.doIndex({
details: search.details,
state: search.toJSON()
}, function (err, id) {
if (err) return defer.reject(err);
search._doc.id(id);
defer.resolve();
});
return defer.promise;
};
if (!id) {
// we have nothing left to load
search.ready(true);
} else {
// before this search is fetched, it's config needs to be loaded
search.disableAutoFetch();
// get the config doc now
search._doc.fetch();
}
return search;
}
return SavedSearch;
});
});

View file

@ -1,21 +1,7 @@
define(function (require, module, exports) {
require('directives/table');
require('./field_chooser');
require('./services/saved_searches');
require('./saved_searches/service');
require('./timechart');
require('./controllers/discover');
var app = require('modules').get('app/discover');
app.config(function ($routeProvider) {
$routeProvider.when('/discover/:id?', {
templateUrl: 'kibana/apps/discover/index.html',
reloadOnSearch: false,
resolve: {
search: function (savedSearches, $route) {
return savedSearches.get($route.current.params.id);
}
}
});
});
});

View file

@ -0,0 +1,78 @@
define(function (require) {
var module = require('modules').get('discover/saved_searches');
var _ = require('lodash');
var inherits = require('utils/inherits');
module.factory('SavedSearch', function (configFile, courier, Promise, createNotifier, CouriersSearchSource) {
var notify = createNotifier({
location: 'Saved Search'
});
function SavedSearch(id) {
CouriersSearchSource.call(this, courier);
var search = this;
var doc = courier.createSource('doc')
.index(configFile.kibanaIndex)
.type('saved_searches')
.id(id || void 0);
search._dynamicState.id = function () {
return doc.get('id');
};
search.phantom = true;
search.details = {
name: '',
hits: 0
};
function processDocResp(resp) {
if (!resp.found) {
throw new Error('Unable to find that Saved Search...');
}
if (!doc.get('id')) {
search.phantom = false;
doc.set('id', resp._id);
}
search.set(resp._source.state);
_.assign(search.details, resp._source.details);
}
function autoUpdateDoc() {
return doc.onUpdate(processDocResp)
.then(function onUpdate(resp) {
processDocResp(resp);
return doc.onUpdate(onUpdate);
});
}
this.save = function () {
return doc.doIndex({
details: search.details,
state: search.toJSON()
}).then(function (id) {
doc.set('id', id);
return id;
});
};
search.init = _.once(function () {
// nothing to do unless the doc is actually saved
if (!doc.get('id')) return Promise.resolved(search);
return doc.fetch().then(function (resp) {
processDocResp(resp);
autoUpdateDoc().catch(notify.fatal);
return search;
});
});
}
inherits(SavedSearch, CouriersSearchSource);
return SavedSearch;
});
});

View file

@ -0,0 +1,17 @@
define(function (require) {
var module = require('modules').get('discover/saved_searches');
var _ = require('lodash');
require('./saved_search');
module.service('savedSearches', function (courier, configFile, createNotifier, SavedSearch) {
var notify = createNotifier({
location: 'Saved Searches'
});
this.get = function (id) {
return (new SavedSearch(id)).init();
};
});
});

View file

@ -1,25 +0,0 @@
define(function (require) {
var module = require('modules').get('kibana/services');
var _ = require('lodash');
require('../factories/saved_search');
module.service('savedSearches', function (courier, configFile, $q, createNotifier, SavedSearch) {
var notify = createNotifier({
location: 'Saved Searches'
});
this.get = function (id) {
var defer = $q.defer();
var search = new SavedSearch(id);
search.ready(function (err) {
if (err) defer.reject(err);
else defer.resolve(search);
});
return defer.promise;
};
});
});

View file

@ -5,11 +5,9 @@ define(function (require) {
var app = require('modules').get('app/examples');
app.config(function ($routeProvider) {
$routeProvider
.when('/examples', {
templateUrl: 'kibana/apps/examples/index.html'
});
require('routes')
.when('/examples', {
templateUrl: 'kibana/apps/examples/index.html'
});
// main controller for the examples
@ -70,12 +68,13 @@ define(function (require) {
.type($scope.type)
.source({
include: 'country'
})
.$scope($scope)
.on('results', function (resp) {
$scope.count ++;
$scope.json = JSON.stringify(resp.hits, null, ' ');
});
source.onResults().then(function onResults(resp) {
$scope.count ++;
$scope.json = JSON.stringify(resp.hits, null, ' ');
return source.onResults().then(onResults);
});
}
};
});
@ -101,13 +100,14 @@ define(function (require) {
var source = courier.createSource('doc')
.id($scope.id)
.type($scope.type)
.index($scope.index)
.$scope($scope)
.on('results', function (doc) {
currentSource = doc._source;
$scope.count ++;
$scope.json = JSON.stringify(doc, null, ' ');
});
.index($scope.index);
source.onResults().then(function onResults(doc) {
currentSource = doc._source;
$scope.count ++;
$scope.json = JSON.stringify(doc, null, ' ');
return source.onResults().then(onResults);
});
}
};
});
@ -131,13 +131,15 @@ define(function (require) {
.type($scope.type)
.source({
include: 'geo'
})
.$scope($scope)
.on('results', function (resp) {
$scope.count ++;
$scope.json = JSON.stringify(resp.hits, null, ' ');
});
source.onResults().then(function onResults(resp) {
$scope.count ++;
$scope.json = JSON.stringify(resp.hits, null, ' ');
return source.onResults().then(onResults);
});
var fields = $scope.fields.split(',');
fields.forEach(function (field) {

View file

@ -5,11 +5,9 @@ define(function (require) {
require('../factories/vis');
require('../services/aggs');
app.config(function ($routeProvider) {
$routeProvider
.when('/visualize', {
templateUrl: 'kibana/apps/visualize/index.html'
});
require('routes')
.when('/visualize', {
templateUrl: 'kibana/apps/visualize/index.html'
});
app.controller('Visualize', function ($scope, courier, createNotifier, Vis, Aggs) {
@ -94,8 +92,6 @@ define(function (require) {
}
});
vis.dataSource.$scope($scope);
$scope.refreshFields = function () {
$scope.fields = null;
vis.dataSource.clearFieldCache().then(getFields, notify.error);

View file

@ -2,7 +2,7 @@ define(function (require) {
var converters = require('../resp_converters/index');
// var K4D3 = require('K4D3');
function VisualizationDirective() {
function VisualizationDirective(createNotifier) {
return {
restrict: 'E',
template: '<div class="chart"><pre>{{ results | json }}</pre></div>',
@ -12,16 +12,14 @@ define(function (require) {
link: function ($scope, $el) {
var vis = $scope.vis;
vis
.dataSource
.on('results', function (resp) {
$scope.results = vis.buildChartDataFromResponse(resp);
});
var notify = createNotifier({
location: vis.type + ' visualization'
});
if (!vis.dataSource._$scope) {
// only link if the dataSource isn't already linked
vis.dataSource.$scope($scope);
}
vis.dataSource.onResults().then(function onResults(resp) {
$scope.results = vis.buildChartDataFromResponse(resp);
return vis.dataSource.onResults(onResults);
}).catch(notify.fatal);
}
};
}

View file

@ -0,0 +1,115 @@
define(function (require) {
var _ = require('lodash');
var nextTick = require('utils/next_tick');
var configFile = require('../../../config');
var defaults = require('./defaults');
// guid within this window
var nextId = (function () {
var i = 0;
return function () {
return ++i;
};
}());
var module = require('modules').get('kibana/config', ['kibana']);
// allow the rest of the app to get the configFile easily
module.constant('configFile', configFile);
// service for delivering config variables to everywhere else
module.service('config', function ($rootScope, createNotifier, courier, kbnVersion, configFile, Promise) {
var config = this;
var notify = createNotifier({
location: 'Config'
});
var doc = courier.createSource('doc')
.index(configFile.kibanaIndex)
.type('config')
.id(kbnVersion);
var vals = {};
/******
* PUBLIC API
******/
/**
* Executes once and returns a promise that is resolved once the
* config has loaded for the first time.
*
* @return {Promise} - Resolved when the config loads initially
*/
config.init = _.once(function () {
notify.lifecycle('config init');
return doc.fetch()
.then(function (resp) {
if (!resp.found) config._unsaved = true;
vals = resp._source || vals;
notify.lifecycle('config init', true);
});
});
config.get = function (key, defaultVal) {
return vals[key] == null ? vals[key] = _.cloneDeep(defaultVal) : vals[key];
};
config.set = function (key, val) {
// sets a value in the config
// the es doc must be updated successfully for the update to reflect in the get api.
if (vals[key] === val) return Promise.resolved(true);
var update = {};
update[key] = val;
return doc.doUpdate(update)
.then(function () {
config._change(key, val);
return true;
});
};
config.$watch = function (key, listener) {
config.init().then(function () {
$rootScope.$on('change:config.' + key, listener);
listener(config.get(key));
});
};
config.close = function () {};
/*****
* PRIVATE API
*****/
var change = function (key, val) {
notify.log('config change: ' + key + ': ' + vals[key] + ' -> ' + val);
vals[key] = val;
$rootScope.$broadcast('change:config.' + key, val, vals[key]);
};
var trackDocChanges = function () {
notify.log('tracking changes to the config doc source');
var config = this;
doc.onUpdate()
.then(function processUpdate(resp) {
// _change() will not emit changes unless they really changed
_.forOwn(resp._source, function (val, key) {
config._change(key, val);
});
return doc.onUpdate().then(processUpdate);
})
.catch(function (err) {
// filter out abort errors
if (!(err instanceof courier.errors.Abort)) {
notify.error(err);
} else {
notify.log('aborted change tracking for config doc source');
}
});
};
});
});

View file

@ -0,0 +1,7 @@
define(function (require) {
var _ = require('utils/mixins');
return _.flattenWith('.', {
refreshInterval: 10000
});
});

View file

@ -0,0 +1,111 @@
define(function (require) {
var _ = require('lodash');
require('./data_source/doc');
require('./data_source/search');
require('./fetch/fetch');
require('./errors');
require('./looper');
require('services/es');
require('services/promises');
var module = require('modules').get('courier');
module.service('courier', [
'es',
'$rootScope',
'couriersFetch',
'Promise',
'Looper',
'couriersErrors',
'CouriersMapper',
'CouriersDocSource',
'CouriersSearchSource',
function (client, $rootScope, fetch, Promise, Looper, errors, Mapper, DocSource, SearchSource) {
var HastyRefresh = errors.HastyRefresh;
function Courier() {
var courier = this;
courier.errors = errors;
/**
* Queue of pending requests, requests are removed as
* they are processed by fetch.[sourceType]().
* @type {Array}
*/
courier._pendingRequests = [];
var processDocRequests = _.partial(fetch.docs, courier);
var docInterval = new Looper().fn(processDocRequests);
// track the currently executing search resquest
var _activeAutoSearch = null;
function processSearchRequests() {
// fatal if refreshes take longer then the refresh interval
if (_activeAutoSearch) Promise.rejected(new HastyRefresh());
return _activeAutoSearch = fetch.searches(courier).finally(function (res) {
_activeAutoSearch = null;
});
}
var searchInterval = new Looper().fn(processSearchRequests);
courier._mapper = new Mapper(courier);
/**
* update the time between automatic search requests
*
* @chainable
*/
courier.fetchInterval = function (ms) {
searchInterval.ms(ms);
return this;
};
courier.start = function () {
searchInterval.start();
};
/**
* is the currior currently fetching search
* results automatically?
*
* @return {boolean}
*/
courier.started = function () {
return searchInterval.started();
};
/**
* stop the courier from fetching more search
* results, does not stop vaidating docs.
*
* @chainable
*/
courier.stop = function () {
searchInterval.stop();
return this;
};
/**
* create a source object that is a child of this courier
*/
courier.createSource = function (type) {
switch (type) {
case 'doc':
return new DocSource(courier);
case 'search':
return new SearchSource(courier);
}
};
}
return new Courier();
}
]);
});

View file

@ -0,0 +1,218 @@
define(function (require) {
var inherits = require('utils/inherits');
var _ = require('lodash');
var Mapper = require('courier/mapper');
var nextTick = require('utils/next_tick');
var module = require('modules').get('courier/data_sources');
module.factory('CouriersSourceAbstract', function (couriersFetch, Promise) {
function SourceAbstract(courier, initialState) {
this._state = (function () {
// state can be serialized as JSON, and passed back in to restore
if (initialState) {
if (typeof initialState === 'string') {
return JSON.parse(initialState);
} else {
return _.cloneDeep(initialState);
}
} else {
return {};
}
}());
this._dynamicState = this._dynamicState || {};
this._courier = courier;
// get/set internal state values
this._methods.forEach(function (name) {
this[name] = function (val) {
if (val == null) {
delete this._state[name];
} else {
this._state[name] = val;
}
return this;
};
}, this);
}
/*****
* PUBLIC API
*****/
/**
* Get values from the state
* @param {string} name - The name of the property desired
*/
SourceAbstract.prototype.get = function (name) {
var current = this;
while (current) {
if (current._state[name] !== void 0) return current._state[name];
if (current._dynamicState[name] !== void 0) return current._dynamicState[name]();
current = current._parent;
}
};
/**
* Change the entire state of a SourceAbstract
* @param {object|string} state - The SourceAbstract's new state, or a
* string of the state value to set
*/
SourceAbstract.prototype.set = function (state, val) {
if (typeof state === 'string') {
return this[state](val);
}
this._state = state;
return this;
};
/**
* Set the courier for this dataSource
* @chainable
* @param {Courier} newCourier
*/
SourceAbstract.prototype.courier = function (newCourier) {
this._courier = newCourier;
return this;
};
/**
* Create a new dataSource object of the same type
* as this, which inherits this dataSource's properties
* @return {SourceAbstract}
*/
SourceAbstract.prototype.extend = function () {
return this._courier
.createSource(this._getType())
.inherits(this);
};
/**
* fetch the field names for this SourceAbstract
* @param {Function} cb
* @callback {Error, Array} - calls cb with a possible error or an array of field names
*/
SourceAbstract.prototype.getFields = function () {
return this._courier._mapper.getFields(this);
};
/**
* clear the field list cache
* @param {Function} cb
* @callback {Error, Array} - calls cb with a possible error
*/
SourceAbstract.prototype.clearFieldCache = function () {
return this._courier._mapper.clearCache(this);
};
/**
* return a simple, encodable object representing the state of the SourceAbstract
* @return {[type]} [description]
*/
SourceAbstract.prototype.toJSON = function () {
return _.clone(this._state);
};
/**
* Create a string representation of the object
* @return {[type]} [description]
*/
SourceAbstract.prototype.toString = function () {
return JSON.stringify(this.toJSON());
};
/**
* Put a request in to the courier that this Source should
* be fetched on the next run of the courier
* @return {[type]} [description]
*/
SourceAbstract.prototype.onResults = function () {
var defer = Promise.defer();
this._courier._pendingRequests.push({
source: this,
defer: defer
});
return defer.promise;
};
/**
* Fetch just this source ASAP
* @param {Function} cb - callback
*/
SourceAbstract.prototype.fetch = function () {
return couriersFetch[this._getType()](this);
};
/**
* Cancel all pending requests for this dataSource
* @return {undefined}
*/
SourceAbstract.prototype.cancelPending = function () {
var pending = _.where(this._courier._pendingRequests, { source: this});
_.pull.apply(_, [this._courier._pendingRequests].concat(pending));
};
/*****
* PRIVATE API
*****/
/**
* Walk the inheritance chain of a source and return it's
* flat representaion (taking into account merging rules)
* @return {object} - the flat state of the SourceAbstract
*/
SourceAbstract.prototype._flatten = function () {
var type = this._getType();
// the merged state of this dataSource and it's ancestors
var flatState = {};
var collectProp = _.partial(this._mergeProp, flatState);
// walk the chain and merge each property
var current = this;
while (current) {
// stop processing if this or one of it's parents is disabled
if (current._fetchDisabled) return;
// merge the properties from the state into the flattened copy
_.forOwn(current._state, collectProp);
// move to this sources parent
current = current._parent;
}
current = null;
if (type === 'search') {
// defaults for the query
_.forOwn({
query: {
'match_all': {}
}
}, collectProp);
// switch to filtered query if there are filters
if (flatState.filters) {
if (flatState.filters.length) {
flatState.body.query = {
filtered: {
query: flatState.body.query,
filter: {
bool: {
must: flatState.filters
}
}
}
};
}
delete flatState.filters;
}
}
return flatState;
};
return SourceAbstract;
});
});

View file

@ -0,0 +1,183 @@
define(function (require) {
var _ = require('lodash');
var inherits = require('utils/inherits');
var listenerCount = require('utils/event_emitter').listenerCount;
require('./abstract');
var module = require('modules').get('courier/data_sources');
module.factory('CouriersDocSource', function (couriersErrors, CouriersSourceAbstract, Promise) {
var VersionConflict = couriersErrors.VersionConflict;
var RequestFailure = couriersErrors.RequestFailure;
function DocSource(courier, initialState) {
CouriersSourceAbstract.call(this, courier, initialState);
// move onResults over to onUpdate, because that makes more sense
this.onUpdate = this.onResults;
this.onResults = void 0;
}
inherits(DocSource, CouriersSourceAbstract);
/*****
* PUBLIC API
*****/
/**
* List of methods that is turned into a chainable API in the constructor
* @type {Array}
*/
DocSource.prototype._methods = [
'index',
'type',
'id',
'sourceInclude',
'sourceExclude'
];
/**
* Applies a partial update to the document
* @param {object} fields - The fields to change and their new values (es doc field)
* @return {undefined}
*/
DocSource.prototype.doUpdate = function (fields) {
if (!this._state.id) return this.doIndex(fields);
return this._sendToEs('update', true, { doc: fields });
};
/**
* Update the document stored
* @param {[type]} body [description]
* @return {[type]} [description]
*/
DocSource.prototype.doIndex = function (body) {
return this._sendToEs('index', false, body);
};
/*****
* PRIVATE API
*****/
/**
* Get the type of this SourceAbstract
* @return {string} - 'doc'
*/
DocSource.prototype._getType = function () {
return 'doc';
};
/**
* Used to merge properties into the state within ._flatten().
* The state is passed in and modified by the function
*
* @param {object} state - the current merged state
* @param {*} val - the value at `key`
* @param {*} key - The key of `val`
* @return {undefined}
*/
DocSource.prototype._mergeProp = function (state, val, key) {
key = '_' + key;
if (val != null && state[key] == null) {
state[key] = val;
}
};
/**
* Creates a key based on the doc's index/type/id
* @return {string}
*/
DocSource.prototype._versionKey = function () {
var state = this._state;
return 'DocVersion:' + (
[
state.index,
state.type,
state.id
]
.map(encodeURIComponent)
.join('/')
);
};
/**
* Get the cached version number, not the version that is
* stored/shared with other tabs
*
* @return {number} - the version number, or undefined
*/
DocSource.prototype._getVersion = function () {
if (this._version) return this._version;
else return this._getStoredVersion();
};
/**
* Fetches the stored version from localStorage
* @return {[type]} [description]
*/
DocSource.prototype._getStoredVersion = function () {
var v = localStorage.getItem(this._versionKey());
this._version = v ? _.parseInt(v) : void 0;
return this._version;
};
/**
* Stores the version into localStorage
* @param {number, NaN} version - the current version number, NaN works well forcing a refresh
* @return {undefined}
*/
DocSource.prototype._storeVersion = function (version) {
if (!version) return this._clearVersion();
var id = this._versionKey();
localStorage.setItem(id, version);
};
/**
* Clears the stored version for a DocSource
*/
DocSource.prototype._clearVersion = function () {
var id = this._versionKey();
localStorage.removeItem(id);
};
/**
* Backend for doUpdate and doIndex
* @param {String} method - the client method to call
* @param {Boolean} validateVersion - should our knowledge
* of the the docs current version be sent to es?
* @param {String} body - HTTP request body
*/
DocSource.prototype._sendToEs = function (method, validateVersion, body) {
var source = this;
var courier = this._courier;
var client = courier._getClient();
// straight assignment will causes undefined values
var params = _.pick(this._state, ['id', 'type', 'index']);
params.body = body;
params.ignore = [409];
if (validateVersion) {
params.version = source._getVersion();
}
return client[method](params)
.then(function (resp) {
if (resp.status === 409) throw new VersionConflict(resp);
source._storeVersion(resp._version);
courier._docUpdated(source);
return resp._id;
})
.catch(function (err) {
// cast the error
return new RequestFailure(err);
});
};
return DocSource;
});
});

View file

@ -0,0 +1,100 @@
define(function (require) {
var inherits = require('utils/inherits');
var _ = require('lodash');
require('./abstract');
var module = require('modules').get('courier/data_sources');
module.factory('CouriersSearchSource', function (couriersErrors, CouriersSourceAbstract) {
var FetchFailure = couriersErrors.FetchFailure;
var RequestFailure = couriersErrors.RequestFailure;
function SearchSource(courier, initialState) {
CouriersSourceAbstract.call(this, courier, initialState);
}
inherits(SearchSource, CouriersSourceAbstract);
/*****
* PUBLIC API
*****/
/**
* List of the editable state properties that turn into a
* chainable API
*
* @type {Array}
*/
SearchSource.prototype._methods = [
'index',
'indexInterval', // monthly, daily, etc
'type',
'query',
'filter',
'sort',
'highlight',
'aggs',
'from',
'size',
'source'
];
/**
* Set a searchSource that this source should inherit from
* @param {SearchSource} searchSource - the parent searchSource
* @return {this} - chainable
*/
SearchSource.prototype.inherits = function (parent) {
this._parent = parent;
return this;
};
/******
* PRIVATE APIS
******/
/**
* Gets the type of the DataSource
* @return {string}
*/
SearchSource.prototype._getType = function () {
return 'search';
};
/**
* Used to merge properties into the state within ._flatten().
* The state is passed in and modified by the function
*
* @param {object} state - the current merged state
* @param {*} val - the value at `key`
* @param {*} key - The key of `val`
* @return {undefined}
*/
SearchSource.prototype._mergeProp = function (state, val, key) {
switch (key) {
case 'filter':
state.filters = state.filters || [];
state.filters.push(val);
return;
case 'index':
case 'type':
case 'id':
if (key && state[key] == null) {
state[key] = val;
}
return;
case 'source':
key = '_source';
/* fall through */
default:
state.body = state.body || {};
if (key && state.body[key] == null) {
state.body[key] = val;
}
}
};
return SearchSource;
});
});

View file

@ -0,0 +1,131 @@
define(function (require) {
var _ = require('lodash');
var module = require('modules').get('courier/errors');
var inherits = require('utils/inherits');
var canStack = (function () {
var err = new Error();
return !!err.stack;
}());
module.service('couriersErrors', function () {
var errors = this;
// abstract error class
function CourierError(msg, constructor) {
this.message = msg;
Error.call(this, this.message);
if (Error.captureStackTrace) {
Error.captureStackTrace(this, constructor || CourierError);
} else if (canStack) {
this.stack = (new Error()).stack;
} else {
this.stack = '';
}
}
errors.CourierError = CourierError;
inherits(CourierError, Error);
/**
* HastyRefresh error class
* @param {String} [msg] - An error message that will probably end up in a log.
*/
errors.HastyRefresh = function HastyRefresh() {
CourierError.call(this,
'Courier attempted to start a query before the previous had finished.',
errors.HastyRefresh);
};
inherits(errors.HastyRefresh, CourierError);
/**
* Request Failure - When an entire mutli request fails
* @param {Error} err - the Error that came back
* @param {Object} resp - optional HTTP response
*/
errors.RequestFailure = function RequestFailure(err, resp) {
CourierError.call(this,
'Request to Elasticsearch failed: ' + JSON.stringify(resp || err.message),
errors.RequestFailure);
this.origError = err;
this.resp = resp;
};
inherits(errors.RequestFailure, CourierError);
/**
* FetchFailure Error - when there is an error getting a doc or search within
* a multi-response response body
* @param {String} [msg] - An error message that will probably end up in a log.
*/
errors.FetchFailure = function FetchFailure(resp) {
CourierError.call(this,
'Failed to get the doc: ' + JSON.stringify(resp),
errors.FetchFailure);
this.resp = resp;
};
inherits(errors.FetchFailure, CourierError);
/**
* A doc was re-indexed but it was out of date.
* @param {Object} resp - The response from es (one of the multi-response responses).
*/
errors.VersionConflict = function VersionConflict(resp) {
CourierError.call(this,
'Failed to store document changes do to a version conflict.',
errors.VersionConflict);
this.resp = resp;
};
inherits(errors.VersionConflict, CourierError);
/**
* there was a conflict storing a doc
* @param {String} field - the fields which contains the conflict
*/
errors.MappingConflict = function MappingConflict(field) {
CourierError.call(this,
'Field ' + field + ' is defined as at least two different types in indices matching the pattern',
errors.MappingConflict);
};
inherits(errors.MappingConflict, CourierError);
/**
* a field mapping was using a restricted fields name
* @param {String} field - the fields which contains the conflict
*/
errors.RestrictedMapping = function RestrictedMapping(field, index) {
var msg = field + ' is a restricted field name';
if (index) msg += ', found it while attempting to fetch mapping for index pattern: ' + index;
CourierError.call(this,
msg,
errors.RestrictedMapping);
};
inherits(errors.RestrictedMapping, CourierError);
/**
* a non-critical cache write to elasticseach failed
*/
errors.CacheWriteFailure = function CacheWriteFailure() {
CourierError.call(this,
'A Elasticsearch cache write has failed.',
errors.CacheWriteFailure);
};
inherits(errors.CacheWriteFailure, CourierError);
/**
* when a field mapping is requested for an unknown field
* @param {String} name - the field name
*/
errors.FieldNotFoundInCache = function FieldNotFoundInCache(name) {
CourierError.call(this,
'The ' + name + ' field was not found in the cached mappings',
errors.FieldNotFoundInCache);
};
inherits(errors.FieldNotFoundInCache, CourierError);
});
});

View file

@ -0,0 +1,85 @@
define(function (require) {
var module = require('modules').get('courier/fetch');
var _ = require('lodash');
var docStrategy = require('./strategy/doc');
var searchStrategy = require('./strategy/search');
module.service('couriersFetch', function (es, Promise, couriersErrors) {
var flattenRequest = function (req) {
return req.source._flatten();
};
var fetchThese = function (strategy, requests) {
var all = requests.splice(0);
return es[strategy.clientMethod]({
body: strategy.requestStatesToBody(all.map(flattenRequest))
})
.then(function (resp) {
strategy.getResponses(resp).forEach(function (resp) {
var req = all.shift();
if (resp.error) return req.defer.reject(new couriersErrors.FetchFailure(resp));
else strategy.resolveRequest(req, resp);
});
// pass the response along to the next promise
return resp;
})
.catch(function (err) {
all.forEach(function (req) {
req.defer.reject(err);
});
throw err;
});
};
var fetchPending = function (strategy, courier) {
var requests = strategy.getPendingRequests(courier._pendingRequests);
if (!requests.length) return Promise.resolved();
else return fetchThese(strategy, requests);
};
var fetchASource = function (strategy, source) {
var defer = Promise.defer();
fetchThese(strategy, [
{
source: source,
defer: defer
}
]);
return defer.promise;
};
/**
* Fetch all pending docs that are ready to be fetched
* @param {Courier} courier - The courier to read the pending
* requests from
* @async
*/
this.docs = _.partial(fetchPending, docStrategy);
/**
* Fetch all pending search requests
* @param {Courier} courier - The courier to read the pending
* requests from
* @async
*/
this.searches = _.partial(fetchPending, searchStrategy);
/**
* Fetch a single doc source
* @param {DocSource} source - The DocSource to request
* @async
*/
this.doc = _.partial(fetchASource, docStrategy);
/**
* Fetch a single search source
* @param {SearchSource} source - The SearchSource to request
* @async
*/
this.search = _.partial(fetchASource, searchStrategy);
});
});

View file

@ -0,0 +1,71 @@
define(function (require) {
return {
clientMethod: 'mget',
/**
* Flatten a series of requests into as ES request body
* @param {array} requests - an array of flattened requests
* @return {string} - the request body
*/
requestStatesToBody: function (states) {
return {
docs: states
};
},
/**
* Fetch the multiple responses from the ES Response
* @param {object} resp - The response sent from Elasticsearch
* @return {array} - the list of responses
*/
getResponses: function (resp) {
return resp.docs;
},
/**
* Resolve a single request using a single response from an msearch
* @param {object} req - The request object, with a defer and source property
* @param {object} resp - An object from the mget response's "docs" array
* @return {Promise} - the promise created by responding to the request
*/
resolveRequest: function (req, resp) {
if (resp.found) {
req.source._storeVersion(resp._version);
} else {
req.source._clearVersion();
}
return req.defer.resolve(resp);
},
/**
* Get the doc requests from the courier that are ready to be fetched
* @return {array} - The filtered request list, pulled from
* the courier's _pendingRequests queue
*/
getPendingRequests: function (pendingRequests) {
return pendingRequests.splice(0).filter(function (req) {
// filter by type first
if (req.source._getType() !== 'doc') {
pendingRequests.push(req);
return;
}
// _getStoredVersion updates the internal
// cache returned by _getVersion, so _getVersion
// must be called first
var version = req.source._getVersion();
var storedVersion = req.source._getStoredVersion();
// conditions that equal "fetch This DOC!"
var unknownVersion = !version && !storedVersion;
var versionMismatch = version !== storedVersion;
var localVersionCleared = version && !storedVersion;
if (unknownVersion || versionMismatch || localVersionCleared) return true;
else pendingRequests.push(req);
});
}
};
});

View file

@ -0,0 +1,59 @@
define(function (require) {
return {
clientMethod: 'msearch',
/**
* Flatten a series of requests into as ES request body
* @param {array} requests - the requests to serialize
* @return {string} - the request body
*/
requestStatesToBody: function (states) {
return states.map(function (state) {
return JSON.stringify({
index: state.index,
type: state.type
})
+ '\n'
+ JSON.stringify(state.body);
}).join('\n') + '\n';
},
/**
* Fetch the multiple responses from the ES Response
* @param {object} resp - The response sent from Elasticsearch
* @return {array} - the list of responses
*/
getResponses: function (resp) {
return resp.responses;
},
/**
* Resolve a single request using a single response from an msearch
* @param {object} req - The request object, with a defer and source property
* @param {object} resp - An object from the mget response's "docs" array
* @return {Promise} - the promise created by responding to the request
*/
resolveRequest: function (req, resp) {
req.defer.resolve(resp);
},
/**
* Get the doc requests from the courier that are ready to be fetched
* @param {array} pendingRequests - The list of pending requests, from
* which the requests to make should be
* removed
* @return {array} - The filtered request list, pulled from
* the courier's _pendingRequests queue
*/
getPendingRequests: function (pendingRequests) {
return pendingRequests.splice(0).filter(function (req) {
// filter by type first
if (req.source._getType() === 'search') return true;
else pendingRequests.push(req);
});
}
};
});

View file

@ -0,0 +1,35 @@
define(function (require) {
var module = require('modules').get('courier/localcache');
var _ = require('lodash');
module.factory('LocalCache', function () {
function LocalCache(opts) {
var _id = opts.id || _.identity;
var _cache = {};
this.get = function (obj) {
var id = _id(obj);
return _.isObject(_cache[id]) ? _.cloneDeep(_cache[id]) : _cache[id];
};
this.set = function (obj, val) {
var id = _id(obj);
var clean = !_cache.hasOwnProperty(id);
_cache[id] = _.isObject(val) ? _.cloneDeep(val) : val;
return clean;
};
this.clear = function (obj) {
if (!obj) {
_cache = {};
return;
}
var id = _id(obj);
delete _cache[id];
};
}
return LocalCache;
});
});

View file

@ -0,0 +1,115 @@
define(function (require) {
var _ = require('lodash');
var module = require('modules').get('courier/looper');
module.factory('Looper', function ($timeout) {
function Looper(ms, fn) {
var _ms = ms === void 0 ? 1500 : ms;
var _fn = fn || _.noop;
var _timerId;
var _started = false;
var looper = this;
/**
* Set the number of milliseconds between
* each loop
*
* @param {integer} ms
* @chainable
*/
looper.ms = function (ms) {
_ms = ms;
looper.restart();
return this;
};
/**
* Set the function that will be executed at the
* end of each looper.
*
* @param {function} fn
* @chainable
*/
looper.fn = function (fn) {
_fn = fn;
return this;
};
/**
* Start the looping madness
*
* @chainable
*/
looper.start = function () {
looper.stop();
_started = true;
// start with a run of the loop, which sets the next run
looper._looperOver();
return this;
};
/**
* ...
*
* @chainable
*/
looper.stop = function () {
if (_timerId) _timerId = $timeout.cancel(_timerId);
_started = false;
return this;
};
/**
* Restart the looper only if it is already started.
* Called automatically when ms is changed
*
* @chainable
*/
looper.restart = function () {
if (looper.started()) {
looper.stop();
looper.start();
}
return this;
};
/**
* Is the looper currently started/running/scheduled/going to execute
*
* @return {boolean}
*/
looper.started = function () {
return !!_started;
};
/**
* Wraps _fn so that _fn can be changed
* without rescheduling and schedules
* the next itteration
*
* @private
* @return {undefined}
*/
looper._looperOver = function () {
try {
_fn();
} catch (e) {
looper.stop();
if (typeof console === 'undefined' || !console.error) {
throw e;
} else {
console.error(e.stack || e.message || e);
}
}
_timerId = _ms ? $timeout(looper._looperOver, _ms) : null;
};
}
return Looper;
});
});

View file

@ -0,0 +1,190 @@
define(function (require) {
var _ = require('lodash');
require('./local_cache');
// these fields are the only _ prefixed fields that will
// be found in the field list. All others are filtered
var reservedFields = {
_id: { type: 'string' },
_type: { type: 'string' },
_index: { type: 'string' }
};
var module = require('modules').get('courier/mapper');
module.factory('CouriersMapper', function (Promise, es, configFile, LocalCache, couriersErrors) {
var CacheWriteFailure = couriersErrors.CacheWriteFailure;
var MappingConflict = couriersErrors.MappingConflict;
var RestrictedMapping = couriersErrors.RestrictedMapping;
var FieldNotFoundInCache = couriersErrors.FieldNotFoundInCache;
function CouriersMapper(courier) {
// Save a reference to mapper
var mapper = this;
var fieldCache = new LocalCache({
id: function (dataSource) {
return dataSource.get('index');
}
});
/**
* Gets an object containing all fields with their mappings
* @param {dataSource} dataSource
* @async
*/
mapper.getFields = function (dataSource) {
if (!dataSource.get('index')) {
// always callback async
return Promise.rejected(new TypeError('dataSource must have an index before it\'s fields can be fetched'));
}
return mapper.getCachedFieldsFor(dataSource)
.catch(function () {
// If we are unable to get the fields from cache, get them from mapping instead
return mapper.getFieldsFromEsFor(dataSource)
.then(function (fields) {
fieldCache.set(dataSource, fields);
});
});
};
/**
* Gets an object containing all fields with their mappings from kibana's cache in Elasticsearch
* @param {dataSource} dataSource
* @param {Function} callback A function to be executed with the results.
*/
mapper.getCachedFieldsFor = function (dataSource) {
// If we already have the fields in the local cache, use those
var cached = fieldCache.get(dataSource);
if (cached) return Promise.resolved(cached);
return es.getSource({
index: configFile.kibanaIndex,
type: 'mapping',
id: dataSource.get('index'),
}).then(function (fields) {
fieldCache.set(dataSource, fields);
return fieldCache.get(dataSource);
});
};
/**
* Gets an object containing all fields with their mappings directly from Elasticsearch _mapping API
* @param {dataSource} dataSource
* @param {Function} callback A function to be executed with the results.
*/
mapper.getFieldsFromEsFor = function (dataSource, callback) {
return es.indices.getFieldMapping({
// TODO: Change index to be the resolved in some way, last three months, last hour, last year, whatever
index: dataSource.get('index'),
field: '*',
})
.then(transformFieldMappingResponse)
.then(function (fields) {
return mapper.writeFieldsToCaches(dataSource, fields);
})
.then(function () {
return fieldCache.get(dataSource);
});
};
/**
* Stores processed mappings in Elasticsearch, and in local cache
* @param {dataSource} dataSource
* @param {Function} callback A function to be executed with the results.
* @async
*/
mapper.writeFieldsToCaches = function (dataSource, fields) {
return es.index({
index: configFile.kibanaIndex,
type: 'mapping',
id: dataSource.get('index'),
body: fields
})
.then(function () {
fieldCache.set(dataSource, fields);
});
};
/**
* Clears mapping caches from elasticsearch and from local object
* @param {dataSource} dataSource
* @param {Function} callback A function to be executed with the results.
*/
mapper.clearCache = function (dataSource) {
fieldCache.clear(dataSource);
return es.delete({
index: configFile.kibanaIndex,
type: 'mapping',
id: dataSource.get('index')
});
};
/**
* Convert the ES response into the simple map for fields to
* mappings which we will cache
*
* @param {object} response - complex, excessively nested
* object returned from ES
* @return {object} - simple object that works for all of kibana
* use-cases
*/
var transformFieldMappingResponse = function (response) {
var fields = _.cloneDeep(reservedFields);
_.each(response, function (index, indexName) {
if (indexName === configFile.kibanaIndex) return;
_.each(index.mappings, function (mappings, typeName) {
_.each(mappings, function (field, name) {
if (_.size(field.mapping) === 0 || name[0] === '_') return;
var mapping = field.mapping[_.keys(field.mapping)[0]];
mapping.type = castMappingType(mapping.type);
if (fields[name]) {
if (fields[name].type !== mapping.type) {
throw new MappingConflict(name);
}
return;
}
fields[name] = mapping;
});
});
});
return fields;
};
/**
* Accepts a mapping type, and converts it into it's js equivilent
* @param {String} type - the type from the mapping's 'type' field
* @return {String} - the most specific type that we care for
*/
var castMappingType = function (type) {
switch (type) {
case 'float':
case 'double':
case 'integer':
case 'long':
case 'short':
case 'byte':
case 'token_count':
return 'number';
case 'date':
case 'boolean':
case 'ip':
case 'attachment':
case 'geo_point':
case 'geo_shape':
return type;
default: // including 'string'
return 'string';
}
};
}
return CouriersMapper;
});
});

View file

@ -1,13 +0,0 @@
define(function (require) {
/**
* broke this out so that it could be loaded before the application is
*/
require('modules')
.get('kibana/constants')
// This stores the Kibana revision number, @REV@ is replaced by grunt.
.constant('kbnVersion', '@REV@')
// Use this for cache busting partials
.constant('cacheBust', 'cache-bust=' + Date.now());
});

View file

@ -2,9 +2,11 @@ define(function (require) {
var _ = require('lodash');
var $ = require('jquery');
var moment = require('moment');
var nextTick = require('utils/next_tick');
require('services/config');
require('services/courier');
require('config/config');
require('courier/courier');
require('notify/notify');
require('directives/info');
require('angular-bootstrap');
@ -19,11 +21,11 @@ define(function (require) {
});
})
.controller('kibana', function ($rootScope, $scope, courier, config, configFile, createNotifier, $timeout, $location) {
var notify = createNotifier({
location: 'Kibana Controller'
});
var notify = createNotifier();
$scope.apps = configFile.apps;
$rootScope.rootDataSource = courier.createSource('search')
.index('logstash-*');
function updateAppData() {
var route = $location.path().split(/\//);
@ -39,103 +41,77 @@ define(function (require) {
$scope.$on('$routeChangeSuccess', updateAppData);
$scope.$on('$routeUpdate', updateAppData);
$rootScope.rootDataSource = courier.createSource('search')
.index('logstash-*');
// this is the only way to handle uncaught route.resolve errors
$scope.$on('$routeChangeError', notify.fatal);
$scope.opts = {
activeFetchInterval: void 0,
fetchIntervals: [
{ display: '5s', val: 5000 },
{ display: '10s', val: 10000 },
{ display: '30s', val: 30000 },
{ display: '1m', val: 60000 },
{ display: '5m', val: 300000 },
{ display: '15m', val: 900000 },
{ display: '30m', val: 1.8e+6 },
{ display: '1h', val: 3.6e+6 },
{ display: '2h', val: 7.2e+6 },
{ display: '1d', val: 8.64e+7 }
]
};
$scope.configure = function () {
$scope.configureTemplateUrl = require('text!../partials/global_config.html');
};
// expose the notification services list of notifs on the $scope so that the
// notification directive can show them on the screen
$scope.notifList = notify._notifs;
// provide alternate methods for setting timeouts, which will properly trigger digest cycles
notify._setTimerFns($timeout, $timeout.cancel);
(function TODO_REMOVE() {
// stuff for testing notifications
$scope.levels = [
{ name: 'info', icon: 'info' },
{ name: 'warning', icon: 'info-circle' },
{ name: 'error', icon: 'warning' },
{ name: 'fatal', icon: 'fire' },
];
$scope.notifTest = function (type) {
var arg = 'Something happened, just thought you should know.';
var cb;
if (type === 'fatal' || type === 'error') {
arg = new Error('Ah fuck');
}
if (type === 'error') {
cb = function (resp) {
if (resp !== 'report') return;
$timeout(function () {
notify.info('Report sent, thank you for your help.');
}, 750);
};
}
notify[type](arg, cb);
};
}());
/**
* Persist current settings
* @return {[type]} [description]
*/
$scope.saveOpts = function () {
config.set('refreshInterval', $scope.opts.activeFetchInterval.val);
};
$scope.setFetchInterval = function (option) {
var opts = $scope.opts;
if (option && typeof option !== 'object') {
var val = option;
option = _.find($scope.opts.fetchIntervals, { val: val });
if (!option) {
// create a custom option for this value
option = { display: moment.duration(val).humanize(), val: val };
$scope.opts.unshift(option);
}
}
if (option === opts.activeFetchInterval) return;
opts.activeFetchInterval = option;
if (option) {
courier.fetchInterval(option.val);
} else {
courier.stop();
}
};
config.$watch('refreshInterval', $scope.setFetchInterval);
$scope.$watch('opts.activeFetchInterval', $scope.setFetchInterval);
// setup the courier
courier.on('error', function (err) {
$scope[$scope.$$phase ? '$eval' : '$apply'](function () {
notify.error(err);
});
});
$scope.$on('application.load', function () {
courier.start();
});
config.init()
.then(function () {
$scope.opts = {
activeFetchInterval: void 0,
fetchIntervals: [
{ display: 'none', val: null},
{ display: '5s', val: 5000 },
{ display: '10s', val: 10000 },
{ display: '30s', val: 30000 },
{ display: '1m', val: 60000 },
{ display: '5m', val: 300000 },
{ display: '15m', val: 900000 },
{ display: '30m', val: 1.8e+6 },
{ display: '1h', val: 3.6e+6 },
{ display: '2h', val: 7.2e+6 },
{ display: '1d', val: 8.64e+7 }
]
};
$scope.configure = function () {
$scope.configureTemplateUrl = require('text!../partials/global_config.html');
};
// expose the notification services list of notifs on the $scope so that the
// notification directive can show them on the screen
$scope.notifList = notify._notifs;
// provide alternate methods for setting timeouts, which will properly trigger digest cycles
notify._setTimerFns($timeout, $timeout.cancel);
/**
* Persist current settings
* @return {[type]} [description]
*/
$scope.saveOpts = function () {
config.set('refreshInterval', $scope.opts.activeFetchInterval.val);
};
$scope.setActiveFetchInterval = function (val) {
var option = _.find($scope.opts.fetchIntervals, { val: val });
if (option) {
$scope.opts.activeFetchInterval = option;
return;
}
// create a custom option for this value
option = { display: moment.duration(val).humanize(), val: val };
$scope.opts.fetchIntervals.unshift(option);
$scope.opts.activeFetchInterval = option;
};
$scope.activeFetchIntervalChanged = function (option, prev) {
var opts = $scope.opts;
if (option && typeof option !== 'object') {
$scope.setActiveFetchInterval(option);
return;
}
courier.fetchInterval(option.val);
};
$scope.setActiveFetchInterval(config.get('fetchInterval', null));
$scope.$on('change:config.refreshInterval', $scope.setActiveFetchInterval);
$scope.$watch('opts.activeFetchInterval', $scope.activeFetchIntervalChanged);
});
});
});

View file

@ -4,12 +4,9 @@
define(function (require) {
var angular = require('angular');
var $ = require('jquery');
var _ = require('lodash');
var scopedRequire = require('require');
var setup = require('./setup');
var configFile = require('../config');
var modules = require('modules');
var notify = require('notify/notify');
var routes = require('routes');
require('utils/rison');
require('elasticsearch');
@ -23,37 +20,35 @@ define(function (require) {
'ngRoute'
]);
kibana
// This stores the Kibana revision number, @REV@ is replaced by grunt.
.constant('kbnVersion', '@REV@')
// Use this for cache busting partials
.constant('cacheBust', 'cache-bust=' + Date.now())
// attach the route manager's known routes
.config(routes.config);
// setup routes
routes
.otherwise({
redirectTo: '/' + configFile.defaultAppId
});
// tell the modules util to add it's modules as requirements for kibana
modules.link(kibana);
// proceed once setup is complete
setup(function (err) {
kibana
// setup default routes
.config(function ($routeProvider, $provide) {
$routeProvider
.otherwise({
redirectTo: '/' + configFile.defaultAppId
});
});
require([
'controllers/kibana'
].concat(configFile.apps.map(function (app) {
return 'apps/' + app.id + '/index';
})), function bootstrap() {
$(function () {
notify.lifecycle('bootstrap');
angular
.bootstrap(document, ['kibana'])
.invoke(function () {
notify.lifecycle('bootstrap', true);
$(document.body).children().show();
});
});
require([
'controllers/kibana'
].concat(configFile.apps.map(function (app) {
return 'apps/' + app.id + '/index';
})), function bootstrap() {
$(function () {
angular
.bootstrap(document, ['kibana'])
.invoke(function ($rootScope, $route) {
$(document.body).children().show();
});
});
});
return kibana;

View file

@ -1,8 +1,17 @@
require.config({
baseUrl: './kibana',
paths: {
// components
kibana: './index',
courier: '../courier',
courier: './components/courier',
config: './components/config',
notify: './components/notify',
// special utils
routes: 'utils/routes',
modules: 'utils/modules',
// bower_components
angular: '../bower_components/angular/angular',
'angular-mocks': '../bower_components/angular-mocks/angular-mocks',
'angular-route': '../bower_components/angular-route/angular-route',
@ -18,7 +27,6 @@ require.config({
moment: '../bower_components/moment/moment',
gridster: '../bower_components/gridster/dist/jquery.gridster',
stacktrace: '../bower_components/stacktrace.js/stacktrace',
modules: 'utils/modules',
jsonpath: '../bower_components/jsonpath/lib/jsonpath',
K4D3: '../bower_components/K4D3/k4.d3',
bower_components: '../bower_components'

View file

@ -1,171 +0,0 @@
define(function (require) {
var _ = require('lodash');
var nextTick = require('utils/next_tick');
var configFile = require('../../config');
var notify = require('notify/notify');
require('services/courier');
// share doc and val cache between apps
var doc;
var vals = {};
require('modules')
.get('kibana/services')
.constant('configFile', configFile)
.service('config', function ($q, $rootScope, courier, kbnVersion, configFile) {
var watchers = {};
var unwatchers = [];
if (!doc) {
doc = courier.createSource('doc')
.index(configFile.kibanaIndex)
.type('config')
.id(kbnVersion);
} else {
// clean up after previous app
doc
.removeAllListeners('results')
.courier(courier);
}
doc.on('results', function (resp) {
if (!resp.found) return; // init should ensure it exists
_.forOwn(resp._source, function (val, key) {
if (vals[key] !== val) _change(key, val);
});
});
/******
* PUBLIC API
******/
function init() {
notify.lifecycle('config init');
var defer = $q.defer();
doc.fetch();
doc.on('results', function completeInit(resp) {
// ONLY ACT IF !resp.found
if (!resp.found) {
doc.doIndex({});
return;
}
notify.lifecycle('config init', !!resp);
doc.removeListener('results', completeInit);
defer.resolve();
});
return defer.promise;
}
function get(key) {
return vals[key];
}
function set(key, val) {
// sets a value in the config
// the es doc must be updated successfully for the update to reflect in the get api.
if (vals[key] === val) {
var defer = $q.defer();
defer.resolve(true);
return defer.promise;
}
var update = {};
update[key] = val;
return doc.doUpdate(update)
.then(function () {
_change(key, val);
return true;
})
.catch(function (err) {
throw err;
});
}
function $watch(key, onChange) {
// probably a horrible idea
if (!watchers[key]) watchers[key] = [];
watchers[key].push(onChange);
triggerWatchers(onChange, vals[key]);
return function un$watcher() {
_.pull(watchers[key], onChange);
};
}
function $bindToScope($scope, key, opts) {
var configWatcher = function (val) {
if (opts && val === void 0) val = opts['default'];
$scope[key] = val;
};
var first = true;
var scopeWatcher = function (newVal) {
if (first) return first = false;
set(key, newVal);
};
// collect unwatch/listen functions and automatically
// run them when $scope is destroyed
var unwatchScope = $scope.$watch(key, scopeWatcher);
var unwatchConfig = $watch(key, configWatcher);
var unlisten = $scope.$on('$destroy', unwatch);
unwatchers.push(unwatch);
function unwatch() {
unwatchScope();
unwatchConfig();
unlisten();
_.pull(unwatchers, unwatch);
}
// return the unwatch function so users can unwatch manually
return unwatch;
}
function close() {
watchers = null;
unwatchers.forEach(function (unwatcher) {
unwatcher();
});
}
// expose public API on the instance
this.init = init;
this.close = close;
this.get = get;
this.set = set;
this.$bind = $bindToScope;
this.$watch = $watch;
/*******
* PRIVATE API
*******/
function _change(key, val) {
notify.log('config change: ' + key + ': ' + vals[key] + ' -> ' + val);
triggerWatchers(watchers[key], val, vals[key]);
vals[key] = val;
}
function triggerWatchers(fns, cur, prev) {
if ($rootScope.$$phase) {
// reschedule for next tick
nextTick(triggerWatchers, fns, cur, prev);
return;
}
var isArr = _.isArray(fns);
if (!fns || (isArr && !fns.length)) return;
$rootScope.$apply(function () {
if (!isArr) return fns(cur, prev);
fns.forEach(function (onChange) {
onChange(cur, prev);
});
});
}
});
});

View file

@ -1,40 +0,0 @@
define(function (require) {
var Courier = require('courier/courier');
var DataSource = require('courier/data_source/data_source');
var DocSource = require('courier/data_source/doc');
var errors = require('courier/errors');
require('services/promises');
require('services/es');
var courier; // share the courier amoungst all of the apps
require('modules')
.get('kibana/services')
.service('courier', function (es, $rootScope, promises, configFile) {
if (courier) return courier;
promises.playNice(DataSource.prototype, [
'getFields',
'clearFieldCache'
]);
promises.playNice(DocSource.prototype, [
'doUpdate',
'doIndex'
]);
courier = new Courier({
fetchInterval: 15000,
client: es,
internalIndex: configFile.kibanaIndex
});
courier.errors = errors;
courier.rootSearchSource = courier
.createSource('search')
.$scope($rootScope);
return courier;
});
});

View file

@ -1,39 +1,68 @@
define(function (require, module, exports) {
define(function (require) {
var _ = require('lodash');
require('modules')
.get('kibana/services')
.service('promises', function ($q) {
var module = require('modules').get('kibana/promises');
function playNice(fn, fns) {
if (fns && _.isArray(fns) && _.isObject(fn)) {
fns.forEach(function (method) {
fn[method] = playNice(fn[method]);
});
return fn;
}
return function playNiceWrapper() {
// if the last arg is a callback then don't do anything
if (typeof arguments[arguments.length - 1] === 'function') {
return fn.apply(this, arguments);
}
// otherwise create a callback and pass it in
var args = Array.prototype.slice.call(arguments);
var defer = $q.defer();
args.push(function (err, result) {
if (err) return defer.reject(err);
defer.resolve(result);
});
fn.apply(this, args);
return defer.promise;
};
module.service('promises', function ($q) {
function playNice(fn, fns) {
if (fns && _.isArray(fns) && _.isObject(fn)) {
fns.forEach(function (method) {
fn[method] = playNice(fn[method]);
});
return fn;
}
return function playNiceWrapper() {
// if the last arg is a callback then don't do anything
if (typeof arguments[arguments.length - 1] === 'function') {
return fn.apply(this, arguments);
}
return {
playNice: playNice
// otherwise create a callback and pass it in
var args = Array.prototype.slice.call(arguments);
var defer = $q.defer();
args.push(function (err, result) {
if (err) return defer.reject(err);
defer.resolve(result);
});
fn.apply(this, args);
return defer.promise;
};
});
}
return {
playNice: playNice
};
});
// Provides a tiny subset of the excelent API from
// bluebird, reimplemented using the $q service
module.service('Promise', function ($q) {
function Promise(fn) {
var defer = $q.defer();
try {
fn(defer.resolve, defer.reject, defer);
} catch (e) {
defer.reject(e);
}
return defer.promise;
}
Promise.all = $q.all;
Promise.resolved = function (val) {
var defer = $q.defer();
defer.resolve(val);
return defer.promise;
};
Promise.rejected = function (reason) {
var defer = $q.defer();
defer.reject(reason);
return defer.promise;
};
Promise.cast = $q.when;
Promise.defer = $q.defer;
return Promise;
});
});

View file

@ -0,0 +1,91 @@
define(function (require) {
var _ = require('lodash');
var $ = require('jquery');
var module = require('modules').get('kibana/setup');
module.service('setup', function (Promise, createNotifier, es, config, configFile) {
var notify = createNotifier({
location: 'Setup'
});
this.bootstrap = _.once(function () {
notify.lifecycle('bootstrap');
var kibanaIndexExists = false;
function tmplError(err, tmpl) {
var err2 = new Error(_.template(tmpl, { configFile: configFile }));
err2.origError = err;
return err2;
}
function checkForES() {
notify.lifecycle('es check');
return es.ping()
.catch(function () {
throw new Error('Unable to connect to Elasticsearch at "' + configFile.elasticsearch + '"');
})
.finally(function () {
notify.lifecycle('es check', true);
});
}
function checkForKibanaIndex() {
notify.lifecycle('kibana index check');
return es.indices.exists({
index: configFile.kibanaIndex
})
.then(function (exists) {
kibanaIndexExists = !!exists;
})
.catch(function (err) {
throw tmplError(err, 'Unable to check for Kibana index "<%= configFile.kibanaIndex %>"');
})
.finally(function () {
notify.lifecycle('kibana index check', kibanaIndexExists);
});
}
// create the index if it doens't exist already
function createKibanaIndex() {
if (kibanaIndexExists) return Promise.resolved();
notify.lifecycle('create kibana index');
return es.indices.create({
index: configFile.kibanaIndex,
body: {
settings: {
mappings: {
mappings: {
_source: {
enabled: false
},
properties: {
type: {
type: 'string',
index: 'not_analyzed'
}
}
}
}
}
}
})
.catch(function (err) {
throw tmplError(err, 'Unable to create Kibana index "<%= configFile.kibanaIndex %>"');
})
.finally(function () {
notify.lifecycle('create kibana index', true);
});
}
return checkForES()
.then(checkForKibanaIndex)
.then(createKibanaIndex)
.then(config.init)
.finally(function () {
notify.lifecycle('bootstrap', true);
});
});
});
});

View file

@ -1,139 +0,0 @@
define(function (require) {
var angular = require('angular');
var async = require('async');
var $ = require('jquery');
var _ = require('lodash');
var configFile = require('../config');
var nextTick = require('utils/next_tick');
var modules = require('modules');
/**
* Setup the kibana application, ensuring that the kibanaIndex exists,
* and perform any migration of data that is required.
*
* @param {function} done - callback
*/
return function prebootSetup(done) {
// load angular deps
require([
'notify/notify',
'services/es',
'services/config',
'constants/base'
], function (notify) {
$(function () {
// create the setup module, it should require the same things
// that kibana currently requires, which should only include the
// loaded modules
var setup = modules.get('setup');
var appEl = document.createElement('div');
var kibanaIndexExists;
modules.link(setup);
setup
.value('configFile', configFile);
angular
.bootstrap(appEl, ['setup'])
.invoke(function (es, config) {
// init the setup module
async.series([
async.apply(checkForES, es),
async.apply(checkForKibanaIndex, es),
async.apply(createKibanaIndex, es),
async.apply(checkForCurrentConfigDoc, es),
async.apply(initConfig, config)
], function (err) {
// ready to go, remove the appEl, close services and boot be done
angular.element(appEl).remove();
// linked modules should no longer depend on this module
setup.close();
if (err) throw err;
return done(err);
});
});
function wrapError(err, tmpl) {
// if we pass a callback
if (typeof err === 'function') {
var cb = err; // wrap it
return function (err) {
cb(wrapError(err, tmpl));
};
}
// if an error didn't actually occur
if (!err) return void 0;
var err2 = new Error(_.template(tmpl, { configFile: configFile }));
err2.origError = err;
return err2;
}
function checkForES(es, done) {
notify.lifecycle('es check');
es.ping(function (err, alive) {
notify.lifecycle('es check', alive);
done(alive ? void 0 : new Error('Unable to connect to Elasticsearch at "' + configFile.elasticsearch + '"'));
});
}
function checkForKibanaIndex(es, done) {
notify.lifecycle('kibana index check');
es.indices.exists({
index: configFile.kibanaIndex
}, function (err, exists) {
notify.lifecycle('kibana index check', !!exists);
kibanaIndexExists = exists;
done(wrapError(err, 'Unable to check for Kibana index "<%= configFile.kibanaIndex %>"'));
});
}
// create the index if it doens't exist already
function createKibanaIndex(es, done) {
if (kibanaIndexExists) return done();
notify.lifecycle('create kibana index');
es.indices.create({
index: configFile.kibanaIndex,
body: {
settings: {
mappings: {
mappings: {
_source: {
enabled: false
},
properties: {
type: {
type: 'string',
index: 'not_analyzed'
}
}
}
}
}
}
}, function (err) {
notify.lifecycle('create kibana index', !err);
done(wrapError(err, 'Unable to create Kibana index "<%= configFile.kibanaIndex %>"'));
});
}
// if the index is brand new, no need to see if it is out of data
function checkForCurrentConfigDoc(es, done) {
if (!kibanaIndexExists) return done();
// callbacks should always be called async
nextTick(done);
}
function initConfig(config, done) {
config.init().then(function () { done(); }, done);
}
});
});
};
});

View file

@ -2,16 +2,6 @@ define(function (require) {
var _ = require('lodash');
_.mixin({
// return an object, which is an indexed version of the list
// using the property as the key
indexBy: function (list, prop) {
return _.transform(list, function (indexed, obj) {
var key = obj && obj[prop];
if (obj) {
indexed[key] = obj;
}
}, {});
},
move: function (array, fromIndex, toIndex) {
array.splice(toIndex, 0, array.splice(fromIndex, 1)[0]);
return array;
@ -31,6 +21,20 @@ define(function (require) {
array.push(value);
}
return array;
},
flattenWith: function (dot, nestedObj) {
var key; // original key
var stack = []; // track key stack
var flatObj = {};
(function flattenObj(obj) {
_.keys(obj).forEach(function (key) {
stack.push(key);
if (typeof obj[key] === 'object') flattenObj(obj[key]);
else flatObj[stack.join(dot)] = obj[key];
stack.pop();
});
}(nestedObj));
return flatObj;
}
});

View file

@ -0,0 +1,45 @@
define(function (require) {
var angular = require('angular');
var _ = require('lodash');
var deferReady;
var when = {};
var otherwise;
require('services/setup');
return {
when: function (path, route) {
if (route.resolve) {
route.resolve = _.mapValues(route.resolve, function (expr, name) {
return function (setup, $injector) {
return setup.bootstrap()
.then(function () {
return $injector[angular.isString(expr) ? 'get': 'invoke'](expr);
});
};
});
} else if (!route.redirectTo) {
route.resolve = {
bootstrap: function (setup) {
return setup.bootstrap();
}
};
}
when[path] = route;
return this;
},
otherwise: function (route) {
otherwise = route;
},
ready: function () {
return deferReady();
},
config: function ($routeProvider, $injector) {
_.forOwn(when, function (route, path) {
$routeProvider.when(path, route);
});
if (otherwise) {
$routeProvider.otherwise(otherwise);
}
}
};
});

View file

@ -1,6 +1,6 @@
define(function (require) {
var Courier = require('courier/courier');
var DataSource = require('courier/data_source/data_source');
var DataSource = require('courier/data_source/abstract');
var DocSource = require('courier/data_source/doc');
var SearchSource = require('courier/data_source/search');