mirror of
https://github.com/elastic/kibana.git
synced 2025-04-23 17:28:26 -04:00
commit
9d7ed10aaf
15 changed files with 805 additions and 621 deletions
50
scratch.js
50
scratch.js
|
@ -1,50 +0,0 @@
|
|||
/* jshint node: true */
|
||||
var elasticsearch = require('../elasticsearch-js');
|
||||
var async = require('async');
|
||||
|
||||
var es = new elasticsearch.Client({
|
||||
host: 'localhost:9200',
|
||||
sniffOnStart: true,
|
||||
sniffInterval: 3000,
|
||||
apiVersion: '1.0',
|
||||
log: 'trace'
|
||||
});
|
||||
|
||||
var rl = require('readline').createInterface({
|
||||
input: process.stdin,
|
||||
output: process.stdout,
|
||||
terminal: true
|
||||
});
|
||||
|
||||
async.series([
|
||||
function (done) {
|
||||
setTimeout(done, 50);
|
||||
},
|
||||
function (done) {
|
||||
console.log(es.transport.connectionPool._conns.index);
|
||||
es.indices.create({
|
||||
index: 'index_name'
|
||||
}, done);
|
||||
},
|
||||
function (done) {
|
||||
rl.question('Is the master down?', function () {
|
||||
done();
|
||||
});
|
||||
},
|
||||
function (done) {
|
||||
console.log(es.transport.connectionPool._conns.index);
|
||||
es.search({ index: 'index_name' }, done);
|
||||
},
|
||||
function (done) {
|
||||
rl.question('Is the slave down now?', function () {
|
||||
es.search({ body: { query: { match_all: {} } } }, done);
|
||||
});
|
||||
},
|
||||
function (done) {
|
||||
rl.question('Is the master back up?', function () {
|
||||
es.search({ body: { query: { match_all: {} } } }, done);
|
||||
});
|
||||
}
|
||||
], function (err) {
|
||||
console.log(err);
|
||||
});
|
|
@ -1,287 +1,257 @@
|
|||
define(function (require) {
|
||||
|
||||
var DataSource = require('courier/data_source');
|
||||
var Docs = require('courier/docs');
|
||||
var EventEmitter = require('utils/event_emitter');
|
||||
var inherits = require('utils/inherits');
|
||||
var errors = require('courier/errors');
|
||||
var _ = require('lodash');
|
||||
var angular = require('angular');
|
||||
|
||||
function chain(cntx, method) {
|
||||
return function () {
|
||||
method.apply(cntx, arguments);
|
||||
return this;
|
||||
};
|
||||
}
|
||||
var DocSource = require('courier/data_source/doc');
|
||||
var SearchSource = require('courier/data_source/search');
|
||||
var HastyRefresh = require('courier/errors').HastyRefresh;
|
||||
|
||||
function emitError(source, courier, error) {
|
||||
if (EventEmitter.listenerCount(source, 'error')) {
|
||||
source.emit('error', error);
|
||||
} else {
|
||||
courier.emit('error', error);
|
||||
}
|
||||
}
|
||||
// map constructors to type keywords
|
||||
var sourceTypes = {
|
||||
doc: DocSource,
|
||||
search: SearchSource
|
||||
};
|
||||
|
||||
function mergeProp(state, filters, val, key) {
|
||||
switch (key) {
|
||||
case 'inherits':
|
||||
case '_type':
|
||||
// ignore
|
||||
return;
|
||||
case 'filter':
|
||||
filters.push(val);
|
||||
return;
|
||||
case 'index':
|
||||
case 'type':
|
||||
if (key && state[key] == null) {
|
||||
state[key] = val;
|
||||
// fetch process for the two source types
|
||||
var onFetch = {
|
||||
// execute a search right now
|
||||
search: function (courier) {
|
||||
if (courier._activeSearchRequest) {
|
||||
return courier._error(new HastyRefresh());
|
||||
}
|
||||
return;
|
||||
case 'source':
|
||||
key = '_source';
|
||||
/* fall through */
|
||||
}
|
||||
courier._activeSearchRequest = SearchSource.fetch(
|
||||
courier,
|
||||
courier._refs.search,
|
||||
function (err) {
|
||||
if (err) return courier._error(err);
|
||||
});
|
||||
},
|
||||
|
||||
if (key && state.body[key] == null) {
|
||||
state.body[key] = val;
|
||||
}
|
||||
}
|
||||
|
||||
function flattenDataSource(source) {
|
||||
var state = {
|
||||
body: {}
|
||||
};
|
||||
|
||||
// all of the filters from the source chain
|
||||
var filters = [];
|
||||
var collectProp = _.partial(mergeProp, state, filters);
|
||||
|
||||
// walk the chain and merge each property
|
||||
var current = source;
|
||||
var currentState;
|
||||
while (current) {
|
||||
currentState = current._state();
|
||||
_.forOwn(currentState, collectProp);
|
||||
current = currentState.inherits;
|
||||
}
|
||||
|
||||
// defaults for the query
|
||||
_.forOwn({
|
||||
query: {
|
||||
'match_all': {}
|
||||
}
|
||||
}, collectProp);
|
||||
|
||||
// switch to filtered query if there are filters
|
||||
if (filters.length) {
|
||||
state.body.query = {
|
||||
filtered: {
|
||||
query: state.body.query,
|
||||
filter: {
|
||||
bool: {
|
||||
must: filters
|
||||
}
|
||||
}
|
||||
// validate that all of the DocSource objects are up to date
|
||||
// then fetch the onces that are not
|
||||
doc: function (courier) {
|
||||
DocSource.validate(courier, courier._refs.doc, function (err, invalid) {
|
||||
if (err) {
|
||||
courier.stop();
|
||||
return courier.emit('error', err);
|
||||
}
|
||||
};
|
||||
|
||||
// if all of the docs are up to date we don't need to do anything else
|
||||
if (invalid.length === 0) return;
|
||||
|
||||
DocSource.fetch(courier, invalid, function (err) {
|
||||
if (err) return courier._error(err);
|
||||
});
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
return state;
|
||||
}
|
||||
|
||||
function fetchSearchResults(courier, client, sources, cb) {
|
||||
if (!client) {
|
||||
this.emit('error', new Error('Courier does not have a client yet, unable to fetch queries.'));
|
||||
return;
|
||||
}
|
||||
|
||||
var all = [];
|
||||
var body = '';
|
||||
_.each(sources, function (source) {
|
||||
if (source.getType() !== 'search') {
|
||||
return;
|
||||
}
|
||||
all.push(source);
|
||||
|
||||
var state = flattenDataSource(source);
|
||||
var header = JSON.stringify({
|
||||
index: state.index,
|
||||
type: state.type
|
||||
});
|
||||
var doc = JSON.stringify(state.body);
|
||||
|
||||
body += header + '\n' + doc + '\n';
|
||||
});
|
||||
|
||||
return client.msearch({ body: body }, function (err, resp) {
|
||||
if (err) return cb(err);
|
||||
|
||||
_.each(resp.responses, function (resp, i) {
|
||||
var source = sources[i];
|
||||
if (resp.error) return emitError(source, courier, resp);
|
||||
source.emit('results', resp);
|
||||
});
|
||||
|
||||
cb(err, resp);
|
||||
});
|
||||
}
|
||||
|
||||
function fetchDocs(courier, client, sources, cb) {
|
||||
if (!client) {
|
||||
this.emit('error', new Error('Courier does not have a client yet, unable to fetch queries.'));
|
||||
return;
|
||||
}
|
||||
|
||||
var all = [];
|
||||
var body = {
|
||||
docs: []
|
||||
};
|
||||
|
||||
_.each(sources, function (source) {
|
||||
if (source.getType() !== 'get') {
|
||||
return;
|
||||
}
|
||||
|
||||
all.push(source);
|
||||
|
||||
var state = flattenDataSource(source);
|
||||
body.docs.push({
|
||||
index: state.index,
|
||||
type: state.type,
|
||||
id: state.id
|
||||
});
|
||||
});
|
||||
|
||||
return client.mget({ body: body }, function (err, resp) {
|
||||
if (err) return cb(err);
|
||||
|
||||
_.each(resp.responses, function (resp, i) {
|
||||
var source = sources[i];
|
||||
if (resp.error) return emitError(source, courier, resp);
|
||||
source.emit('results', resp);
|
||||
});
|
||||
|
||||
cb(err, resp);
|
||||
});
|
||||
}
|
||||
|
||||
function saveUpdate(source, fields) {
|
||||
|
||||
}
|
||||
// default config values
|
||||
var defaults = {
|
||||
fetchInterval: 30000,
|
||||
docInterval: 2500
|
||||
};
|
||||
|
||||
/**
|
||||
* Federated query service, supports data sources that inherit properties
|
||||
* from one another and automatically emit results.
|
||||
* Federated query service, supports two data source types: doc and search.
|
||||
*
|
||||
* search:
|
||||
* - inherits filters, and other query properties
|
||||
* - automatically emit results on a set interval
|
||||
* doc:
|
||||
* - tracks doc versions
|
||||
* - emits same results event when the doc is updated
|
||||
* - helps seperate versions of kibana running on the same machine stay in sync
|
||||
* - (NI) tracks version and uses it when new versions of a doc are reindexed
|
||||
* - (NI) helps deal with conflicts
|
||||
*
|
||||
* @param {object} config
|
||||
* @param {Client} config.client - The elasticsearch.js client to use for querying. Should be setup and ready to go.
|
||||
* @param {integer} [config.fetchInterval=30000] - The amount in ms between each fetch (deafult is 30 seconds)
|
||||
* @param {Client} config.client - The elasticsearch.js client to use for querying. Should be
|
||||
* setup and ready to go.
|
||||
* @param {EsClient} [config.client] - The elasticsearch client that the courier should use
|
||||
* (can be set at a later time with the `.client()` method)
|
||||
* @param {integer} [config.fetchInterval=30000] - The amount in ms between each fetch (deafult
|
||||
* is 30 seconds)
|
||||
*/
|
||||
function Courier(config) {
|
||||
if (!(this instanceof Courier)) return new Courier(config);
|
||||
var opts = {
|
||||
fetchInterval: 30000
|
||||
};
|
||||
var fetchTimer;
|
||||
var activeRequest;
|
||||
var courier = this;
|
||||
var sources = {
|
||||
search: [],
|
||||
get: []
|
||||
};
|
||||
|
||||
function doSearch() {
|
||||
if (!opts.client) {
|
||||
this.emit('error', new Error('Courier does not have a client, pass it ' +
|
||||
'in to the constructor or set it with the .client() method'));
|
||||
return;
|
||||
}
|
||||
if (activeRequest) {
|
||||
activeRequest.abort();
|
||||
stopFetching();
|
||||
this.emit('error', new errors.HastyRefresh());
|
||||
return;
|
||||
}
|
||||
config = _.defaults(config || {}, defaults);
|
||||
|
||||
// we need to catch the original promise in order to keep it's abort method
|
||||
activeRequest = fetchSearchResults(courier, opts.client, sources.search, function (err, resp) {
|
||||
activeRequest = null;
|
||||
setFetchTimeout();
|
||||
this._client = config.client;
|
||||
|
||||
if (err) {
|
||||
window.console && console.log(err);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
function setFetchTimeout() {
|
||||
clearTimeout(fetchTimer);
|
||||
if (opts.fetchInterval) {
|
||||
fetchTimer = setTimeout(doSearch, opts.fetchInterval);
|
||||
} else {
|
||||
fetchTimer = null;
|
||||
}
|
||||
}
|
||||
|
||||
function stopFetching(type) {
|
||||
clearTimeout(fetchTimer);
|
||||
}
|
||||
|
||||
// start using a DataSource in fetches/updates
|
||||
function openDataSource(source) {
|
||||
var type = source.getType();
|
||||
if (~sources[type].indexOf(source)) return false;
|
||||
sources[type].push(source);
|
||||
}
|
||||
|
||||
// stop using a DataSource in fetches/updates
|
||||
function closeDataSource(source) {
|
||||
var type = source.getType();
|
||||
var i = sources[type].indexOf(source);
|
||||
if (i === -1) return;
|
||||
sources[type].slice(i, 1);
|
||||
// only search DataSources get fetched automatically
|
||||
if (type === 'search' && sources.search.length === 0) stopFetching();
|
||||
}
|
||||
|
||||
// has the courier been started?
|
||||
function isRunning() {
|
||||
return !!fetchTimer;
|
||||
}
|
||||
|
||||
// chainable public api
|
||||
this.start = chain(this, doSearch);
|
||||
this.running = chain(this, isRunning);
|
||||
this.stop = chain(this, stopFetching);
|
||||
this.close = chain(this, function () { _(sources.search).each(closeDataSource); });
|
||||
this.openDataSource = chain(this, openDataSource);
|
||||
this.closeDataSource = chain(this, closeDataSource);
|
||||
|
||||
// setters
|
||||
this.client = chain(this, function (client) {
|
||||
opts.client = client;
|
||||
// array's to store references to individual sources of each type
|
||||
// wrapped in some metadata
|
||||
this._refs = _.transform(sourceTypes, function (refs, fn, type) {
|
||||
refs[type] = [];
|
||||
});
|
||||
this.fetchInterval = function (val) {
|
||||
opts.fetchInterval = val;
|
||||
if (isRunning()) setFetchTimeout();
|
||||
return this;
|
||||
};
|
||||
|
||||
// factory
|
||||
this.createSource = function (type, initialState) {
|
||||
return new DataSource(this, type, initialState);
|
||||
};
|
||||
// stores all timer ids
|
||||
this._timer = {};
|
||||
|
||||
// apply the passed in config
|
||||
_.each(config || {}, function (val, key) {
|
||||
if (typeof this[key] !== 'function') throw new TypeError('invalid config "' + key + '"');
|
||||
this[key](val);
|
||||
// interval times for each type
|
||||
this._interval = {};
|
||||
|
||||
// interval hook/fn for each type
|
||||
this._onInterval = {};
|
||||
|
||||
_.each(sourceTypes, function (fn, type) {
|
||||
var courier = this;
|
||||
// the name used outside of this module
|
||||
var publicName;
|
||||
if (type === 'search') {
|
||||
publicName = 'fetchInterval';
|
||||
} else {
|
||||
publicName = type + 'Interval';
|
||||
}
|
||||
|
||||
// store the config value passed in for this interval
|
||||
this._interval[type] = config[publicName];
|
||||
|
||||
// store a quick "bound" method for triggering
|
||||
this._onInterval[type] = function () {
|
||||
onFetch[type](courier);
|
||||
courier._schedule(type);
|
||||
};
|
||||
|
||||
// create a public setter for this interval type
|
||||
this[publicName] = function (val) {
|
||||
courier._interval[type] = val;
|
||||
courier._schedule(type);
|
||||
return this;
|
||||
};
|
||||
}, this);
|
||||
}
|
||||
inherits(Courier, EventEmitter);
|
||||
|
||||
// private api, exposed for testing
|
||||
Courier._flattenDataSource = flattenDataSource;
|
||||
/**
|
||||
* PUBLIC API
|
||||
*/
|
||||
|
||||
// start fetching results on an interval
|
||||
Courier.prototype.start = function () {
|
||||
if (!this.running()) {
|
||||
this._schedule('doc');
|
||||
this._schedule('search');
|
||||
this.fetch();
|
||||
}
|
||||
return this;
|
||||
};
|
||||
|
||||
// is the courier currently running?
|
||||
Courier.prototype.running = function () {
|
||||
return !!this._fetchTimer;
|
||||
};
|
||||
|
||||
// stop the courier from fetching more results
|
||||
Courier.prototype.stop = function () {
|
||||
this._clearScheduled('search');
|
||||
this._clearScheduled('doc');
|
||||
};
|
||||
|
||||
// close the courier, stopping it from refreshing and
|
||||
// closing all of the sources
|
||||
Courier.prototype.close = function () {
|
||||
_.each(sourceTypes, function (fn, type) {
|
||||
this._refs[type].forEach(function (ref) {
|
||||
this._closeDataSource(ref.source);
|
||||
}, this);
|
||||
}, this);
|
||||
};
|
||||
|
||||
// force a fetch of all datasources right now
|
||||
Courier.prototype.fetch = function () {
|
||||
_.forOwn(onFetch, function (method, type) {
|
||||
method(this);
|
||||
}, this);
|
||||
};
|
||||
|
||||
// data source factory
|
||||
Courier.prototype.createSource = function (type, initialState) {
|
||||
type = type || 'search';
|
||||
if ('function' !== typeof sourceTypes[type]) throw new TypeError(
|
||||
'Invalid source type ' + type
|
||||
);
|
||||
var Constructor = sourceTypes[type];
|
||||
return new Constructor(this, initialState);
|
||||
};
|
||||
|
||||
/*****
|
||||
* PRIVATE API
|
||||
*****/
|
||||
|
||||
// handle errors in a standard way. The only errors that should make it here are
|
||||
// - issues with msearch/mget syntax
|
||||
// - unable to reach ES
|
||||
// - HastyRefresh
|
||||
Courier.prototype._error = function (err) {
|
||||
this.stop();
|
||||
return this.emit('error', err);
|
||||
};
|
||||
|
||||
// every time a child object (DataSource, Mapper) needs the client, it should
|
||||
// call _getClient
|
||||
Courier.prototype._getClient = function () {
|
||||
if (!this._client) throw new Error('Client is not set on the Courier yet.');
|
||||
return this._client;
|
||||
};
|
||||
|
||||
// start using a DocSource in fetches/updates
|
||||
Courier.prototype._openDataSource = function (source) {
|
||||
var refs = this._refs[source._getType()];
|
||||
if (!_.find(refs, { source: source })) {
|
||||
refs.push({
|
||||
source: source
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
// stop using a DataSource in fetches/updates
|
||||
Courier.prototype._closeDataSource = function (source) {
|
||||
var type = source._getType();
|
||||
var refs = this._refs[type];
|
||||
_(refs).where({ source: source }).each(_.partial(_.pull, refs));
|
||||
if (refs.length === 0) this._clearScheduled(type);
|
||||
};
|
||||
|
||||
// schedule a fetch after fetchInterval
|
||||
Courier.prototype._schedule = function (type) {
|
||||
this._clearScheduled(type);
|
||||
if (this._interval[type]) {
|
||||
this._timer[type] = setTimeout(this._onInterval[type], this._interval[type]);
|
||||
}
|
||||
};
|
||||
|
||||
// properly clear scheduled fetches
|
||||
Courier.prototype._clearScheduled = function (type) {
|
||||
this._timer[type] = clearTimeout(this._timer[type]);
|
||||
};
|
||||
|
||||
// alert the courior that a doc has been updated
|
||||
// and that it should update matching docs
|
||||
Courier.prototype._docUpdated = function (source) {
|
||||
var updated = source._state;
|
||||
|
||||
_.each(this._refs.doc, function (ref) {
|
||||
var state = ref.source._state;
|
||||
if (
|
||||
state === updated
|
||||
|| (
|
||||
state.id === updated.id
|
||||
&& state.type === updated.type
|
||||
&& state.index === updated.index
|
||||
)
|
||||
) {
|
||||
delete ref.version;
|
||||
}
|
||||
});
|
||||
|
||||
onFetch.doc(this);
|
||||
};
|
||||
|
||||
return Courier;
|
||||
});
|
|
@ -1,128 +0,0 @@
|
|||
define(function (require) {
|
||||
var inherits = require('utils/inherits');
|
||||
var _ = require('lodash');
|
||||
var EventEmitter = require('utils/event_emitter');
|
||||
var Mapper = require('courier/mapper');
|
||||
var IndexPattern = require('courier/index_pattern');
|
||||
|
||||
// polyfill for older versions of node
|
||||
function listenerCount(emitter, event) {
|
||||
if (EventEmitter.listenerCount) {
|
||||
return EventEmitter.listenerCount(emitter, event);
|
||||
} else {
|
||||
return this.listeners(event).length;
|
||||
}
|
||||
}
|
||||
|
||||
var apiMethods = {
|
||||
search: [
|
||||
'index',
|
||||
'type',
|
||||
'query',
|
||||
'filter',
|
||||
'sort',
|
||||
'highlight',
|
||||
'aggs',
|
||||
'from',
|
||||
'size',
|
||||
'source',
|
||||
'inherits'
|
||||
],
|
||||
get: [
|
||||
'index',
|
||||
'type',
|
||||
'id',
|
||||
'sourceInclude',
|
||||
'sourceExclude'
|
||||
]
|
||||
};
|
||||
|
||||
function DataSource(courier, type, initialState) {
|
||||
var state;
|
||||
|
||||
if (initialState) {
|
||||
// state can be serialized as JSON, and passed back in to restore
|
||||
if (typeof initialState === 'string') {
|
||||
state = JSON.parse(initialState);
|
||||
} else {
|
||||
state = _.cloneDeep(initialState);
|
||||
}
|
||||
if (state._type) {
|
||||
if (type && type !== state._type) {
|
||||
throw new Error('Initial state is not of the type specified for this DataSource');
|
||||
} else {
|
||||
type = state._type;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
state = {};
|
||||
}
|
||||
|
||||
type = type || 'search';
|
||||
if (!_.has(apiMethods, type)) {
|
||||
throw new TypeError('Invalid DataSource type ' + type);
|
||||
}
|
||||
state._type = type;
|
||||
|
||||
var mapper = new Mapper();
|
||||
|
||||
var onNewListener = _.bind(function (name) {
|
||||
// new newListener is emitted before it is added, count will be 0
|
||||
if (name !== 'results' || listenerCount(this, 'results') !== 0) return;
|
||||
courier.openDataSource(this);
|
||||
this.removeListener('newListener', onNewListener);
|
||||
this.on('removeListener', onRemoveListener);
|
||||
}, this);
|
||||
|
||||
var onRemoveListener = _.bind(function () {
|
||||
if (listenerCount(this, 'results') > 0) return;
|
||||
courier.closeDataSource(this);
|
||||
this.removeListener('removeListener', onRemoveListener);
|
||||
this.on('newListener', onNewListener);
|
||||
}, this);
|
||||
|
||||
this.on('newListener', onNewListener);
|
||||
|
||||
/**
|
||||
* Used to flatten a chain of DataSources
|
||||
* @return {object} - simple object containing all of the
|
||||
* sources internal state
|
||||
*/
|
||||
this._state = function () {
|
||||
return state;
|
||||
};
|
||||
|
||||
// public api
|
||||
this.toJSON = function () {
|
||||
return _.omit(state, 'inherits');
|
||||
};
|
||||
this.toString = function () {
|
||||
return JSON.stringify(this.toJSON());
|
||||
};
|
||||
this.getFieldNames = function (cb) {
|
||||
mapper.getMapping(state.index, state.type, function (mapping) {
|
||||
return _.keys(mapping);
|
||||
});
|
||||
};
|
||||
this.getType = function () {
|
||||
return state._type;
|
||||
};
|
||||
this.extend = function () {
|
||||
return courier.createSource(type).inherits(this);
|
||||
};
|
||||
|
||||
// get/set internal state values
|
||||
apiMethods[type].forEach(function (name) {
|
||||
this[name] = function (val) {
|
||||
state[name] = val;
|
||||
if (name === 'index' && arguments[1]) {
|
||||
state.index = new IndexPattern(val, arguments[1]);
|
||||
}
|
||||
return this;
|
||||
};
|
||||
}, this);
|
||||
}
|
||||
inherits(DataSource, EventEmitter);
|
||||
|
||||
return DataSource;
|
||||
});
|
160
src/courier/data_source/data_source.js
Normal file
160
src/courier/data_source/data_source.js
Normal file
|
@ -0,0 +1,160 @@
|
|||
define(function (require) {
|
||||
var inherits = require('utils/inherits');
|
||||
var _ = require('lodash');
|
||||
var EventEmitter = require('utils/event_emitter');
|
||||
var Mapper = require('courier/mapper');
|
||||
var IndexPattern = require('courier/index_pattern');
|
||||
|
||||
function DataSource(courier, initialState) {
|
||||
var state;
|
||||
|
||||
EventEmitter.call(this);
|
||||
|
||||
// state can be serialized as JSON, and passed back in to restore
|
||||
if (initialState) {
|
||||
if (typeof initialState === 'string') {
|
||||
state = JSON.parse(initialState);
|
||||
} else {
|
||||
state = _.cloneDeep(initialState);
|
||||
}
|
||||
} else {
|
||||
state = {};
|
||||
}
|
||||
|
||||
this._state = state;
|
||||
this._courier = courier;
|
||||
|
||||
var onNewListener = _.bind(function (name) {
|
||||
// new newListener is emitted before it is added, count will be 0
|
||||
if (name !== 'results' || EventEmitter.listenerCount(this, 'results') !== 0) return;
|
||||
courier._openDataSource(this);
|
||||
this.removeListener('newListener', onNewListener);
|
||||
this.on('removeListener', onRemoveListener);
|
||||
}, this);
|
||||
|
||||
var onRemoveListener = _.bind(function () {
|
||||
if (EventEmitter.listenerCount(this, 'results') > 0) return;
|
||||
courier._closeDataSource(this);
|
||||
this.removeListener('removeListener', onRemoveListener);
|
||||
this.on('newListener', onNewListener);
|
||||
}, this);
|
||||
|
||||
this.on('newListener', onNewListener);
|
||||
|
||||
this.extend = function () {
|
||||
return courier.createSource(this._getType()).inherits(this);
|
||||
};
|
||||
|
||||
// get/set internal state values
|
||||
this._methods.forEach(function (name) {
|
||||
this[name] = function (val) {
|
||||
state[name] = val;
|
||||
if (name === 'index' && arguments[1]) {
|
||||
state.index = new IndexPattern(val, arguments[1]);
|
||||
}
|
||||
return this;
|
||||
};
|
||||
}, this);
|
||||
}
|
||||
inherits(DataSource, EventEmitter);
|
||||
|
||||
/*****
|
||||
* PUBLIC API
|
||||
*****/
|
||||
|
||||
/**
|
||||
* fetch the field names for this DataSource
|
||||
* @param {Function} cb
|
||||
* @callback {Error, Array} - calls cb with a possible error or an array of field names
|
||||
* @todo
|
||||
*/
|
||||
DataSource.prototype.getFieldNames = function (cb) {
|
||||
throw new Error('not implemented');
|
||||
};
|
||||
|
||||
/**
|
||||
* flatten an object to a simple encodable object
|
||||
* @return {[type]} [description]
|
||||
*/
|
||||
DataSource.prototype.toJSON = function () {
|
||||
return _.omit(this._state, 'inherits');
|
||||
};
|
||||
|
||||
/**
|
||||
* Create a string representation of the object
|
||||
* @return {[type]} [description]
|
||||
*/
|
||||
DataSource.prototype.toString = function () {
|
||||
return JSON.stringify(this.toJSON());
|
||||
};
|
||||
|
||||
/*****
|
||||
* PRIVATE API
|
||||
*****/
|
||||
|
||||
/**
|
||||
* Handle errors by allowing them to bubble from the DataSource
|
||||
* to the courior. Maybe we should walk the inheritance chain too.
|
||||
* @param {Error} err - The error that occured
|
||||
* @return {undefined}
|
||||
*/
|
||||
DataSource.prototype._error = function (err) {
|
||||
if (EventEmitter.listenerCount(this, 'error')) {
|
||||
this.emit('error', err);
|
||||
} else {
|
||||
this._courier.emit('error', err);
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Walk the inheritance chain of a source and return it's
|
||||
* flat representaion (taking into account merging rules)
|
||||
* @return {object} - the flat state of the DataSource
|
||||
*/
|
||||
DataSource.prototype._flatten = function () {
|
||||
var type = this._getType();
|
||||
// the merged state of this dataSource and it's ancestors
|
||||
var flatState = {};
|
||||
|
||||
var collectProp = _.partial(this._mergeProp, flatState);
|
||||
|
||||
// walk the chain and merge each property
|
||||
var current = this;
|
||||
var currentState;
|
||||
while (current) {
|
||||
currentState = current._state;
|
||||
_.forOwn(currentState, collectProp);
|
||||
current = currentState.inherits;
|
||||
}
|
||||
|
||||
if (type === 'search') {
|
||||
// defaults for the query
|
||||
_.forOwn({
|
||||
query: {
|
||||
'match_all': {}
|
||||
}
|
||||
}, collectProp);
|
||||
|
||||
// switch to filtered query if there are filters
|
||||
if (flatState.filters) {
|
||||
if (flatState.filters.length) {
|
||||
flatState.body.query = {
|
||||
filtered: {
|
||||
query: flatState.body.query,
|
||||
filter: {
|
||||
bool: {
|
||||
must: flatState.filters
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
delete flatState.filters;
|
||||
}
|
||||
}
|
||||
|
||||
return flatState;
|
||||
};
|
||||
|
||||
return DataSource;
|
||||
});
|
181
src/courier/data_source/doc.js
Normal file
181
src/courier/data_source/doc.js
Normal file
|
@ -0,0 +1,181 @@
|
|||
define(function (require) {
|
||||
var DataSource = require('courier/data_source/data_source');
|
||||
var inherits = require('utils/inherits');
|
||||
var errors = require('courier/errors');
|
||||
var _ = require('lodash');
|
||||
|
||||
function DocSource(courier, initialState) {
|
||||
DataSource.call(this, courier, initialState);
|
||||
}
|
||||
inherits(DocSource, DataSource);
|
||||
|
||||
/**
|
||||
* Method used by the Courier to fetch multiple DocSource objects at one time.
|
||||
* Those objects come in via the refs array, which is a list of objects containing
|
||||
* at least `source` and `version` keys.
|
||||
*
|
||||
* @param {Courier} courier - The courier requesting the records
|
||||
* @param {array} refs - The list of refs
|
||||
* @param {Function} cb - Callback
|
||||
* @return {undefined}
|
||||
*/
|
||||
DocSource.fetch = function (courier, refs, cb) {
|
||||
var client = courier._getClient();
|
||||
var allRefs = [];
|
||||
var body = {
|
||||
docs: []
|
||||
};
|
||||
|
||||
_.each(refs, function (ref) {
|
||||
var source = ref.source;
|
||||
if (source._getType() !== 'doc') return;
|
||||
|
||||
allRefs.push(ref);
|
||||
body.docs.push(source._flatten());
|
||||
});
|
||||
|
||||
return client.mget({ body: body }, function (err, resp) {
|
||||
if (err) return cb(err);
|
||||
|
||||
_.each(resp.docs, function (resp, i) {
|
||||
var ref = allRefs[i];
|
||||
var source = ref.source;
|
||||
|
||||
if (resp.error) return this._error(resp);
|
||||
if (ref.version === resp._version) return; // no change
|
||||
ref.version = resp._version;
|
||||
source._storeVersion(resp._version);
|
||||
source.emit('results', resp);
|
||||
});
|
||||
|
||||
cb(err, resp);
|
||||
});
|
||||
};
|
||||
|
||||
/**
|
||||
* Method used to check if a list of refs is
|
||||
* @param {[type]} courier [description]
|
||||
* @param {[type]} refs [description]
|
||||
* @param {Function} cb [description]
|
||||
* @return {[type]} [description]
|
||||
*/
|
||||
DocSource.validate = function (courier, refs, cb) {
|
||||
var invalid = _.filter(refs, function (ref) {
|
||||
var storedVersion = ref.source._getStoredVersion();
|
||||
if (ref.version !== storedVersion) return true;
|
||||
});
|
||||
setTimeout(function () {
|
||||
cb(void 0, invalid);
|
||||
});
|
||||
};
|
||||
|
||||
/*****
|
||||
* PUBLIC API
|
||||
*****/
|
||||
|
||||
/**
|
||||
* List of methods that is turned into a chainable API in the constructor
|
||||
* @type {Array}
|
||||
*/
|
||||
DocSource.prototype._methods = [
|
||||
'index',
|
||||
'type',
|
||||
'id',
|
||||
'sourceInclude',
|
||||
'sourceExclude'
|
||||
];
|
||||
|
||||
/**
|
||||
* Applies a partial update to the document
|
||||
* @param {object} fields - The fields to change and their new values (es doc field)
|
||||
* @param {Function} cb - Callback to know when the update is complete
|
||||
* @return {undefined}
|
||||
*/
|
||||
DocSource.prototype.update = function (fields, cb) {
|
||||
var source = this;
|
||||
var courier = this._courier;
|
||||
var client = courier._getClient();
|
||||
var state = this._state;
|
||||
|
||||
client.update({
|
||||
id: state.id,
|
||||
type: state.type,
|
||||
index: state.index,
|
||||
body: {
|
||||
doc: fields
|
||||
}
|
||||
}, function (err, resp) {
|
||||
if (err) return cb(err);
|
||||
|
||||
courier._docUpdated(source);
|
||||
return cb();
|
||||
});
|
||||
};
|
||||
|
||||
/*****
|
||||
* PRIVATE API
|
||||
*****/
|
||||
|
||||
/**
|
||||
* Get the type of this DataSource
|
||||
* @return {string} - 'doc'
|
||||
*/
|
||||
DocSource.prototype._getType = function () {
|
||||
return 'doc';
|
||||
};
|
||||
|
||||
/**
|
||||
* Used to merge properties into the state within ._flatten().
|
||||
* The state is passed in and modified by the function
|
||||
*
|
||||
* @param {object} state - the current merged state
|
||||
* @param {*} val - the value at `key`
|
||||
* @param {*} key - The key of `val`
|
||||
* @return {undefined}
|
||||
*/
|
||||
DocSource.prototype._mergeProp = function (state, val, key) {
|
||||
key = '_' + key;
|
||||
|
||||
if (val != null && state[key] == null) {
|
||||
state[key] = val;
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Creates a key based on the doc's index/type/id
|
||||
* @return {string}
|
||||
*/
|
||||
DocSource.prototype._versionKey = function () {
|
||||
var state = this._state;
|
||||
return 'DocVersion:' + (
|
||||
[
|
||||
state.index,
|
||||
state.type,
|
||||
state.id
|
||||
]
|
||||
.map(encodeURIComponent)
|
||||
.join('/')
|
||||
);
|
||||
};
|
||||
|
||||
/**
|
||||
* Fetches the stored version from localStorage
|
||||
* @return {number} - the version number, or NaN
|
||||
*/
|
||||
DocSource.prototype._getStoredVersion = function () {
|
||||
var id = this._versionKey();
|
||||
return _.parseInt(localStorage.getItem(id));
|
||||
};
|
||||
|
||||
/**
|
||||
* Stores the version into localStorage
|
||||
* @param {number, NaN} version - the current version number, NaN works well forcing a refresh
|
||||
* @return {undefined}
|
||||
*/
|
||||
DocSource.prototype._storeVersion = function (version) {
|
||||
var id = this._versionKey();
|
||||
localStorage.setItem(id, version);
|
||||
};
|
||||
|
||||
return DocSource;
|
||||
});
|
129
src/courier/data_source/search.js
Normal file
129
src/courier/data_source/search.js
Normal file
|
@ -0,0 +1,129 @@
|
|||
define(function (require) {
|
||||
var DataSource = require('courier/data_source/data_source');
|
||||
var inherits = require('utils/inherits');
|
||||
var errors = require('courier/errors');
|
||||
var _ = require('lodash');
|
||||
|
||||
function SearchSource(courier, initialState) {
|
||||
DataSource.call(this, courier, initialState);
|
||||
}
|
||||
inherits(SearchSource, DataSource);
|
||||
|
||||
/**
|
||||
* Method used by the Courier to fetch multiple SearchSource request at a time.
|
||||
* Those objects come in via the refs array, which is a list of objects containing
|
||||
* a `source` keys.
|
||||
*
|
||||
* @param {Courier} courier - The courier requesting the results
|
||||
* @param {array} refs - The list of refs
|
||||
* @param {Function} cb - Callback
|
||||
* @return {undefined}
|
||||
*/
|
||||
SearchSource.fetch = function (courier, refs, cb) {
|
||||
var client = courier._getClient();
|
||||
var allRefs = [];
|
||||
var body = '';
|
||||
|
||||
_.each(refs, function (ref) {
|
||||
var source = ref.source;
|
||||
if (source._getType() !== 'search') {
|
||||
return;
|
||||
}
|
||||
allRefs.push(source);
|
||||
|
||||
var state = source._flatten();
|
||||
body +=
|
||||
JSON.stringify({ index: state.index, type: state.type })
|
||||
+ '\n'
|
||||
+ JSON.stringify(state.body)
|
||||
+ '\n';
|
||||
});
|
||||
|
||||
return client.msearch({ body: body }, function (err, resp) {
|
||||
if (err) return cb(err);
|
||||
|
||||
_.each(resp.responses, function (resp, i) {
|
||||
var source = allRefs[i];
|
||||
if (resp.error) return errors.emit(source, courier, resp);
|
||||
source.emit('results', resp);
|
||||
});
|
||||
|
||||
cb(void 0, resp);
|
||||
});
|
||||
};
|
||||
|
||||
/*****
|
||||
* PUBLIC API
|
||||
*****/
|
||||
|
||||
/**
|
||||
* List of the editable state properties that turn into a
|
||||
* chainable API
|
||||
*
|
||||
* @type {Array}
|
||||
*/
|
||||
SearchSource.prototype._methods = [
|
||||
'index',
|
||||
'type',
|
||||
'query',
|
||||
'filter',
|
||||
'sort',
|
||||
'highlight',
|
||||
'aggs',
|
||||
'from',
|
||||
'size',
|
||||
'source',
|
||||
'inherits'
|
||||
];
|
||||
|
||||
/******
|
||||
* PRIVATE APIS
|
||||
******/
|
||||
|
||||
/**
|
||||
* Gets the type of the DataSource
|
||||
* @return {string}
|
||||
*/
|
||||
SearchSource.prototype._getType = function () {
|
||||
return 'search';
|
||||
};
|
||||
|
||||
/**
|
||||
* Used to merge properties into the state within ._flatten().
|
||||
* The state is passed in and modified by the function
|
||||
*
|
||||
* @param {object} state - the current merged state
|
||||
* @param {*} val - the value at `key`
|
||||
* @param {*} key - The key of `val`
|
||||
* @return {undefined}
|
||||
*/
|
||||
SearchSource.prototype._mergeProp = function (state, val, key) {
|
||||
switch (key) {
|
||||
case 'inherits':
|
||||
case '_type':
|
||||
// ignore
|
||||
return;
|
||||
case 'filter':
|
||||
state.filters = state.filters || [];
|
||||
state.filters.push(val);
|
||||
return;
|
||||
case 'index':
|
||||
case 'type':
|
||||
case 'id':
|
||||
if (key && state[key] == null) {
|
||||
state[key] = val;
|
||||
}
|
||||
return;
|
||||
case 'source':
|
||||
key = '_source';
|
||||
/* fall through */
|
||||
default:
|
||||
state.body = state.body || {};
|
||||
if (key && state.body[key] == null) {
|
||||
state.body[key] = val;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
return SearchSource;
|
||||
});
|
|
@ -1,135 +0,0 @@
|
|||
define(function (require) {
|
||||
var _ = require('lodash');
|
||||
|
||||
function Docs(courier) {
|
||||
// docs that we have let loose, and want to track
|
||||
var tracking = {};
|
||||
var watchers = {};
|
||||
|
||||
function respId(getResp) {
|
||||
return [
|
||||
encodeURIComponent(getResp._index),
|
||||
encodeURIComponent(getResp._type),
|
||||
encodeURIComponent(getResp._id)
|
||||
].join('/');
|
||||
}
|
||||
|
||||
function change(id, updated) {
|
||||
if (watchers[id]) {
|
||||
var notify = function () {
|
||||
var oldVal = tracking[id]._source;
|
||||
tracking[id] = _.cloneDeep(update);
|
||||
watchers[id].forEach(function (watcher) {
|
||||
try {
|
||||
watcher(updated, oldVal);
|
||||
} catch (e) { console.error(e); }
|
||||
});
|
||||
};
|
||||
|
||||
if (updated) {
|
||||
notify();
|
||||
} else {
|
||||
courier.get('client').get({
|
||||
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function track(resp) {
|
||||
var id = respId(resp);
|
||||
var tracker = _.pick(resp, '_id', '_type', '_index', '_source');
|
||||
if (tracking[id] && equal(tracking[id]._source, resp)) return false;
|
||||
change(id, resp);
|
||||
}
|
||||
|
||||
/**
|
||||
* add a function to be called when objects matching
|
||||
* this resp are changed
|
||||
* @param {object} resp - Response like object, should contain _id, _type, and _index keys
|
||||
* @param {[type]} onChange - Function to be called when changes are noticed
|
||||
*/
|
||||
function watch(resp, onChange) {
|
||||
var id = respId(resp);
|
||||
if (!watchers[id]) watchers[id] = [];
|
||||
watchers[id].push(onChange);
|
||||
}
|
||||
|
||||
function get(args, cb, onChange) {
|
||||
var client = courier.get('client');
|
||||
client.get(args, function (err, getResp) {
|
||||
if (err) return cb(err);
|
||||
watch(getResp, onChange);
|
||||
return cb(void 0, getResp);
|
||||
});
|
||||
}
|
||||
|
||||
function index(args, cb) {
|
||||
var client = courier.get('client');
|
||||
|
||||
client.index(args, function (err, indexResp) {
|
||||
if (err) return cb(err);
|
||||
delete indexResp.created;
|
||||
indexResp._source = args.body;
|
||||
track(indexResp);
|
||||
return cb(void 0, indexResp);
|
||||
});
|
||||
}
|
||||
|
||||
function update(args, cb) {
|
||||
var client = courier.get('client');
|
||||
client.update(args, function (err, updateResp) {
|
||||
if (err) return cb(err);
|
||||
return cb(void 0, updateResp);
|
||||
});
|
||||
}
|
||||
|
||||
this.watch = watch;
|
||||
this.get = get;
|
||||
this.index = index;
|
||||
this.set = index;
|
||||
this.update = update;
|
||||
}
|
||||
|
||||
function equal(o1, o2) {
|
||||
/* jshint eqeqeq:false, forin:false */
|
||||
if (o1 === o2) return true;
|
||||
if (o1 === null || o2 === null) return false;
|
||||
if (o1 !== o1 && o2 !== o2) return true; // NaN === NaN
|
||||
var t1 = typeof o1, t2 = typeof o2, length, key, keySet;
|
||||
if (t1 == t2) {
|
||||
if (t1 == 'object') {
|
||||
if (_.isArray(o1)) {
|
||||
if (!_.isArray(o2)) return false;
|
||||
if ((length = o1.length) == o2.length) {
|
||||
for (key = 0; key < length; key++) {
|
||||
if (!equal(o1[key], o2[key])) return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
} else if (_.isDate(o1)) {
|
||||
return _.isDate(o2) && o1.getTime() == o2.getTime();
|
||||
} else if (_.isRegExp(o1) && _.isRegExp(o2)) {
|
||||
return o1.toString() == o2.toString();
|
||||
} else {
|
||||
if (_.isArray(o2)) return false;
|
||||
keySet = {};
|
||||
for (key in o1) {
|
||||
if (_.isFunction(o1[key])) continue;
|
||||
if (!equal(o1[key], o2[key])) return false;
|
||||
keySet[key] = true;
|
||||
}
|
||||
for (key in o2) {
|
||||
if (!keySet.hasOwnProperty(key) &&
|
||||
o2[key] !== undefined &&
|
||||
!_.isFunction(o2[key])) return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
return Docs;
|
||||
});
|
|
@ -1,12 +1,12 @@
|
|||
define(function (require) {
|
||||
define(function (require, module, exports) {
|
||||
var listenerCount = require('utils/event_emitter').listenerCount;
|
||||
|
||||
// caused by a refresh attempting to start before the prevous is done
|
||||
function HastyRefresh() {
|
||||
this.name = 'HastyRefresh';
|
||||
this.message = 'Courier attempted to start a query before the previous had finished.';
|
||||
}
|
||||
HastyRefresh.prototype = new Error();
|
||||
HastyRefresh.prototype.constructor = HastyRefresh;
|
||||
|
||||
return {
|
||||
HastyRefresh: HastyRefresh
|
||||
};
|
||||
exports.HastyRefresh = HastyRefresh;
|
||||
});
|
|
@ -1,16 +0,0 @@
|
|||
var elasticsearch = require('elasticsearch');
|
||||
var es = elasticsearch.Client();
|
||||
|
||||
es.msearch({
|
||||
body: [
|
||||
{
|
||||
index: 'logstash-2014.02.1111'
|
||||
},
|
||||
{
|
||||
query: { 'match_all': {} }
|
||||
}
|
||||
]
|
||||
}, function (err, resp) {
|
||||
console.log(resp);
|
||||
es.close();
|
||||
});
|
|
@ -1,3 +1,3 @@
|
|||
<courier-test type="apache" fields="extension,response,request"></courier-test>
|
||||
<courier-test type="nginx" fields=""></courier-test>
|
||||
<courier-doc-test index="" ></courier-doc-test>
|
||||
<courier-doc-test index="logstash-2014.02.11" type="apache" id="6434"></courier-doc-test>
|
||||
<courier-doc-test index="logstash-2014.02.11" type="apache" id="6434"></courier-doc-test>
|
||||
<courier-test type="apache" fields="extension,response,request"></courier-test>
|
|
@ -1,5 +1,6 @@
|
|||
define(function (require) {
|
||||
var angular = require('angular');
|
||||
var _ = require('lodash');
|
||||
|
||||
angular.module('kibana/controllers')
|
||||
.controller('Kibana', function (courier, $scope, $rootScope) {
|
||||
|
@ -8,7 +9,7 @@ define(function (require) {
|
|||
.size(5);
|
||||
|
||||
// this should be triggered from within the controlling application
|
||||
setTimeout(courier.start, 15);
|
||||
setTimeout(_.bindKey(courier, 'start'), 15);
|
||||
});
|
||||
|
||||
angular.module('kibana/directives')
|
||||
|
@ -18,19 +19,53 @@ define(function (require) {
|
|||
scope: {
|
||||
type: '@'
|
||||
},
|
||||
template: '<strong style="float:left">{{count}} : </strong><pre>{{json}}</pre>',
|
||||
controller: function ($rootScope, $scope) {
|
||||
$scope.count = 0;
|
||||
|
||||
var source = $rootScope.dataSource.extend()
|
||||
.type($scope.type)
|
||||
.source({
|
||||
include: 'country'
|
||||
})
|
||||
.on('results', function (resp) {
|
||||
$scope.count ++;
|
||||
$scope.json = JSON.stringify(resp.hits, null, ' ');
|
||||
});
|
||||
|
||||
$scope.$watch('type', source.type);
|
||||
}
|
||||
};
|
||||
})
|
||||
.directive('courierDocTest', function () {
|
||||
return {
|
||||
restrict: 'E',
|
||||
scope: {
|
||||
id: '@',
|
||||
type: '@',
|
||||
index: '@'
|
||||
},
|
||||
template: '<pre>{{json}}</pre>'
|
||||
template: '<strong style="float:left">{{count}} : <button ng-click="click()">reindex</button> : </strong><pre>{{json}}</pre>',
|
||||
controller: function (courier, $scope) {
|
||||
$scope.count = 0;
|
||||
|
||||
var currentSource;
|
||||
$scope.click = function () {
|
||||
if (currentSource) {
|
||||
source.update(currentSource);
|
||||
}
|
||||
};
|
||||
|
||||
var source = courier.createSource('doc')
|
||||
.id($scope.id)
|
||||
.type($scope.type)
|
||||
.index($scope.index)
|
||||
.on('results', function (doc) {
|
||||
currentSource = doc._source;
|
||||
$scope.count ++;
|
||||
$scope.json = JSON.stringify(doc, null, ' ');
|
||||
});
|
||||
}
|
||||
};
|
||||
});
|
||||
});
|
|
@ -1,16 +1,22 @@
|
|||
define(function (require) {
|
||||
var angular = require('angular');
|
||||
var Courier = require('courier/courier');
|
||||
var DocSource = require('courier/data_source/doc');
|
||||
|
||||
require('services/promises');
|
||||
|
||||
angular.module('kibana/services')
|
||||
.service('courier', function (es) {
|
||||
.service('courier', function (es, promises) {
|
||||
|
||||
promises.playNice(DocSource.prototype, [
|
||||
'update',
|
||||
'index'
|
||||
]);
|
||||
|
||||
var courier = new Courier({
|
||||
fetchInterval: 15000,
|
||||
client: es
|
||||
});
|
||||
|
||||
courier.on('error', function (err) {
|
||||
console.error(err);
|
||||
client: es,
|
||||
promises: promises
|
||||
});
|
||||
|
||||
return courier;
|
||||
|
|
39
src/kibana/services/promises.js
Normal file
39
src/kibana/services/promises.js
Normal file
|
@ -0,0 +1,39 @@
|
|||
define(function (require, module, exports) {
|
||||
var _ = require('lodash');
|
||||
var angular = require('angular');
|
||||
|
||||
angular.module('kibana/services')
|
||||
.service('promises', function ($q) {
|
||||
|
||||
function playNice(fn, fns) {
|
||||
if (fns && _.isArray(fns) && _.isObject(fn)) {
|
||||
fns.forEach(function (method) {
|
||||
fn[method] = playNice(fn[method]);
|
||||
});
|
||||
return fn;
|
||||
}
|
||||
|
||||
return function playNiceWrapper() {
|
||||
// if the last arg is a callback then don't do anything
|
||||
if (typeof arguments[arguments.length - 1] === 'function') {
|
||||
return fn.apply(this, arguments);
|
||||
}
|
||||
|
||||
// otherwise create a callback and pass it in
|
||||
var args = Array.prototype.slice.call(arguments);
|
||||
var defer = $q.defer();
|
||||
args.push(function (err, result) {
|
||||
if (err) return defer.reject(err);
|
||||
defer.resolve(result);
|
||||
});
|
||||
fn.apply(this, args);
|
||||
return defer.promise;
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
return {
|
||||
playNice: playNice
|
||||
};
|
||||
});
|
||||
});
|
|
@ -2,6 +2,9 @@ define(function (require) {
|
|||
var Courier = require('courier/courier');
|
||||
var _ = require('lodash');
|
||||
var sinon = require('sinon/sinon');
|
||||
var DataSource = require('courier/data_source/data_source');
|
||||
var DocSource = require('courier/data_source/doc');
|
||||
var SearchSource = require('courier/data_source/search');
|
||||
|
||||
describe('Courier Module', function () {
|
||||
|
||||
|
@ -39,13 +42,13 @@ define(function (require) {
|
|||
it('creates an empty search DataSource object', function () {
|
||||
courier = new Courier();
|
||||
var source = courier.createSource();
|
||||
expect(source._state()).to.eql({ _type: 'search' });
|
||||
expect(source._state).to.eql({});
|
||||
});
|
||||
it('optionally accepts a type for the DataSource', function () {
|
||||
var courier = new Courier();
|
||||
expect(courier.createSource()._state()._type).to.eql('search');
|
||||
expect(courier.createSource('search')._state()._type).to.eql('search');
|
||||
expect(courier.createSource('get')._state()._type).to.eql('get');
|
||||
expect(courier.createSource()).to.be.a(SearchSource);
|
||||
expect(courier.createSource('search')).to.be.a(SearchSource);
|
||||
expect(courier.createSource('doc')).to.be.a(DocSource);
|
||||
expect(function () {
|
||||
courier.createSource('invalid type');
|
||||
}).to.throwError(TypeError);
|
||||
|
@ -53,12 +56,12 @@ define(function (require) {
|
|||
it('optionally accepts a json object/string that will populate the DataSource object with settings', function () {
|
||||
courier = new Courier();
|
||||
var savedState = JSON.stringify({
|
||||
_type: 'get',
|
||||
_type: 'doc',
|
||||
index: 'logstash-[YYYY-MM-DD]',
|
||||
type: 'nginx',
|
||||
id: '1'
|
||||
});
|
||||
var source = courier.createSource('get', savedState);
|
||||
var source = courier.createSource('doc', savedState);
|
||||
expect(source + '').to.eql(savedState);
|
||||
});
|
||||
});
|
||||
|
@ -83,7 +86,7 @@ define(function (require) {
|
|||
source.on('results', _.noop);
|
||||
source.index('the index name');
|
||||
|
||||
expect(Courier._flattenDataSource(source).index).to.eql('the index name');
|
||||
expect(source._flatten().index).to.eql('the index name');
|
||||
});
|
||||
});
|
||||
|
||||
|
@ -111,7 +114,7 @@ define(function (require) {
|
|||
})
|
||||
.on('results', _.noop);
|
||||
|
||||
var query = Courier._flattenDataSource(math);
|
||||
var query = math._flatten();
|
||||
expect(query.index).to.eql('people');
|
||||
expect(query.type).to.eql('students');
|
||||
expect(query.body).to.eql({
|
||||
|
|
|
@ -1,30 +1,20 @@
|
|||
define(function (require) {
|
||||
var Courier = require('courier/courier');
|
||||
var DataSource = require('courier/data_source');
|
||||
var DataSource = require('courier/data_source/data_source');
|
||||
var DocSource = require('courier/data_source/doc');
|
||||
var SearchSource = require('courier/data_source/search');
|
||||
|
||||
describe('DataSource class', function () {
|
||||
var courier = new Courier();
|
||||
describe('::new', function () {
|
||||
it('accepts and validates a type', function () {
|
||||
var source = new DataSource(courier, 'get');
|
||||
expect(source._state()._type).to.eql('get');
|
||||
|
||||
source = new DataSource(courier, 'search');
|
||||
expect(source._state()._type).to.eql('search');
|
||||
|
||||
expect(function () {
|
||||
source = new DataSource(courier, 'invalid Type');
|
||||
}).to.throwError(TypeError);
|
||||
});
|
||||
|
||||
it('optionally accepts a json object/string that will populate the DataSource object with settings', function () {
|
||||
var savedState = JSON.stringify({
|
||||
_type: 'get',
|
||||
_type: 'doc',
|
||||
index: 'logstash-[YYYY-MM-DD]',
|
||||
type: 'nginx',
|
||||
id: '1'
|
||||
});
|
||||
var source = new DataSource(courier, 'get', savedState);
|
||||
var source = new DocSource(courier, savedState);
|
||||
expect(source + '').to.eql(savedState);
|
||||
});
|
||||
});
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue