Merge pull request #220 from spenceralger/segmented_search_2

[Discover] Segmented Request
This commit is contained in:
Spencer 2014-08-04 14:39:27 -07:00
commit 2d676771c5
14 changed files with 625 additions and 159 deletions

View file

@ -0,0 +1,66 @@
define(function () {
  return function HitSortFnFactory() {

    /**
     * Creates a sort function that will resort hits based on the values
     * es used to sort them.
     *
     * background:
     * When a hit is sorted by elasticsearch, es will write the values that it used
     * to sort them into an array at the top level of the hit like so
     *
     * ```
     * hits: {
     *   total: x,
     *   hits: [
     *     {
     *       _id: i,
     *       _source: {},
     *       sort: [
     *         // all values used to sort, in the order of precedence
     *       ]
     *     }
     *   ]
     * };
     * ```
     *
     * The returned comparator walks both `sort` arrays in parallel and decides
     * on the first position where the values differ. A missing (null/undefined)
     * value is ordered before any present value when ascending and after it
     * when descending; two missing values are treated as equal so that
     * `compare(a, b)` and `compare(b, a)` always agree.
     *
     * @param  {string} direction - 'desc' to sort descending, any other value
     *                              sorts ascending
     * @return {function} - comparator `(hitA, hitB) => -1 | 0 | 1` suitable
     *                      for Array.prototype.sort
     */
    return function createHitSortFn(direction) {
      var descending = (direction === 'desc');

      return function sortHits(hitA, hitB) {
        var aSorts = hitA.sort || [];
        var bSorts = hitB.sort || [];

        // walk the longer list so that extra values on either side are considered
        var length = Math.max(aSorts.length, bSorts.length);

        // walk each sort value, and compare until one is different
        for (var i = 0; i < length; i++) {
          var a = aSorts[i];
          var b = bSorts[i];
          var aMissing = (a == null);
          var bMissing = (b == null);

          // ascending order of a relative to b: -1 (a first), 1 (b first)
          var order;
          if (aMissing || bMissing) {
            if (aMissing && bMissing) continue;
            order = aMissing ? -1 : 1;
          } else if (a < b) {
            order = -1;
          } else if (a > b) {
            order = 1;
          } else {
            continue;
          }

          return descending ? -order : order;
        }

        return 0;
      };
    };
  };
});

View file

