added base for angular app, incomplete courier still included

This commit is contained in:
Spencer Alger 2014-02-11 13:58:29 -07:00
parent e290b94927
commit d082d7a358
11 changed files with 683 additions and 165 deletions

View file

@ -5,9 +5,11 @@
"globals": {
"define": true,
"require": true
"require": true,
"console": true
},
"camelcase": true,
"white": true,
"bitwise": false,
"eqnull": true,
@ -17,7 +19,7 @@
"expr": true,
"indent": 2,
"latedef": "nofunc",
"newcap": true,
"newcap": false,
"noarg": true,
"noempty": true,
"undef": true,

View file

@ -12,5 +12,6 @@
<script>require(['main'], function () {});</script>
</head>
<body>
<div ng-view></div>
</body>
</html>

View file

@ -2,64 +2,74 @@ define(function (require) {
var DataSource = require('courier/data_source');
var _ = require('lodash');
var angular = require('angular');
var optionNames = [
'fetchInterval',
'client'
];
require('courier/data_source');
require('courier/mapper');
function Courier(config) {
config = config || {};
var module = angular.module('kbn.services.courier', []);
var opts = {};
var fetchTimer;
var activeRequest;
module.directive('courierTest', function (Courier) {
var courier = Courier();
});
var sources = [];
module.factory('Courier', function (es) {
var optionNames = [
'fetchInterval',
'client'
];
var mergeInheritance = function (source) {
var mergeProp = function (state, filters, val, key) {
switch (key) {
case 'inherits':
// ignore
break;
case 'filter':
filters.push(val);
break;
case 'index':
case 'type':
if (key && state[key] == null) {
state[key] = val;
}
break;
default:
if (key && state.body[key] == null) {
state.body[key] = val;
}
break;
}
};
var flattenDataSource = function (source) {
var state = {
body: {}
};
// all of the filters from the source chain,
// no particular order
// all of the filters from the source chain
var filters = [];
var mergeProp = function (val, key) {
switch (key) {
case 'filters':
filters.push(val);
break;
case 'index':
case 'type':
if (key && state[key] == null) {
state[key] = val;
}
break;
default:
if (key && state.body[key] == null) {
state.body[key] = val;
}
break;
}
};
var collectProp = _.partial(mergeProp, state, filters);
// walk the chain and merge each property
var current = source;
var currentState;
while (current) {
_.forOwn(current, mergeProp);
current = current.inherits;
currentState = current._state();
_.forOwn(currentState, collectProp);
current = currentState.inherits;
}
// defaults for the query
_.forOwn({
query: {
match_all: {}
'match_all': {}
},
index: '_all',
type: '_all'
}, mergeProp);
}, collectProp);
// switch to filtered query if there are filters
if (filters.length) {
state.body.query = {
filtered: {
@ -76,99 +86,120 @@ define(function (require) {
return state;
};
var writeSourceRequest = function (source) {
var state = mergeInheritance(source.state());
return JSON.stringify({
index: state.index,
type: state.type
}) +
'\n' +
JSON.stringify(state.body);
};
function Courier(config) {
var opts = {};
var fetchTimer;
var activeRequest;
var onSourceUpdate = _.bind(function (source) {
var existing = _.find(sources, { source: source });
if (!existing) {
this.stopFetchingSource(source);
}
existing.req = writeSourceRequest(source);
}, this);
var sources = [];
var setFetchTimeout = function () {
clearTimeout(fetchTimer);
if (opts.fetchInterval) {
fetchTimer = setTimeout(onFetch, opts.fetchInterval);
} else {
fetchTimer = null;
}
};
var stopFetching = function () {
clearTimeout(fetchTimer);
return this;
};
var onFetch = _.bind(function () {
if (!opts.client) {
throw new Error('Courier does not have a client yet, unable to fetch queries');
}
var requests = _.pluck(sources, 'req');
activeRequest = opts.client.msearch({
body: requests
});
}, this);
var startFetchingSource = function (source) {
var existing = _.find(sources, { source: source });
if (existing) return false;
sources.push({
source: source,
req: writeSourceRequest(source)
});
source.on('change', onSourceUpdate);
return this;
};
var stopFetchingSource = function (source) {
source.removeListener('change', onSourceUpdate);
_.remove(sources, { source: source });
if (sources.length === 0) clearTimeout(fetchTimer);
};
// public api
this.start = setFetchTimeout;
this.startFetchingSource = startFetchingSource;
this.stop = stopFetching;
this.stopFetchingSource = stopFetchingSource;
this.close = _.partial(_.each, sources, stopFetchingSource);
this.define = function (state) {
return new DataSource(this, state);
};
// chainable settings/getters for state stuff
optionNames.forEach(function chainableOptions(name) {
this[name] = function (val) {
if (val === void 0) {
return opts[name];
var onFetch = _.bind(function () {
if (!opts.client) {
throw new Error('Courier does not have a client yet, unable to fetch queries');
}
opts[name] = val;
switch (name) {
case 'fetchInterval':
if (fetchTimer) setFetchTimeout();
var all = [];
var body = '';
_.each(sources, function (source) {
all.push(source);
var state = flattenDataSource(source);
var header = JSON.stringify({
index: state.index,
type: state.type
});
var body = JSON.stringify(state.body);
body += header + '\n' + body + '\n';
});
if (activeRequest) {
activeRequest.abort();
}
activeRequest = opts.client.msearch({
body: body
}).then(function (resp) {
_.each(resp.responses, function (resp, i) {
sources[i].emit('results', resp);
});
}, function (err) {
console.error(err);
});
}, this);
var setFetchTimeout = function () {
clearTimeout(fetchTimer);
if (opts.fetchInterval) {
fetchTimer = setTimeout(onFetch, opts.fetchInterval);
} else {
fetchTimer = null;
}
};
var stopFetching = function () {
clearTimeout(fetchTimer);
return this;
};
}, this);
// private api, exposed for testing
this._state = function () { return opts; };
this._writeSourceRequest = writeSourceRequest;
}
var startFetchingSource = function (source) {
var existing = _.find(sources, { source: source });
if (existing) return false;
sources.push(source);
return Courier;
return this;
};
var stopFetchingSource = function (source) {
var i = sources.indexOf(source);
if (i !== -1) {
sources.slice(i, 1);
}
if (sources.length === 0) stopFetching();
};
// public api
this.start = setFetchTimeout;
this.startFetchingSource = startFetchingSource;
this.stop = stopFetching;
this.stopFetchingSource = stopFetchingSource;
this.close = _.partial(_.each, sources, stopFetchingSource);
this.define = function (state) {
return new DataSource(this, state);
};
this.isStarted = function () {
return !!fetchTimer;
};
// chainable settings/getters for state stuff
optionNames.forEach(function chainableOptions(name) {
this[name] = function (val) {
if (val === void 0) {
return opts[name];
}
opts[name] = val;
switch (name) {
case 'fetchInterval':
if (fetchTimer) setFetchTimeout();
}
return this;
};
}, this);
// private api, exposed for testing
this._flattenDataSource = flattenDataSource;
this._getQueryForSource = function (source) {
var existing = _.find(sources, { source: source });
if (existing) return existing.req;
};
_.each(config || {}, function (val, key) {
if (typeof this[key] !== 'function') throw new TypeError('invalid config "' + key + '"');
this[key](val);
}, this);
}
return Courier;
});
});

View file

@ -33,7 +33,7 @@ define(function (require) {
if (initialState) {
// state can be serialized as JSON, and passed back in to restore
if (typeof initialState === 'string') {
state = JSON.parse(state);
state = JSON.parse(initialState);
} else {
state = _.cloneDeep(initialState);
}
@ -43,21 +43,22 @@ define(function (require) {
var mapper = new Mapper();
var makeActive = _.bind(function () {
if (listenerCount(this, 'results') === 0) return;
var onNewListener = _.bind(function (name) {
// new newListener is emitted before it is added, count will be 0
if (name !== 'results' || listenerCount(this, 'results') !== 0) return;
courier.startFetchingSource(this);
this.removeListener('newListener', makeActive);
this.on('removeListener', makeInactive);
this.removeListener('newListener', onNewListener);
this.on('removeListener', onRemoveListener);
}, this);
var makeInactive = _.bind(function () {
var onRemoveListener = _.bind(function () {
if (listenerCount(this, 'results') > 0) return;
courier.stopFetchingSource(this);
this.removeListener('removeListener', makeInactive);
this.on('newListener', makeActive);
this.removeListener('removeListener', onRemoveListener);
this.on('newListener', onNewListener);
}, this);
this.on('newListener', makeActive);
this.on('newListener', onNewListener);
/**
* Used to flatten a chain of DataSources
@ -70,9 +71,11 @@ define(function (require) {
// public api
this.toJSON = function () {
return JSON.stringify(_.omit(state, 'inherits'));
return _.omit(state, 'inherits');
};
this.toString = function () {
return JSON.stringify(this.toJSON());
};
this.getFieldNames = function (cb) {
mapper.getMapping(state.index, state.type, function (mapping) {
return _.keys(mapping);
@ -85,10 +88,7 @@ define(function (require) {
if (val === void 0) {
return state[name];
}
if (state[name] !== val) {
state[name] = val;
this.emit('change', this, name);
}
state[name] = val;
return this;
};
}, this);

View file

@ -1,12 +1,21 @@
define(function (require) {
/**
* Fetches mappings from elasticsearch and casts result objects
* based on those mappings
* - Resolves index patterns
* - Fetches mappings from elasticsearch
* - casts result object fields using mappings
*
* @class Mapper
*/
function Mapper() {
function Mapper(index, type) {
this.indices = function () {
return new Promise(function (resolve, reject) {
});
};
this.getFields = function () {
};
}
return Mapper;

View file

@ -1,9 +1,97 @@
/**
* main app level module
*/
define(function (require) {
var angular = require('angular');
var $ = require('jquery');
var _ = require('lodash');
var scopedRequire = require('require');
require('elasticsearch');
require('angular-route');
var dependencies = [
'kibana',
'ngRoute'
];
var app = angular.module('kibana', dependencies);
// keep a reference to each module defined before boot, so that
// after boot it can define new features. Also serves as a flag.
var preBootModules = [];
// the functions needed to register different
// features defined after boot
var registerFns = {};
// This stores the Kibana revision number, @REV@ is replaced by grunt.
app.constant('kbnVersion', '@REV@');
// Use this for cache busting partials
app.constant('cacheBust', 'cache-bust=' + Date.now());
/**
* Bootstrap require with the needed config, then setup routing and bootstrap the app
* Modules that need to register components within the application after
* bootstrapping is complete need to pass themselves to this method.
*
* @param {object} module - The Angular module
* @return {object} module
*/
app.useModule = function (module) {
if (preBootModules) {
preBootModules.push(module);
} else {
_.extend(module, registerFns);
}
return module;
};
return {};
app.config(function ($routeProvider, $controllerProvider, $compileProvider, $filterProvider, $provide) {
$routeProvider
.when('/courier-test', {
templateUrl: 'kibana/partials/courier-test.html',
})
.otherwise({
redirectTo: 'courier-test'
});
// this is how the internet told me to dynamically add modules :/
registerFns.controller = $controllerProvider.register;
registerFns.directive = $compileProvider.directive;
registerFns.factory = $provide.factory;
registerFns.service = $provide.service;
registerFns.filter = $filterProvider.register;
});
// load the core components
require([
'courier/courier'
], function () {
var dependencies = [];
_('controllers directives factories services filters'.split(' '))
.map(function (type) { return 'kibana.' + type; })
.each(function (name) {
app.useModule(angular.module(name, []));
dependencies.push(name);
});
// bootstrap the app
$(function () {
angular
.bootstrap(document, dependencies)
.invoke(function ($rootScope) {
_.each(preBootModules, function (module) {
_.extend(module, registerFns);
});
preBootModules = false;
});
});
});
return app;
});

View file

@ -0,0 +1 @@
<courierTest/>

View file

@ -1,15 +1,7 @@
(function () {
function bower(p) { return '../bower_components/' + p; }
require.config({
var config = {
baseUrl: 'kibana',
paths: {
d3: bower('d3/d3'),
lodash: bower('lodash/dist/lodash'),
jquery: bower('jquery/jquery'),
angular: bower('angular/angular'),
'utils/event_emitter': bower('eventEmitter/EventEmitter')
},
paths: {},
shim: {
angular: {
deps: ['jquery'],
@ -17,5 +9,33 @@
}
},
waitSeconds: 60
};
var bowerComponents = [
'd3',
['lodash', 'dist/lodash'],
'jquery',
'angular',
'angular-route',
'elasticsearch'
];
bowerComponents.forEach(function (name) {
var path = '../bower_components/';
if (typeof name === 'object') {
path += name[0] + '/' + name[1];
name = name[0];
} else {
path += name + '/' + name;
}
config.paths[name] = path;
if (name.match(/^angular-/)) {
config.shim[name] = {
deps: ['angular']
};
}
});
require.config(config);
}());

View file

@ -0,0 +1,304 @@
define(function () {
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
function EventEmitter() {
this._events = this._events || {};
this._maxListeners = this._maxListeners || undefined;
}
// Backwards-compat with node 0.10.x
EventEmitter.EventEmitter = EventEmitter;
EventEmitter.prototype._events = undefined;
EventEmitter.prototype._maxListeners = undefined;
// By default EventEmitters will print a warning if more than 10 listeners are
// added to it. This is a useful default which helps finding memory leaks.
EventEmitter.defaultMaxListeners = 10;
// Obviously not all Emitters should be limited to 10. This function allows
// that to be increased. Set to zero for unlimited.
EventEmitter.prototype.setMaxListeners = function (n) {
if (!isNumber(n) || n < 0 || isNaN(n))
throw new TypeError('n must be a positive number');
this._maxListeners = n;
return this;
};
EventEmitter.prototype.emit = function (type) {
var er, handler, len, args, i, listeners;
if (!this._events)
this._events = {};
// If there is no 'error' event listener then throw.
if (type === 'error') {
if (!this._events.error ||
(isObject(this._events.error) && !this._events.error.length)) {
er = arguments[1];
if (er instanceof Error) {
throw er; // Unhandled 'error' event
} else {
throw new TypeError('Uncaught, unspecified "error" event.');
}
return false;
}
}
handler = this._events[type];
if (isUndefined(handler))
return false;
if (isFunction(handler)) {
switch (arguments.length) {
// fast cases
case 1:
handler.call(this);
break;
case 2:
handler.call(this, arguments[1]);
break;
case 3:
handler.call(this, arguments[1], arguments[2]);
break;
// slower
default:
len = arguments.length;
args = new Array(len - 1);
for (i = 1; i < len; i++)
args[i - 1] = arguments[i];
handler.apply(this, args);
}
} else if (isObject(handler)) {
len = arguments.length;
args = new Array(len - 1);
for (i = 1; i < len; i++)
args[i - 1] = arguments[i];
listeners = handler.slice();
len = listeners.length;
for (i = 0; i < len; i++)
listeners[i].apply(this, args);
}
return true;
};
EventEmitter.prototype.addListener = function (type, listener) {
var m;
if (!isFunction(listener))
throw new TypeError('listener must be a function');
if (!this._events)
this._events = {};
// To avoid recursion in the case that type === "newListener"! Before
// adding it to the listeners, first emit "newListener".
if (this._events.newListener)
this.emit('newListener', type,
isFunction(listener.listener) ?
listener.listener : listener);
if (!this._events[type])
// Optimize the case of one listener. Don't need the extra array object.
this._events[type] = listener;
else if (isObject(this._events[type]))
// If we've already got an array, just append.
this._events[type].push(listener);
else
// Adding the second element, need to change to array.
this._events[type] = [this._events[type], listener];
// Check for listener leak
if (isObject(this._events[type]) && !this._events[type].warned) {
if (!isUndefined(this._maxListeners)) {
m = this._maxListeners;
} else {
m = EventEmitter.defaultMaxListeners;
}
if (m && m > 0 && this._events[type].length > m) {
this._events[type].warned = true;
window.console && console.log(
'(node) warning: possible EventEmitter memory ' +
'leak detected. %d listeners added. ' +
'Use emitter.setMaxListeners() to increase limit.',
this._events[type].length);
window.console && console.trace && console.trace();
}
}
return this;
};
EventEmitter.prototype.on = EventEmitter.prototype.addListener;
EventEmitter.prototype.once = function (type, listener) {
if (!isFunction(listener))
throw new TypeError('listener must be a function');
var fired = false;
function g() {
this.removeListener(type, g);
if (!fired) {
fired = true;
listener.apply(this, arguments);
}
}
g.listener = listener;
this.on(type, g);
return this;
};
// emits a 'removeListener' event iff the listener was removed
EventEmitter.prototype.removeListener = function (type, listener) {
var list, position, length, i;
if (!isFunction(listener))
throw new TypeError('listener must be a function');
if (!this._events || !this._events[type])
return this;
list = this._events[type];
length = list.length;
position = -1;
if (list === listener ||
(isFunction(list.listener) && list.listener === listener)) {
delete this._events[type];
if (this._events.removeListener)
this.emit('removeListener', type, listener);
} else if (isObject(list)) {
for (i = length; i-- > 0;) {
if (list[i] === listener ||
(list[i].listener && list[i].listener === listener)) {
position = i;
break;
}
}
if (position < 0)
return this;
if (list.length === 1) {
list.length = 0;
delete this._events[type];
} else {
list.splice(position, 1);
}
if (this._events.removeListener)
this.emit('removeListener', type, listener);
}
return this;
};
EventEmitter.prototype.removeAllListeners = function (type) {
var key, listeners;
if (!this._events)
return this;
// not listening for removeListener, no need to emit
if (!this._events.removeListener) {
if (arguments.length === 0)
this._events = {};
else if (this._events[type])
delete this._events[type];
return this;
}
// emit removeListener for all listeners on all events
if (arguments.length === 0) {
for (key in this._events) {
if (key !== 'removeListener') {
this.removeAllListeners(key);
}
}
this.removeAllListeners('removeListener');
this._events = {};
return this;
}
listeners = this._events[type];
if (isFunction(listeners)) {
this.removeListener(type, listeners);
} else {
// LIFO order
while (listeners.length)
this.removeListener(type, listeners[listeners.length - 1]);
}
delete this._events[type];
return this;
};
EventEmitter.prototype.listeners = function (type) {
var ret;
if (!this._events || !this._events[type])
ret = [];
else if (isFunction(this._events[type]))
ret = [this._events[type]];
else
ret = this._events[type].slice();
return ret;
};
EventEmitter.listenerCount = function (emitter, type) {
var ret;
if (!emitter._events || !emitter._events[type])
ret = 0;
else if (isFunction(emitter._events[type]))
ret = 1;
else
ret = emitter._events[type].length;
return ret;
};
function isFunction(arg) {
return typeof arg === 'function';
}
function isNumber(arg) {
return typeof arg === 'number';
}
function isObject(arg) {
return typeof arg === 'object' && arg !== null;
}
function isUndefined(arg) {
return arg === void 0;
}
return EventEmitter;
});

View file

@ -29,7 +29,7 @@
window.mochaRunner = mocha.run().on('end', function(){
window.mochaResults = this.stats;
});
})
});
</script>
</body>
</html>

View file

@ -1,5 +1,6 @@
define(function (require) {
var Courier = require('courier/courier');
var _ = require('lodash');
describe('Courier Module', function () {
@ -11,10 +12,6 @@ define(function (require) {
it('knows when a DataSource object has event listeners for the results event');
it('executes queries on the interval for searches that have listeners for results');
describe('::new', function () {
it('requires a config object which will be passed to the .');
});
describe('events', function () {
describe('error', function () {
it('emits when any request comes back with an error');
@ -36,7 +33,7 @@ define(function (require) {
describe('#fetchInterval', function () {
it('sets the interval in milliseconds that queries will be fetched', function () {
courier.fetchInterval(1000);
expect(courier._state()).to.have.property('fetchInterval', 1000);
expect(courier.fetchInterval()).to.eql(1000);
});
});
@ -45,15 +42,80 @@ define(function (require) {
var source = courier.define();
expect(source._state()).to.eql({});
});
it('optionally accepts a json object/string that will populate the DataSource object with settings');
it('optionally accepts a json object/string that will populate the DataSource object with settings', function () {
var savedState = JSON.stringify({
index: 'logstash-[YYYY-MM-DD]'
});
var source = courier.define(savedState);
expect(source + '').to.eql(savedState);
});
});
describe('#start', function () {
it('triggers a fetch and begins the fetch cycle');
});
describe('#stop', function () {
it('cancels current and future fetches');
});
});
describe('async API', function () {
describe('#fetch', function () {
it('crawls the DataSource objects which are listening for results.');
it('uses aggregated filter/aggs/etc to create serialized es-DSL request');
it('sends the serialized es-dsl requests in a single /msearch request.');
describe('source req tracking', function () {
it('updates the stored query when the data source is updated', function () {
var courier = new Courier();
var source = courier.define();
source.on('results', _.noop);
source.index('the index name');
expect(courier._getQueryForSource(source)).to.match(/the index name/);
});
});
describe('source merging', function () {
describe('basically', function () {
it('merges the state of one data source with it\'s parents', function () {
var courier = new Courier();
var root = courier.define()
.index('people')
.type('students')
.filter({
term: {
school: 'high school'
}
});
var math = courier.define()
.inherits(root)
.filter({
terms: {
classes: ['algebra', 'calculus', 'geometry'],
execution: 'or'
}
})
.on('results', _.noop);
expect(courier._writeQueryForSource(math))
.to.eql(JSON.stringify({
index: 'people',
type: 'students'
}) + '\n' +
JSON.stringify({
query: {
filtered: {
query: { match_all: {} },
filter: { bool: {
must: [
{ terms: { classes: ['algebra', 'calculus', 'geometry'], execution: 'or' } },
{ term: { school: 'high school' } }
]
} }
}
}
})
);
});
});
});
});