moved the courier to the same level as kibana, being that it really is external to the angular app

This commit is contained in:
Spencer Alger 2014-02-12 14:09:47 -07:00
parent 8e3aee427d
commit 082426acc9
16 changed files with 406 additions and 260 deletions

View file

@ -6,22 +6,18 @@
"main": "Gulpfile.js",
"dependencies": {},
"devDependencies": {
"connect": "~2.12.0",
"gulp": "~3.5.2",
"lodash": "~2.4.1",
"expect.js": "~0.2.0",
"gulp-jshint": "git://github.com/spenceralger/gulp-jshint.git#relative_jshintrc",
"jshint-stylish": "~0.1.5",
"grunt": "~0.4.2",
"grunt-contrib-connect": "~0.6.0",
"grunt-contrib-jshint": "~0.8.0",
"mocha": "~1.17.1",
"grunt-mocha": "~0.4.10",
"load-grunt-config": "~0.7.0",
"grunt-mocha": "~0.4.10"
"mocha": "~1.17.1"
},
"scripts": {
"test": "gulp test",
"server": "gulp server"
"test": "grunt test",
"server": "grunt server"
},
"repository": {
"type": "git",

50
scratch.js Normal file
View file

@ -0,0 +1,50 @@
/* jshint node: true */
var elasticsearch = require('../elasticsearch-js');
var async = require('async');
var es = new elasticsearch.Client({
host: 'localhost:9200',
sniffOnStart: true,
sniffInterval: 3000,
apiVersion: '1.0',
log: 'trace'
});
var rl = require('readline').createInterface({
input: process.stdin,
output: process.stdout,
terminal: true
});
async.series([
function (done) {
setTimeout(done, 50);
},
function (done) {
console.log(es.transport.connectionPool._conns.index);
es.indices.create({
index: 'index_name'
}, done);
},
function (done) {
rl.question('Is the master down?', function () {
done();
});
},
function (done) {
console.log(es.transport.connectionPool._conns.index);
es.search({ index: 'index_name' }, done);
},
function (done) {
rl.question('Is the slave down now?', function () {
es.search({ body: { query: { match_all: {} } } }, done);
});
},
function (done) {
rl.question('Is the master back up?', function () {
es.search({ body: { query: { match_all: {} } } }, done);
});
}
], function (err) {
console.log(err);
});

229
src/courier/courier.js Normal file
View file

@ -0,0 +1,229 @@
define(function (require) {
var DataSource = require('courier/data_source');
var EventEmitter = require('utils/event_emitter');
var inherits = require('utils/inherits');
var errors = require('courier/errors');
var _ = require('lodash');
var angular = require('angular');
function chain(cntx, method) {
return function () {
method.apply(cntx, arguments);
return this;
};
}
function mergeProp(state, filters, val, key) {
switch (key) {
case 'inherits':
// ignore
return;
case 'filter':
filters.push(val);
return;
case 'index':
case 'type':
if (key && state[key] == null) {
state[key] = val;
}
return;
case 'source':
key = '_source';
/* fall through */
}
if (key && state.body[key] == null) {
state.body[key] = val;
}
}
function flattenDataSource(source) {
var state = {
body: {}
};
// all of the filters from the source chain
var filters = [];
var collectProp = _.partial(mergeProp, state, filters);
// walk the chain and merge each property
var current = source;
var currentState;
while (current) {
currentState = current._state();
_.forOwn(currentState, collectProp);
current = currentState.inherits;
}
// defaults for the query
_.forOwn({
query: {
'match_all': {}
}
}, collectProp);
// switch to filtered query if there are filters
if (filters.length) {
state.body.query = {
filtered: {
query: state.body.query,
filter: {
bool: {
must: filters
}
}
}
};
}
return state;
}
function fetch(client, sources, cb) {
if (!client) {
this.emit('error', new Error('Courier does not have a client yet, unable to fetch queries.'));
return;
}
var all = [];
var body = '';
_.each(sources, function (source) {
all.push(source);
var state = flattenDataSource(source);
var header = JSON.stringify({
index: state.index,
type: state.type
});
var doc = JSON.stringify(state.body);
body += header + '\n' + doc + '\n';
});
return client.msearch({ body: body }, function (err, resp) {
if (err) return cb(err);
_.each(resp.responses, function (resp, i) {
sources[i].emit('results', resp);
});
cb(err, resp);
});
}
/**
* Federated query service, supports data sources that inherit properties
* from one another and automatically emit results.
* @param {object} config
* @param {Client} config.client - The elasticsearch.js client to use for querying. Should be setup and ready to go.
* @param {integer} [config.fetchInterval=30000] - The amount in ms between each fetch (deafult is 30 seconds)
*/
function Courier(config) {
if (!(this instanceof Courier)) return new Courier(config);
var opts = {
fetchInterval: 30000
};
var fetchTimer;
var activeRequest;
var sources = [];
function doFetch() {
if (!opts.client) {
this.emit('error', new Error('Courier does not have a client, pass it ' +
'in to the constructor or set it with the .client() method'));
return;
}
if (activeRequest) {
activeRequest.abort();
stopFetching();
this.emit('error', new errors.HastyRefresh());
return;
}
// we need to catch the original promise in order to keep it's abort method
activeRequest = fetch(opts.client, sources, function (err, resp) {
activeRequest = null;
setFetchTimeout();
if (err) {
window.console && console.log(err);
}
});
}
function setFetchTimeout() {
clearTimeout(fetchTimer);
if (opts.fetchInterval) {
fetchTimer = setTimeout(doFetch, opts.fetchInterval);
} else {
fetchTimer = null;
}
}
function stopFetching() {
clearTimeout(fetchTimer);
}
function startFetchingSource(source) {
var existing = _.find(sources, { source: source });
if (existing) return false;
sources.push(source);
}
function stopFetchingSource(source) {
var i = sources.indexOf(source);
if (i !== -1) {
sources.slice(i, 1);
}
if (sources.length === 0) stopFetching();
}
// is there a scheduled request?
function isStarted() {
return !!fetchTimer;
}
// chainable public api
this.isStarted = chain(this, isStarted);
this.start = chain(this, doFetch);
this.startFetchingSource = chain(this, startFetchingSource);
this.stop = chain(this, stopFetching);
this.stopFetchingSource = chain(this, stopFetchingSource);
this.close = chain(this, function stopFetchingAllSources() {
_.each(sources, stopFetchingSource);
});
// setter
this.client = chain(this, function (client) {
opts.client = client;
});
// setter/getter
this.fetchInterval = function (val) {
opts.fetchInterval = val;
if (isStarted()) setFetchTimeout();
return this;
};
// factory
this.createSource = function (state) {
return new DataSource(this, state);
};
// apply the passed in config
_.each(config || {}, function (val, key) {
if (typeof this[key] !== 'function') throw new TypeError('invalid config "' + key + '"');
this[key](val);
}, this);
}
// private api, exposed for testing
Courier._flattenDataSource = flattenDataSource;
inherits(Courier, EventEmitter);
return Courier;
});

View file

@ -24,6 +24,7 @@ define(function (require) {
'aggs',
'from',
'size',
'source',
'inherits'
];
@ -81,13 +82,13 @@ define(function (require) {
return _.keys(mapping);
});
};
this.extend = function () {
return courier.createSource().inherits(this);
};
// get/set internal state values
optionNames.forEach(function chainableOptions(name) {
optionNames.forEach(function (name) {
this[name] = function (val) {
if (val === void 0) {
return state[name];
}
state[name] = val;
return this;
};

12
src/courier/errors.js Normal file
View file

@ -0,0 +1,12 @@
define(function (require) {
function HastyRefresh() {
this.name = 'HastyRefresh';
this.message = 'Courier attempted to start a query before the previous had finished.';
}
HastyRefresh.prototype = new Error();
HastyRefresh.prototype.constructor = HastyRefresh;
return {
HastyRefresh: HastyRefresh
};
});

2
src/courier/test.html Normal file
View file

@ -0,0 +1,2 @@
<courier-test type="apache" fields="extension,response,request"></courier-test>
<courier-test type="nginx" fields=""></courier-test>

View file

@ -12,6 +12,8 @@
<script>require(['main'], function () {});</script>
</head>
<body>
<div ng-view></div>
<div ng-controller="Kibana">
<div ng-view></div>
</div>
</body>
</html>

View file

@ -0,0 +1,35 @@
define(function (require) {
var angular = require('angular');
angular.module('kibana/controllers')
.controller('Kibana', function (courier, $scope, $rootScope) {
$rootScope.dataSource = courier.createSource()
.index('logstash-2014.02.13')
.size(5);
setTimeout(courier.start, 15);
});
angular.module('kibana/directives')
.directive('courierTest', function () {
return {
restrict: 'E',
scope: {
type: '@'
},
controller: function ($rootScope, $scope) {
var source = $rootScope.dataSource.extend()
.type($scope.type)
.source({
include: 'country'
})
.on('results', function (resp) {
$scope.json = JSON.stringify(resp.hits, null, ' ');
});
$scope.$watch('type', source.type);
},
template: '<pre>{{json}}</pre>'
};
});
});

View file

@ -1,205 +0,0 @@
define(function (require) {
var DataSource = require('courier/data_source');
var _ = require('lodash');
var angular = require('angular');
require('courier/data_source');
require('courier/mapper');
var module = angular.module('kbn.services.courier', []);
module.directive('courierTest', function (Courier) {
var courier = Courier();
});
module.factory('Courier', function (es) {
var optionNames = [
'fetchInterval',
'client'
];
var mergeProp = function (state, filters, val, key) {
switch (key) {
case 'inherits':
// ignore
break;
case 'filter':
filters.push(val);
break;
case 'index':
case 'type':
if (key && state[key] == null) {
state[key] = val;
}
break;
default:
if (key && state.body[key] == null) {
state.body[key] = val;
}
break;
}
};
var flattenDataSource = function (source) {
var state = {
body: {}
};
// all of the filters from the source chain
var filters = [];
var collectProp = _.partial(mergeProp, state, filters);
// walk the chain and merge each property
var current = source;
var currentState;
while (current) {
currentState = current._state();
_.forOwn(currentState, collectProp);
current = currentState.inherits;
}
// defaults for the query
_.forOwn({
query: {
'match_all': {}
},
index: '_all',
type: '_all'
}, collectProp);
// switch to filtered query if there are filters
if (filters.length) {
state.body.query = {
filtered: {
query: state.body.query,
filter: {
bool: {
must: filters
}
}
}
};
}
return state;
};
function Courier(config) {
var opts = {};
var fetchTimer;
var activeRequest;
var sources = [];
var onFetch = _.bind(function () {
if (!opts.client) {
throw new Error('Courier does not have a client yet, unable to fetch queries');
}
var all = [];
var body = '';
_.each(sources, function (source) {
all.push(source);
var state = flattenDataSource(source);
var header = JSON.stringify({
index: state.index,
type: state.type
});
var body = JSON.stringify(state.body);
body += header + '\n' + body + '\n';
});
if (activeRequest) {
activeRequest.abort();
}
activeRequest = opts.client.msearch({
body: body
}).then(function (resp) {
_.each(resp.responses, function (resp, i) {
sources[i].emit('results', resp);
});
}, function (err) {
console.error(err);
});
}, this);
var setFetchTimeout = function () {
clearTimeout(fetchTimer);
if (opts.fetchInterval) {
fetchTimer = setTimeout(onFetch, opts.fetchInterval);
} else {
fetchTimer = null;
}
};
var stopFetching = function () {
clearTimeout(fetchTimer);
return this;
};
var startFetchingSource = function (source) {
var existing = _.find(sources, { source: source });
if (existing) return false;
sources.push(source);
return this;
};
var stopFetchingSource = function (source) {
var i = sources.indexOf(source);
if (i !== -1) {
sources.slice(i, 1);
}
if (sources.length === 0) stopFetching();
};
// public api
this.start = setFetchTimeout;
this.startFetchingSource = startFetchingSource;
this.stop = stopFetching;
this.stopFetchingSource = stopFetchingSource;
this.close = _.partial(_.each, sources, stopFetchingSource);
this.define = function (state) {
return new DataSource(this, state);
};
this.isStarted = function () {
return !!fetchTimer;
};
// chainable settings/getters for state stuff
optionNames.forEach(function chainableOptions(name) {
this[name] = function (val) {
if (val === void 0) {
return opts[name];
}
opts[name] = val;
switch (name) {
case 'fetchInterval':
if (fetchTimer) setFetchTimeout();
}
return this;
};
}, this);
// private api, exposed for testing
this._flattenDataSource = flattenDataSource;
this._getQueryForSource = function (source) {
var existing = _.find(sources, { source: source });
if (existing) return existing.req;
};
_.each(config || {}, function (val, key) {
if (typeof this[key] !== 'function') throw new TypeError('invalid config "' + key + '"');
this[key](val);
}, this);
}
return Courier;
});
});

View file

@ -11,13 +11,6 @@ define(function (require) {
require('elasticsearch');
require('angular-route');
var dependencies = [
'kibana',
'ngRoute'
];
var app = angular.module('kibana', dependencies);
// keep a reference to each module defined before boot, so that
// after boot it can define new features. Also serves as a flag.
var preBootModules = [];
@ -26,6 +19,21 @@ define(function (require) {
// features defined after boot
var registerFns = {};
var dependencies = [
'elasticsearch',
'kibana',
'ngRoute'
];
_('controllers directives factories services filters'.split(' '))
.map(function (type) { return 'kibana/' + type; })
.each(function (name) {
preBootModules.push(angular.module(name, []));
dependencies.push(name);
});
var app = angular.module('kibana', dependencies);
// This stores the Kibana revision number, @REV@ is replaced by grunt.
app.constant('kbnVersion', '@REV@');
@ -49,10 +57,9 @@ define(function (require) {
};
app.config(function ($routeProvider, $controllerProvider, $compileProvider, $filterProvider, $provide) {
$routeProvider
.when('/courier-test', {
templateUrl: 'kibana/partials/courier-test.html',
templateUrl: 'courier/test.html',
})
.otherwise({
redirectTo: 'courier-test'
@ -68,18 +75,11 @@ define(function (require) {
// load the core components
require([
'courier/courier'
'services/courier',
'services/es',
'controllers/kibana'
], function () {
var dependencies = [];
_('controllers directives factories services filters'.split(' '))
.map(function (type) { return 'kibana.' + type; })
.each(function (name) {
app.useModule(angular.module(name, []));
dependencies.push(name);
});
// bootstrap the app
$(function () {
angular

View file

@ -1 +0,0 @@
<courierTest/>

View file

@ -1,7 +1,9 @@
(function () {
var config = {
baseUrl: 'kibana',
paths: {},
paths: {
courier: '../courier'
},
shim: {
angular: {
deps: ['jquery'],
@ -17,7 +19,7 @@
'jquery',
'angular',
'angular-route',
'elasticsearch'
['elasticsearch', 'elasticsearch.angular']
];
bowerComponents.forEach(function (name) {
@ -30,7 +32,7 @@
}
config.paths[name] = path;
if (name.match(/^angular-/)) {
if (path.match(/angular/) && name !== 'angular') {
config.shim[name] = {
deps: ['angular']
};

View file

@ -0,0 +1,18 @@
define(function (require) {
var angular = require('angular');
var Courier = require('courier/courier');
angular.module('kibana/services')
.service('courier', function (es) {
var courier = new Courier({
fetchInterval: 15000,
client: es
});
courier.on('error', function (err) {
console.error(err);
});
return courier;
});
});

View file

@ -0,0 +1,8 @@
define(function (require) {
var angular = require('angular');
var module = angular.module('kibana/services');
module.service('es', function (esFactory) {
return esFactory();
});
});

View file

@ -68,7 +68,7 @@ define(function (require) {
source.on('results', _.noop);
source.index('the index name');
expect(courier._getQueryForSource(source)).to.match(/the index name/);
expect(Courier._flattenDataSource(source).index).to.eql('the index name');
});
});
@ -96,25 +96,22 @@ define(function (require) {
})
.on('results', _.noop);
expect(courier._writeQueryForSource(math))
.to.eql(JSON.stringify({
index: 'people',
type: 'students'
}) + '\n' +
JSON.stringify({
query: {
filtered: {
query: { match_all: {} },
filter: { bool: {
must: [
{ terms: { classes: ['algebra', 'calculus', 'geometry'], execution: 'or' } },
{ term: { school: 'high school' } }
]
} }
}
var query = Courier._flattenDataSource(math);
expect(query.index).to.eql('people');
expect(query.type).to.eql('students');
expect(query.body).to.eql({
query: {
filtered: {
query: { 'match_all': {} },
filter: { bool: {
must: [
{ terms: { classes: ['algebra', 'calculus', 'geometry'], execution: 'or' } },
{ term: { school: 'high school' } }
]
} }
}
})
);
}
});
});
});
});