@ -0,0 +1,184 @@
define(function (require) {
return function DiscoverSegmentedFetch(es, Private, Promise, Notifier) {
// The request created by the most recent segmentedFetch.fetch() call. An in-flight
// fetch compares its own request against this to detect that it has been superseded.
var activeReq = null;
// Turns a request into a promise for the search state; fetch() reads `type` and `body` from it
var getStateFromRequest = Private(require('components/courier/fetch/strategy/search')).getSourceStateFromRequest;
var _ = require('lodash');
var moment = require('moment');
// the object returned from this factory, fetch() is attached to it below
var segmentedFetch = {};
// NOTE(review): not referenced in the visible part of this module -- confirm it is still needed
var notify = new Notifier({
location: 'Segmented Fetch'
});
/**
 * Fetch search results, but segment by index name.
 *
 * The index list of the searchSource is walked one index at a time, a search
 * is sent for each index, and every response is folded into a merged response.
 * Calling fetch() again while a previous call is still running makes the
 * previous call stop after its current response comes back.
 *
 * @param {object} opts
 * @param {SearchSource} opts.searchSource - The searchSource to base the fetch on
 * @param {number} opts.totalSize - The maximum number of rows that should be returned, as a sum of all segments.
 *                                  A falsy value disables the limit.
 * @param {enum} opts.direction - The direction that indices should be fetched. When fetching time based data
 *                                in descending order, this should be set to "desc" so that the data comes in
 *                                its proper order, otherwise indices will be fetched in the order the index
 *                                list provides them
 *
 * // all callbacks can return a promise to delay further processing
 * @param {function} opts.first - a function that will be called for the first segment
 * @param {function} opts.each - a function that will be called for each segment
 * @param {function} opts.eachMerged - a function that will be called with the merged result on each segment
 *
 * @return {Promise} - resolved with the number of segment requests that were sent, or with
 *                     undefined when this fetch was superseded by a newer one. Rejected when
 *                     a search or one of the callbacks fails.
 */
segmentedFetch.fetch = function (opts) {
  var searchSource = opts.searchSource;
  var direction = opts.direction;
  var limitSize = false;
  var remainingSize = false;

  if (opts.totalSize) {
    limitSize = true;
    remainingSize = opts.totalSize;
  }

  var req = searchSource._createRequest();
  req.moment = moment();
  req.source.activeFetchCount += 1;

  // the increment above must be undone exactly once, no matter how this
  // fetch ends (completed, superseded, or failed)
  var released = false;

  // track the req out of scope so that while we are iterating we can
  // ensure we are still relevant
  activeReq = req;

  var queue = searchSource.get('index').toIndexList();
  if (direction === 'desc') {
    queue = queue.reverse();
  }

  var i = -1;
  var merged = {
    took: 0,
    hits: {
      hits: [],
      total: 0,
      max_score: 0
    }
  };

  getStateFromRequest(req)
  .then(function (state) {
    return (function recurse() {
      i++;
      var index = queue.shift();

      if (limitSize) {
        state.body.size = remainingSize;
      }
      req.state = state;

      return es.search({
        index: index,
        type: state.type,
        ignoreUnavailable: true,
        body: state.body
      })
      .catch(function (err) {
        // closed indices are expected, swallow the error so that the next index is tried
        var indexClosed = err.status === 403
          && _.isString(err.message)
          && err.message.match(/ClusterBlockException.+index closed/);

        if (indexClosed) {
          return false;
        } else {
          throw err;
        }
      })
      .then(function (resp) {
        // abort if fetch is called twice quickly
        if (req !== activeReq) {
          release();
          return;
        }

        // a response was swallowed intentionally. Move to next index
        if (!resp) {
          if (queue.length) return recurse();
          else return done();
        }

        var start; // promise that starts the chain
        if (i > 0) {
          start = Promise.resolve();
        } else {
          start = Promise.try(function () {
            if (_.isFunction(opts.first)) {
              return opts.first(resp, req);
            }
          });
        }

        if (limitSize) {
          remainingSize -= resp.hits.hits.length;
        }

        return start.then(function () {
          return each(merged, resp);
        })
        .then(function () {
          if (_.isFunction(opts.each)) return opts.each(resp, req);
        })
        .then(function () {
          // a "shallow clone" that omits the _bucketIndex, which helps
          // with watchers and protects the index
          var mergedCopy = _.omit(merged, '_bucketIndex');
          req.resp = mergedCopy;

          if (_.isFunction(opts.eachMerged)) {
            return opts.eachMerged(mergedCopy, req);
          }
        })
        .then(function () {
          if (queue.length) return recurse();
          return done();
        });
      });
    }());
  })
  .then(req.defer.resolve, function (err) {
    release();
    return req.defer.reject(err);
  });

  // undo the activeFetchCount increment, safe to call more than once
  function release() {
    if (released) return;
    released = true;
    req.source.activeFetchCount -= 1;
  }

  // mark the request as complete and report how many segment requests were sent
  function done() {
    req.complete = true;
    req.ms = req.moment.diff() * -1;
    release();
    return (i + 1);
  }

  return req.defer.promise;
};
/**
 * Fold the response for a single segment into the merged response.
 *
 * `took` is summed, `hits.total` and `hits.max_score` keep the largest value
 * seen so far, and the hits are appended. When the response has aggregations,
 * the buckets of `_agg_0` are merged by key, summing `doc_count`.
 *
 * The buckets of `resp` are copied before they are stored, so `resp` itself
 * is never modified by later merges.
 *
 * @param  {object} merged - the running merged response, modified in place
 * @param  {object} resp - the response for one segment
 * @return {undefined}
 */
function each(merged, resp) {
  merged.took += resp.took;
  // NOTE(review): the largest per-segment total is kept rather than the sum -- confirm this is intended
  merged.hits.total = Math.max(merged.hits.total, resp.hits.total);
  merged.hits.max_score = Math.max(merged.hits.max_score, resp.hits.max_score);
  [].push.apply(merged.hits.hits, resp.hits.hits);

  if (!resp.aggregations) return;

  // start merging aggregations
  if (!merged.aggregations) {
    merged.aggregations = {
      _agg_0: {
        buckets: []
      }
    };
    // lookup of merged buckets by key, omitted from the copy handed to callers of fetch
    merged._bucketIndex = {};
  }

  var hasOwn = Object.prototype.hasOwnProperty;

  resp.aggregations._agg_0.buckets.forEach(function (bucket) {
    // own-property check so that keys like "constructor" don't match inherited properties
    if (hasOwn.call(merged._bucketIndex, bucket.key)) {
      merged._bucketIndex[bucket.key].doc_count += bucket.doc_count;
      return;
    }

    // store a shallow copy, later merges mutate doc_count and must not touch resp
    var mbucket = {};
    for (var prop in bucket) {
      if (hasOwn.call(bucket, prop)) mbucket[prop] = bucket[prop];
    }

    merged._bucketIndex[bucket.key] = mbucket;
    merged.aggregations._agg_0.buckets.push(mbucket);
  });
}
return segmentedFetch;
};
});

View file

@ -49,7 +49,10 @@ define(function (require) {
app.controller('discover', function ($scope, config, courier, $route, $window, savedSearches, savedVisualizations,
Notifier, $location, globalState, AppState, timefilter, AdhocVis, Promise) {
Notifier, $location, globalState, AppState, timefilter, AdhocVis, Promise, Private) {
var segmentedFetch = $scope.segmentedFetch = Private(require('apps/discover/_segmented_fetch'));
var HitSortFn = Private(require('apps/discover/_hit_sort_fn'));
var notify = new Notifier({
location: 'Discover'
@ -167,42 +170,9 @@ define(function (require) {
notify.error('An error occured with your request. Reset your inputs and try again.');
}).catch(notify.fatal);
// Bind a result handler. Any time searchSource.fetch() is executed this gets called
// with the results
$scope.searchSource.onResults().then(function onResults(resp) {
var complete = notify.event('on results');
$scope.hits = resp.hits.total;
$scope.rows = resp.hits.hits;
var counts = $scope.rows.fieldCounts = {};
$scope.rows.forEach(function (hit) {
hit._formatted = _.mapValues(hit._source, function (value, name) {
// add up the counts for each field name
if (counts[name]) counts[name] = counts[name] + 1;
else counts[name] = 1;
return ($scope.formatsByName[name] || defaultFormat).convert(value);
});
hit._formatted._source = angular.toJson(hit._source);
});
// ensure that the meta fields always have a "row count" equal to the number of rows
metaFields.forEach(function (fieldName) {
counts[fieldName] = $scope.rows.length;
});
// apply the field counts to the field list
$scope.fields.forEach(function (field) {
field.rowCount = counts[field.name] || 0;
});
complete();
return $scope.searchSource.onResults().then(onResults);
}).catch(function (err) {
console.log('An error', err);
});
return setupVisualization().then(function () {
$scope.updateTime();
init.complete = true;
$scope.$emit('application.load');
});
});
@ -225,16 +195,107 @@ define(function (require) {
};
/**
 * Update the time range and data source, then run a segmented fetch and
 * build $scope.rows (and the per-field row counts) from each segment.
 *
 * Does nothing until the app has completed init.
 */
$scope.opts.fetch = $scope.fetch = function () {
  // ignore requests to fetch before the app inits
  if (!init.complete) return;

  $scope.updateTime();

  $scope.updateDataSource()
  .then(setupVisualization)
  .then(function () {
    $state.commit();
    // NOTE(review): the segmented fetch below also requests the data -- confirm this call is still wanted
    courier.fetch();

    var sort = $state.sort;
    var timeField = $scope.searchSource.get('index').timeFieldName;
    var totalSize = $scope.size || 500;

    /**
     * Basically an enum.
     *
     * opts:
     *   "time" - sorted by the timefield
     *   "non-time" - explicitly sorted by a non-time field, NOT THE SAME AS `sortBy !== "time"`
     *   "implicit" - no sorting set, NOT THE SAME AS "non-time"
     *
     * @type {String}
     */
    var sortBy = (function () {
      if (!_.isArray(sort)) return 'implicit';
      else if (sort[0] === timeField) return 'time';
      else return 'non-time';
    }());

    // rows only need to be re-sorted locally when sorting by a non-time field
    var sortFn = null;
    if (sortBy === 'non-time') {
      sortFn = new HitSortFn(sort[1]);
    }

    var eventComplete = notify.event('segmented fetch');

    return segmentedFetch.fetch({
      searchSource: $scope.searchSource,
      totalSize: sortBy === 'non-time' ? false : totalSize,
      direction: sortBy === 'time' ? sort[1] : 'desc',
      first: function (resp) {
        $scope.hits = resp.hits.total;
        $scope.rows = [];
        $scope.rows.fieldCounts = {};
      },
      each: notify.timed('handle each segment', function (resp, req) {
        var rows = $scope.rows;
        var counts = rows.fieldCounts;

        // merge the rows and the hits, use a new array to help watchers
        rows = $scope.rows = rows.concat(resp.hits.hits);
        rows.fieldCounts = counts;

        if (sortFn) {
          // the "top" rows may change with each segment, so start the counts over
          rows.sort(sortFn);
          rows = $scope.rows = rows.slice(0, totalSize);
          counts = rows.fieldCounts = {};
        }

        $scope.rows.forEach(function (hit) {
          // Rows from earlier segments are already formatted and counted, so
          // skip them -- unless we are resorting, in which case the counts
          // were just reset and every row has to be counted again.
          if (!sortFn && hit._formatted) return;

          hit._formatted = _.mapValues(hit._source, function (value, name) {
            // add up the counts for each field name
            if (counts[name]) counts[name] = counts[name] + 1;
            else counts[name] = 1;

            return ($scope.formatsByName[name] || defaultFormat).convert(value);
          });

          hit._formatted._source = angular.toJson(hit._source);
        });

        // ensure that the meta fields always have a "row count" equal to the number of rows
        metaFields.forEach(function (fieldName) {
          counts[fieldName] = $scope.rows.length;
        });

        // apply the field counts to the field list
        $scope.fields.forEach(function (field) {
          field.rowCount = counts[field.name] || 0;
        });
      }),
      eachMerged: function (merged) {
        $scope.mergedEsResp = merged;
      }
    })
    .finally(eventComplete);
  })
  .catch(notify.error);
};
// we use a custom fetch mechanism, so tie into the courier's looper
courier.searchLooper.add($scope.fetch);
// take the fetch function back out of the looper once this scope is destroyed
$scope.$on('$destroy', function () {
courier.searchLooper.remove($scope.fetch);
});
$scope.updateTime = function () {
$scope.timeRange = {
from: datemath.parse(timefilter.time.from),

View file

@ -53,7 +53,6 @@
</div>
</div>
<div class="col-md-10" ng-show="hits">
<div class="discover-timechart" ng-if="vis">
<center class="small">
<span tooltip="To change the time, click the clock icon in the navigation bar">{{timeRange.from | moment}} - {{timeRange.to | moment}}</span>
@ -66,12 +65,11 @@
>
</select> -->
</center>
<div ng-show="searchSource.activeFetchCount" class="discover-overlay">
<div class="spinner large">
</div>
</div>
<visualize vis="vis"></visualize>
<visualize vis="vis" es-resp="mergedEsResp"></visualize>
</div>
<div class="discover-table"
fixed-scroll='table'

View file

@ -15,9 +15,10 @@ define(function (require) {
restrict: 'E',
scope : {
vis: '=',
esResp: '=?'
},
template: require('text!apps/visualize/partials/visualize.html'),
link: function ($scope, $el) {
link: function ($scope, $el, attr) {
var chart; // set in "vis" watcher
var notify = createNotifier();
@ -47,6 +48,7 @@ define(function (require) {
var render = function () {
applyClassNames();
if (chart && $scope.chartData && !$scope.onlyShowSpy) {
notify.event('call chart render', function () {
chart.render($scope.chartData);
@ -95,17 +97,30 @@ define(function (require) {
});
if (!attr.esResp) {
// fetch the response ourselves if it's not provided
vis.searchSource.onResults(function onResults(resp) {
$scope.esResp = resp;
}).catch(notify.fatal);
}
vis.searchSource.onResults(function onResults(resp) {
$scope.chartData = vis.buildChartDataFromResponse(vis.searchSource.get('index'), resp);
render();
}).catch(notify.fatal);
vis.searchSource.onError(notify.error).catch(notify.fatal);
$scope.$root.$broadcast('ready:vis');
});
// rebuild the chart data every time a (truthy) es response is set on the scope
$scope.$watch('esResp', function (resp) {
  if (!resp) return;

  var vis = $scope.vis;
  $scope.chartData = vis.buildChartDataFromResponse(vis.searchSource.get('index'), resp);
});
$scope.$watch('chartData', render);
$scope.$on('resize', function () {
var old;
(function waitForAnim() {

View file

@ -15,8 +15,8 @@ define(function (require) {
var resp = entry.resp;
var meta = [];
if (entry.moment) meta.push(['Completion Time', entry.moment.format(config.get('dateFormat'))]);
if (resp && resp.took != null) meta.push(['Duration', resp.took + 'ms']);
if (resp && resp.took != null) meta.push(['Query Duration', resp.took + 'ms']);
if (entry && entry.ms != null) meta.push(['Request Duration', entry.ms + 'ms']);
if (resp && resp.hits) meta.push(['Hits', resp.hits.total]);
if (state.index) meta.push(['Index', state.index]);

View file

@ -14,8 +14,9 @@ define(function (require) {
var SearchSource = Private(require('components/courier/data_source/search_source'));
var pendingRequests = Private(require('components/courier/_pending_requests'));
var searchLooper = Private(require('components/courier/looper/search'));
var docLooper = Private(require('components/courier/looper/doc'));
var docLooper = courier.docLooper = Private(require('components/courier/looper/doc'));
var searchLooper = courier.searchLooper = Private(require('components/courier/looper/search'));
// expose some internal modules
courier.setRootSearchSource = Private(require('components/courier/data_source/_root_search_source')).set;

View file

@ -107,7 +107,8 @@ define(function (require) {
SourceAbstract.prototype.onResults = function (handler) {
var source = this;
return new PromiseEmitter(function (resolve, reject, defer) {
source._createRequest(defer);
var req = source._createRequest(defer);
pendingRequests.push(req);
}, handler);
};
@ -141,6 +142,7 @@ define(function (require) {
var source = this;
var req = source._createRequest();
pendingRequests.push(req);
// fetch just the requests for this source
fetch.these(source._getType(), pendingRequests.splice(0).filter(function (req) {
@ -189,7 +191,6 @@ define(function (require) {
this.history.splice(20);
}
pendingRequests.push(req);
return req;
};

View file

@ -65,6 +65,7 @@ define(function (require) {
var sendResponse = function (req, resp) {
req.complete = true;
req.resp = resp;
req.ms = req.moment.diff() * -1;
req.source.activeFetchCount -= 1;
if (resp.error) return reqErrHandler.handle(req, new errors.FetchFailure(resp));

View file

@ -5,7 +5,7 @@ define(function (require) {
function Looper(ms, fn) {
var _ms = ms === void 0 ? 1500 : ms;
var _fn = fn || _.noop;
var _fns = [fn || _.noop];
var _timerId;
var _started = false;
var looper = this;
@ -30,8 +30,21 @@ define(function (require) {
* @param {function} fn
* @chainable
*/
looper.fn = function (fn) {
_fn = fn;
looper.add = function (fn) {
_fns.push(fn);
return this;
};
/**
* Remove a function from the list of functions that the
* looper executes. Only the first matching entry is removed,
* and nothing happens when the function is not in the list.
*
* @param {function} fn - the function to remove
* @chainable
*/
looper.remove = function (fn) {
var i = _fns.indexOf(fn);
if (i > -1) _fns.splice(i, 1);
return this;
};
@ -94,7 +107,7 @@ define(function (require) {
*/
looper._looperOver = function () {
try {
_fn();
_.callEach(_fns);
} catch (e) {
looper.stop();
if (typeof console === 'undefined' || !console.error) {

View file

@ -71,115 +71,147 @@ define(function (require) {
* Functionality to check that
*/
function Notifier(opts) {
var notif = this;
var self = this;
opts = opts || {};
// label type thing to say where notifications came from
notif.from = opts.location;
self.from = opts.location;
// attach the global notification list
notif._notifs = notifs;
/**
* Log a sometimes redundant event
* @param {string} name - The name of the group
* @param {boolean} success - Simple flag stating whether the event succeeded
*/
notif.event = createGroupLogger('event', {
open: true
});
/**
* Log a major, important, event in the lifecycle of the application
* @param {string} name - The name of the lifecycle event
* @param {boolean} success - Simple flag stating whether the lifecycle event succeeded
*/
notif.lifecycle = createGroupLogger('lifecycle', {
open: true
});
/**
* Kill the page, and display an error
* @param {Error} err - The fatal error that occured
*/
notif.fatal = function (err) {
if (firstFatal) {
firstFatal = false;
window.addEventListener('hashchange', function () {
window.location.reload();
});
}
var html = fatalToastTemplate({
msg: formatMsg(err, notif.from),
stack: err.stack
});
var $container = $('#fatal-splash-screen');
if ($container.size()) {
$container.append(html);
return;
}
// in case the app has not completed boot
$(document.body)
.removeAttr('ng-cloak')
.html('<div id="fatal-splash-screen" class="container-fuild">' + html + '</div>');
console.error(err.stack);
throw err;
};
/**
* Alert the user of an error that occured
* @param {Error|String} err
*/
notif.error = function (err, cb) {
add({
type: 'danger',
content: formatMsg(err, notif.from),
icon: 'warning',
title: 'Error',
lifetime: Infinity,
actions: ['report', 'accept'],
stack: err.stack
}, cb);
};
/**
* Warn the user abort something
* @param {[type]} msg [description]
* @return {[type]} [description]
*/
notif.warning = function (msg, cb) {
add({
type: 'warning',
content: formatMsg(msg, notif.from),
icon: 'warning',
title: 'Warning',
lifetime: 10000,
actions: ['accept']
}, cb);
};
/**
* Display a debug message
* @param {String} msg [description]
* @return {[type]} [description]
*/
notif.info = function (msg, cb) {
add({
type: 'info',
content: formatMsg(msg),
icon: 'info-circle',
title: 'Debug',
lifetime: 5000,
actions: ['accept']
}, cb);
};
'event lifecycle timed fatal error warning info'.split(' ')
.forEach(_.limit(_.bind(_.bindKey, self), 1));
}
// simply a pointer to the global notif list
Notifier.prototype._notifs = notifs;
/**
* Log a sometimes redundant event
* @param {string} name - The name of the group
* @param {boolean} success - Simple flag stating whether the event succeeded
*/
Notifier.prototype.event = createGroupLogger('event', {
open: true
});
/**
* Log a major, important, event in the lifecycle of the application
* @param {string} name - The name of the lifecycle event
* @param {boolean} success - Simple flag stating whether the lifecycle event succeeded
*/
Notifier.prototype.lifecycle = createGroupLogger('lifecycle', {
open: true
});
/**
 * Wrap a function so that its execution gets logged as an event.
 *
 * An event is opened with #event() right before the wrapped function is
 * called and completed right after it returns. The `this` value and the
 * arguments of the wrapper are forwarded, and the return value of the
 * wrapped function is passed back to the caller (so a returned promise
 * is not lost). When the wrapped function throws, the event is not completed.
 *
 * @param  {string} name - the name of the event, passed to #event()
 * @param  {function} fn - the function to wrap
 * @return {function} - the wrapped function
 */
Notifier.prototype.timed = function (name, fn) {
  var self = this;
  return function WrappedNotifierFunction() {
    var complete = self.event(name);
    var result = fn.apply(this, arguments);
    complete();
    return result;
  };
};
/**
 * Display a fatal error with #_showFatal(), then rethrow that same error.
 *
 * Meant to sit at the very end of a promise chain as the last-resort
 * error handler, which is why the error is thrown again after it has
 * been put on the page.
 *
 * @param {Error} err - The error that occurred
 * @throws {Error} always, the error that was passed in
 */
Notifier.prototype.fatal = function (err) {
  var notifier = this;
  notifier._showFatal(err);
  throw err;
};
/**
 * Display an error that destroys the entire app. Broken out so that
 * global error handlers can display fatal errors without throwing another
 * error like in #fatal()
 *
 * The first fatal error also registers a "hashchange" listener that reloads
 * the page. When the splash screen is already on the page the error is
 * appended to it, otherwise the body is replaced with a new splash screen
 * and the stack is written to the console.
 *
 * @param {Error} err - The fatal error that occurred
 */
Notifier.prototype._showFatal = function (err) {
  if (firstFatal) {
    firstFatal = false;
    window.addEventListener('hashchange', function () {
      window.location.reload();
    });
  }

  var html = fatalToastTemplate({
    msg: formatMsg(err, this.from),
    stack: err.stack
  });

  var $container = $('#fatal-splash-screen');
  if ($container.size()) {
    $container.append(html);
    return;
  }

  // in case the app has not completed boot
  $(document.body)
    .removeAttr('ng-cloak')
    .html('<div id="fatal-splash-screen" class="container-fluid">' + html + '</div>');

  console.error(err.stack);
};
/**
 * Show the user an error notification. The notification never expires
 * on its own and offers the "report" and "accept" actions.
 *
 * @param {Error|String} err - the error, its stack (if any) is attached
 * @param {function} cb - handed to add() together with the notification
 */
Notifier.prototype.error = function (err, cb) {
  var notification = {
    type: 'danger',
    content: formatMsg(err, this.from),
    icon: 'warning',
    title: 'Error',
    lifetime: Infinity,
    actions: ['report', 'accept'],
    stack: err.stack
  };

  add(notification, cb);
};
/**
 * Warn the user about something. The notification is given a
 * lifetime of 10000 and offers the "accept" action.
 *
 * @param {String} msg - the warning to display
 * @param {function} cb - handed to add() together with the notification
 */
Notifier.prototype.warning = function (msg, cb) {
  var notification = {
    type: 'warning',
    content: formatMsg(msg, this.from),
    icon: 'warning',
    title: 'Warning',
    lifetime: 10000,
    actions: ['accept']
  };

  add(notification, cb);
};
/**
 * Display a debug message. The notification is given a lifetime
 * of 5000 and offers the "accept" action. Unlike #warning() and
 * #error(), the message is formatted without the notifier's location.
 *
 * @param {String} msg - the message to display
 * @param {function} cb - handed to add() together with the notification
 */
Notifier.prototype.info = function (msg, cb) {
  var notification = {
    type: 'info',
    content: formatMsg(msg),
    icon: 'info-circle',
    title: 'Debug',
    lifetime: 5000,
    actions: ['accept']
  };

  add(notification, cb);
};
Notifier.prototype.log = log;
// set the timer functions that all notification managers will use

View file

@ -49,6 +49,23 @@ define(function (require) {
// https://github.com/angular/angular.js/blob/58f5da86645990ef984353418cd1ed83213b111e/src/ng/q.js#L335
return obj && typeof obj.then === 'function';
};
/**
 * Call a function and wrap the outcome in a promise, so that a synchronous
 * throw becomes a rejection instead of escaping to the caller.
 *
 * @param  {function} fn - the function to call
 * @param  {array|*} args - when an array, it is spread as the arguments for fn,
 *                          any other value is passed as the single argument
 * @param  {*} ctx - the `this` value for fn
 * @return {Promise} - resolved with the return value of fn, rejected with the
 *                     error fn threw, or rejected with a TypeError when fn is
 *                     not a function
 */
Promise.try = function (fn, args, ctx) {
  if (typeof fn !== 'function') {
    // reject with a real error so that handlers get a stack trace
    return Promise.reject(new TypeError('fn must be a function'));
  }

  var value;
  try {
    value = _.isArray(args) ? fn.apply(ctx, args) : fn.call(ctx, args);
  } catch (e) {
    return Promise.reject(e);
  }

  return Promise.resolve(value);
};
return Promise;
});
@ -101,9 +118,11 @@ define(function (require) {
* });
* ```
*
* @param {Function} fn - Used to init the promise, and call either reject or resolve (passed as args)
* @param {Function} fn - Used to init the promise, and call either
* reject or resolve (passed as args)
* @param {Function} handler - A function that will be called every
* time this promise is resolved
*
* @return {Promise}
*/
function PromiseEmitter(fn, handler) {

View file

@ -60,7 +60,7 @@
require([
'kibana',
'sinon/sinon',
//'specs/apps/dashboard/directives/panel',
'specs/apps/discover/hit_sort_fn',
'specs/directives/timepicker',
'specs/directives/truncate',
'specs/directives/css_truncate',

View file

@ -0,0 +1,75 @@
define(function (require) {
  var _ = require('lodash');

  require('angular').module('hitSortFunctionTests', ['kibana']);

  describe('hit sort function', function () {
    var createHitSortFn;

    beforeEach(module('hitSortFunctionTests'));
    beforeEach(inject(function (Private) {
      createHitSortFn = Private(require('apps/discover/_hit_sort_fn'));
    }));

    /**
     * Build a list of hits in which every entry of sortOpts shows up the
     * same (random) number of times, interleaved round-robin, sort it with
     * the hit sort function, and expect the groups to come out in the order
     * that sortOpts lists them.
     *
     * @param {string} dir - direction handed to the hit sort function factory
     * @param {array} sortOpts - expected sort values, in sorted order. Entries
     *                           that are not arrays are wrapped in one.
     */
    function runSortTest(dir, sortOpts) {
      var groupSize = _.random(10, 30);
      var total = sortOpts.length * groupSize;

      var expected = sortOpts.map(function (opt) {
        return _.isArray(opt) ? opt : [opt];
      });
      var groupCount = expected.length;

      var hits = [];
      while (hits.length < total) {
        hits.push({
          _source: {},
          sort: expected[hits.length % groupCount]
        });
      }

      var sorted = hits.sort(createHitSortFn(dir));

      sorted.forEach(function (hit, position) {
        var group = Math.floor(position / groupSize);
        expect(hit.sort).to.eql(expected[group]);
      });
    }

    it('sorts a list of hits in ascending order', function () {
      runSortTest('asc', [200, 404, 500]);
    });

    it('sorts a list of hits in descending order', function () {
      runSortTest('desc', [10, 3, 1]);
    });

    it('breaks ties in ascending order', function () {
      runSortTest('asc', [
        [ 'apache', 200, 'facebook.com' ],
        [ 'apache', 200, 'twitter.com' ],
        [ 'apache', 301, 'facebook.com' ],
        [ 'apache', 301, 'twitter.com' ],
        [ 'nginx', 200, 'facebook.com' ],
        [ 'nginx', 200, 'twitter.com' ],
        [ 'nginx', 301, 'facebook.com' ],
        [ 'nginx', 301, 'twitter.com' ]
      ]);
    });

    it('breaks ties in descending order', function () {
      runSortTest('desc', [
        [ 'nginx', 301, 'twitter.com' ],
        [ 'nginx', 301, 'facebook.com' ],
        [ 'nginx', 200, 'twitter.com' ],
        [ 'nginx', 200, 'facebook.com' ],
        [ 'apache', 301, 'twitter.com' ],
        [ 'apache', 301, 'facebook.com' ],
        [ 'apache', 200, 'twitter.com' ],
        [ 'apache', 200, 'facebook.com' ]
      ]);
    });
  });
});