mirror of
https://github.com/elastic/kibana.git
synced 2025-04-23 17:28:26 -04:00
* Merge search source and abstract and es6ify SearchSource class * Remove duplicate function and rename SourceAbstracts to SearchSource
This commit is contained in:
parent
74982f83bd
commit
2ce03c4e83
2 changed files with 595 additions and 627 deletions
|
@ -1,397 +0,0 @@
|
|||
import _ from 'lodash';
|
||||
import angular from 'angular';
|
||||
|
||||
import 'ui/promises';
|
||||
|
||||
import { requestQueue } from '../_request_queue';
|
||||
import { FetchSoonProvider } from '../fetch';
|
||||
import { FieldWildcardProvider } from '../../field_wildcard';
|
||||
import { getHighlightRequest } from '../../../../core_plugins/kibana/common/highlight';
|
||||
import { BuildESQueryProvider } from './build_query';
|
||||
|
||||
export function AbstractDataSourceProvider(Private, Promise, PromiseEmitter, config) {
|
||||
const fetchSoon = Private(FetchSoonProvider);
|
||||
const buildESQuery = Private(BuildESQueryProvider);
|
||||
const { fieldWildcardFilter } = Private(FieldWildcardProvider);
|
||||
const getConfig = (...args) => config.get(...args);
|
||||
|
||||
function SourceAbstract(initialState) {
|
||||
const self = this;
|
||||
self._instanceid = _.uniqueId('data_source');
|
||||
|
||||
self._state = (function () {
|
||||
// state can be serialized as JSON, and passed back in to restore
|
||||
if (initialState) {
|
||||
if (typeof initialState === 'string') {
|
||||
return JSON.parse(initialState);
|
||||
} else {
|
||||
return _.cloneDeep(initialState);
|
||||
}
|
||||
} else {
|
||||
return {};
|
||||
}
|
||||
}());
|
||||
|
||||
// set internal state values
|
||||
self._methods.forEach(function (name) {
|
||||
self[name] = function (val) {
|
||||
if (val == null) {
|
||||
delete self._state[name];
|
||||
} else {
|
||||
self._state[name] = val;
|
||||
}
|
||||
|
||||
return self;
|
||||
};
|
||||
});
|
||||
|
||||
self.history = [];
|
||||
self._requestStartHandlers = [];
|
||||
}
|
||||
|
||||
/*****
|
||||
* PUBLIC API
|
||||
*****/
|
||||
|
||||
/**
|
||||
* Get values from the state
|
||||
* @param {string} name - The name of the property desired
|
||||
* @return {any} - the value found
|
||||
*/
|
||||
SourceAbstract.prototype.get = function (name) {
|
||||
let self = this;
|
||||
while (self) {
|
||||
if (self._state[name] !== void 0) return self._state[name];
|
||||
self = self.getParent();
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Get the value from our own state, don't traverse up the chain
|
||||
* @param {string} name - The name of the property desired
|
||||
* @return {any} - the value found
|
||||
*/
|
||||
SourceAbstract.prototype.getOwn = function (name) {
|
||||
if (this._state[name] !== void 0) return this._state[name];
|
||||
};
|
||||
|
||||
/**
|
||||
* Change the entire state of a SourceAbstract
|
||||
* @param {object|string} state - The SourceAbstract's new state, or a
|
||||
* string of the state value to set
|
||||
*/
|
||||
SourceAbstract.prototype.set = function (state, val) {
|
||||
const self = this;
|
||||
|
||||
if (typeof state === 'string') {
|
||||
// the getter and setter methods check for undefined explicitly
|
||||
// to identify getters and null to identify deletion
|
||||
if (val === undefined) {
|
||||
val = null;
|
||||
}
|
||||
self[state](val);
|
||||
} else {
|
||||
self._state = state;
|
||||
}
|
||||
return self;
|
||||
};
|
||||
|
||||
/**
|
||||
* Create a new dataSource object of the same type
|
||||
* as this, which inherits this dataSource's properties
|
||||
* @return {SourceAbstract}
|
||||
*/
|
||||
SourceAbstract.prototype.extend = function () {
|
||||
return (new this.Class()).inherits(this);
|
||||
};
|
||||
|
||||
/**
|
||||
* return a simple, encodable object representing the state of the SourceAbstract
|
||||
* @return {[type]} [description]
|
||||
*/
|
||||
SourceAbstract.prototype.toJSON = function () {
|
||||
return _.clone(this._state);
|
||||
};
|
||||
|
||||
/**
|
||||
* Create a string representation of the object
|
||||
* @return {[type]} [description]
|
||||
*/
|
||||
SourceAbstract.prototype.toString = function () {
|
||||
return angular.toJson(this.toJSON());
|
||||
};
|
||||
|
||||
/**
|
||||
* Put a request in to the courier that this Source should
|
||||
* be fetched on the next run of the courier
|
||||
* @return {Promise}
|
||||
*/
|
||||
SourceAbstract.prototype.onResults = function (handler) {
|
||||
const self = this;
|
||||
|
||||
return new PromiseEmitter(function (resolve, reject) {
|
||||
const defer = Promise.defer();
|
||||
defer.promise.then(resolve, reject);
|
||||
|
||||
const request = self._createRequest(defer);
|
||||
|
||||
request.setErrorHandler((request, error) => {
|
||||
reject(error);
|
||||
request.abort();
|
||||
});
|
||||
|
||||
}, handler);
|
||||
};
|
||||
|
||||
/**
|
||||
* Noop
|
||||
*/
|
||||
SourceAbstract.prototype.getParent = function () {
|
||||
return this._parent;
|
||||
};
|
||||
|
||||
/**
|
||||
* Fetch just this source ASAP
|
||||
*
|
||||
* ONLY USE IF YOU WILL BE USING THE RESULTS
|
||||
* provided by the returned promise, otherwise
|
||||
* call #fetchQueued()
|
||||
*
|
||||
* @async
|
||||
*/
|
||||
SourceAbstract.prototype.fetch = function () {
|
||||
const self = this;
|
||||
let req = _.first(self._myStartableQueued());
|
||||
|
||||
if (!req) {
|
||||
req = self._createRequest();
|
||||
}
|
||||
|
||||
fetchSoon.these([req]);
|
||||
|
||||
return req.getCompletePromise();
|
||||
};
|
||||
|
||||
/**
|
||||
* Fetch this source and reject the returned Promise on error
|
||||
*
|
||||
* Otherwise behaves like #fetch()
|
||||
*
|
||||
* @async
|
||||
*/
|
||||
SourceAbstract.prototype.fetchAsRejectablePromise = function () {
|
||||
const self = this;
|
||||
let req = _.first(self._myStartableQueued());
|
||||
|
||||
if (!req) {
|
||||
req = self._createRequest();
|
||||
}
|
||||
|
||||
req.setErrorHandler((request, error) => {
|
||||
request.defer.reject(error);
|
||||
request.abort();
|
||||
});
|
||||
|
||||
fetchSoon.these([req]);
|
||||
|
||||
return req.getCompletePromise();
|
||||
};
|
||||
|
||||
/**
|
||||
* Fetch all pending requests for this source ASAP
|
||||
* @async
|
||||
*/
|
||||
SourceAbstract.prototype.fetchQueued = function () {
|
||||
return fetchSoon.these(this._myStartableQueued());
|
||||
};
|
||||
|
||||
/**
|
||||
* Cancel all pending requests for this dataSource
|
||||
* @return {undefined}
|
||||
*/
|
||||
SourceAbstract.prototype.cancelQueued = function () {
|
||||
requestQueue
|
||||
.filter(req => req.source === this)
|
||||
.forEach(req => req.abort());
|
||||
};
|
||||
|
||||
/**
|
||||
* Completely destroy the SearchSource.
|
||||
* @return {undefined}
|
||||
*/
|
||||
SourceAbstract.prototype.destroy = function () {
|
||||
this.cancelQueued();
|
||||
this._requestStartHandlers.length = 0;
|
||||
};
|
||||
|
||||
/**
|
||||
* Add a handler that will be notified whenever requests start
|
||||
* @param {Function} handler
|
||||
* @return {undefined}
|
||||
*/
|
||||
SourceAbstract.prototype.onRequestStart = function (handler) {
|
||||
this._requestStartHandlers.push(handler);
|
||||
};
|
||||
|
||||
/**
|
||||
* Called by requests of this search source when they are started
|
||||
* @param {Courier.Request} request
|
||||
* @return {Promise<undefined>}
|
||||
*/
|
||||
SourceAbstract.prototype.requestIsStarting = function (request) {
|
||||
this.activeFetchCount = (this.activeFetchCount || 0) + 1;
|
||||
this.history = [request];
|
||||
|
||||
return Promise
|
||||
.map(this._requestStartHandlers, fn => fn(this, request))
|
||||
.then(_.noop);
|
||||
};
|
||||
|
||||
/**
|
||||
* Called by requests of this search source when they are done
|
||||
* @param {Courier.Request} request
|
||||
* @return {undefined}
|
||||
*/
|
||||
SourceAbstract.prototype.requestIsStopped = function (/* request */) {
|
||||
this.activeFetchCount -= 1;
|
||||
};
|
||||
|
||||
/*****
|
||||
* PRIVATE API
|
||||
*****/
|
||||
|
||||
SourceAbstract.prototype._myStartableQueued = function () {
|
||||
return requestQueue
|
||||
.getStartable()
|
||||
.filter(req => req.source === this);
|
||||
};
|
||||
|
||||
SourceAbstract.prototype._createRequest = function () {
|
||||
throw new Error('_createRequest must be implemented by subclass');
|
||||
};
|
||||
|
||||
/**
|
||||
* Walk the inheritance chain of a source and return it's
|
||||
* flat representaion (taking into account merging rules)
|
||||
* @returns {Promise}
|
||||
* @resolved {Object|null} - the flat state of the SourceAbstract
|
||||
*/
|
||||
SourceAbstract.prototype._flatten = function () {
|
||||
const type = this._getType();
|
||||
|
||||
// the merged state of this dataSource and it's ancestors
|
||||
const flatState = {};
|
||||
|
||||
// function used to write each property from each state object in the chain to flat state
|
||||
const root = this;
|
||||
|
||||
// start the chain at this source
|
||||
let current = this;
|
||||
|
||||
// call the ittr and return it's promise
|
||||
return (function ittr() {
|
||||
// itterate the _state object (not array) and
|
||||
// pass each key:value pair to source._mergeProp. if _mergeProp
|
||||
// returns a promise, then wait for it to complete and call _mergeProp again
|
||||
return Promise.all(_.map(current._state, function ittr(value, key) {
|
||||
if (Promise.is(value)) {
|
||||
return value.then(function (value) {
|
||||
return ittr(value, key);
|
||||
});
|
||||
}
|
||||
|
||||
const prom = root._mergeProp(flatState, value, key);
|
||||
return Promise.is(prom) ? prom : null;
|
||||
}))
|
||||
.then(function () {
|
||||
// move to this sources parent
|
||||
const parent = current.getParent();
|
||||
// keep calling until we reach the top parent
|
||||
if (parent) {
|
||||
current = parent;
|
||||
return ittr();
|
||||
}
|
||||
});
|
||||
}())
|
||||
.then(function () {
|
||||
if (type === 'search') {
|
||||
// This is down here to prevent the circular dependency
|
||||
flatState.body = flatState.body || {};
|
||||
|
||||
const computedFields = flatState.index.getComputedFields();
|
||||
flatState.body.stored_fields = computedFields.storedFields;
|
||||
flatState.body.script_fields = flatState.body.script_fields || {};
|
||||
flatState.body.docvalue_fields = flatState.body.docvalue_fields || [];
|
||||
|
||||
_.extend(flatState.body.script_fields, computedFields.scriptFields);
|
||||
flatState.body.docvalue_fields = _.union(flatState.body.docvalue_fields, computedFields.docvalueFields);
|
||||
|
||||
if (flatState.body._source) {
|
||||
// exclude source fields for this index pattern specified by the user
|
||||
const filter = fieldWildcardFilter(flatState.body._source.excludes);
|
||||
flatState.body.docvalue_fields = flatState.body.docvalue_fields.filter(filter);
|
||||
}
|
||||
|
||||
// if we only want to search for certain fields
|
||||
const fields = flatState.fields;
|
||||
if (fields) {
|
||||
// filter out the docvalue_fields, and script_fields to only include those that we are concerned with
|
||||
flatState.body.docvalue_fields = _.intersection(flatState.body.docvalue_fields, fields);
|
||||
flatState.body.script_fields = _.pick(flatState.body.script_fields, fields);
|
||||
|
||||
// request the remaining fields from both stored_fields and _source
|
||||
const remainingFields = _.difference(fields, _.keys(flatState.body.script_fields));
|
||||
flatState.body.stored_fields = remainingFields;
|
||||
_.set(flatState.body, '_source.includes', remainingFields);
|
||||
}
|
||||
|
||||
flatState.body.query = buildESQuery(flatState.index, flatState.query, flatState.filters);
|
||||
|
||||
if (flatState.highlightAll != null) {
|
||||
if (flatState.highlightAll && flatState.body.query) {
|
||||
flatState.body.highlight = getHighlightRequest(flatState.body.query, getConfig);
|
||||
}
|
||||
delete flatState.highlightAll;
|
||||
}
|
||||
|
||||
/**
|
||||
* Translate a filter into a query to support es 3+
|
||||
* @param {Object} filter - The filter to translate
|
||||
* @return {Object} the query version of that filter
|
||||
*/
|
||||
const translateToQuery = function (filter) {
|
||||
if (!filter) return;
|
||||
|
||||
if (filter.query) {
|
||||
return filter.query;
|
||||
}
|
||||
|
||||
return filter;
|
||||
};
|
||||
|
||||
// re-write filters within filter aggregations
|
||||
(function recurse(aggBranch) {
|
||||
if (!aggBranch) return;
|
||||
Object.keys(aggBranch).forEach(function (id) {
|
||||
const agg = aggBranch[id];
|
||||
|
||||
if (agg.filters) {
|
||||
// translate filters aggregations
|
||||
const filters = agg.filters.filters;
|
||||
|
||||
Object.keys(filters).forEach(function (filterId) {
|
||||
filters[filterId] = translateToQuery(filters[filterId]);
|
||||
});
|
||||
}
|
||||
|
||||
recurse(agg.aggs || agg.aggregations);
|
||||
});
|
||||
}(flatState.body.aggs || flatState.body.aggregations));
|
||||
}
|
||||
|
||||
return flatState;
|
||||
});
|
||||
};
|
||||
|
||||
return SourceAbstract;
|
||||
}
|
|
@ -51,280 +51,645 @@
|
|||
*/
|
||||
|
||||
import _ from 'lodash';
|
||||
import angular from 'angular';
|
||||
|
||||
import 'ui/promises';
|
||||
|
||||
import { NormalizeSortRequestProvider } from './_normalize_sort_request';
|
||||
import { RootSearchSourceProvider } from './_root_search_source';
|
||||
import { AbstractDataSourceProvider } from './_abstract';
|
||||
import { SearchRequestProvider } from '../fetch/request';
|
||||
import { SegmentedRequestProvider } from '../fetch/request/segmented';
|
||||
|
||||
export function SearchSourceProvider(Promise, Private, config) {
|
||||
const SourceAbstract = Private(AbstractDataSourceProvider);
|
||||
import { requestQueue } from '../_request_queue';
|
||||
import { FetchSoonProvider } from '../fetch';
|
||||
import { FieldWildcardProvider } from '../../field_wildcard';
|
||||
import { getHighlightRequest } from '../../../../core_plugins/kibana/common/highlight';
|
||||
import { BuildESQueryProvider } from './build_query';
|
||||
|
||||
function parseInitialState(initialState) {
|
||||
if (!initialState) {
|
||||
return {};
|
||||
}
|
||||
|
||||
return typeof initialState === 'string' ?
|
||||
JSON.parse(initialState)
|
||||
: _.cloneDeep(initialState);
|
||||
}
|
||||
|
||||
function isIndexPattern(val) {
|
||||
return Boolean(val && typeof val.toIndexList === 'function');
|
||||
}
|
||||
|
||||
export function SearchSourceProvider(Promise, PromiseEmitter, Private, config) {
|
||||
const SearchRequest = Private(SearchRequestProvider);
|
||||
const SegmentedRequest = Private(SegmentedRequestProvider);
|
||||
const normalizeSortRequest = Private(NormalizeSortRequestProvider);
|
||||
const fetchSoon = Private(FetchSoonProvider);
|
||||
const buildESQuery = Private(BuildESQueryProvider);
|
||||
const { fieldWildcardFilter } = Private(FieldWildcardProvider);
|
||||
const getConfig = (...args) => config.get(...args);
|
||||
|
||||
const forIp = Symbol('for which index pattern?');
|
||||
|
||||
function isIndexPattern(val) {
|
||||
return Boolean(val && typeof val.toIndexList === 'function');
|
||||
}
|
||||
class SearchSource {
|
||||
constructor(initialState) {
|
||||
this._instanceid = _.uniqueId('data_source');
|
||||
|
||||
_.class(SearchSource).inherits(SourceAbstract);
|
||||
function SearchSource(initialState) {
|
||||
SearchSource.Super.call(this, initialState);
|
||||
}
|
||||
this._state = parseInitialState(initialState);
|
||||
|
||||
/*****
|
||||
* PUBLIC API
|
||||
*****/
|
||||
/**
|
||||
* List of the editable state properties that turn into a
|
||||
* chainable API
|
||||
*
|
||||
* @type {Array}
|
||||
*/
|
||||
this._methods = [
|
||||
'type',
|
||||
'query',
|
||||
'filter',
|
||||
'sort',
|
||||
'highlight',
|
||||
'highlightAll',
|
||||
'aggs',
|
||||
'from',
|
||||
'searchAfter',
|
||||
'size',
|
||||
'source',
|
||||
'version',
|
||||
'fields'
|
||||
];
|
||||
|
||||
/**
|
||||
* List of the editable state properties that turn into a
|
||||
* chainable API
|
||||
*
|
||||
* @type {Array}
|
||||
*/
|
||||
SearchSource.prototype._methods = [
|
||||
'type',
|
||||
'query',
|
||||
'filter',
|
||||
'sort',
|
||||
'highlight',
|
||||
'highlightAll',
|
||||
'aggs',
|
||||
'from',
|
||||
'searchAfter',
|
||||
'size',
|
||||
'source',
|
||||
'version',
|
||||
'fields'
|
||||
];
|
||||
// set internal state values
|
||||
this._methods.forEach(name => {
|
||||
this[name] = val => {
|
||||
if (val == null) {
|
||||
delete this._state[name];
|
||||
} else {
|
||||
this._state[name] = val;
|
||||
}
|
||||
|
||||
SearchSource.prototype.index = function (indexPattern) {
|
||||
const state = this._state;
|
||||
return this;
|
||||
};
|
||||
});
|
||||
|
||||
const hasSource = state.source;
|
||||
const sourceCameFromIp = hasSource && state.source.hasOwnProperty(forIp);
|
||||
const sourceIsForOurIp = sourceCameFromIp && state.source[forIp] === state.index;
|
||||
if (sourceIsForOurIp) {
|
||||
delete state.source;
|
||||
this.history = [];
|
||||
this._requestStartHandlers = [];
|
||||
|
||||
this._filterPredicates = [
|
||||
(filter) => {
|
||||
// remove null/undefined filters
|
||||
return filter;
|
||||
},
|
||||
(filter) => {
|
||||
const disabled = _.get(filter, 'meta.disabled');
|
||||
return disabled === undefined || disabled === false;
|
||||
},
|
||||
(filter, state) => {
|
||||
if (!config.get('courier:ignoreFilterIfFieldNotInIndex')) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if ('meta' in filter && 'index' in state) {
|
||||
const field = state.index.fields.byName[filter.meta.key];
|
||||
if (!field) return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
];
|
||||
}
|
||||
|
||||
if (indexPattern === undefined) return state.index;
|
||||
if (indexPattern === null) return delete state.index;
|
||||
if (!isIndexPattern(indexPattern)) {
|
||||
throw new TypeError('expected indexPattern to be an IndexPattern duck.');
|
||||
/*****
|
||||
* PUBLIC API
|
||||
*****/
|
||||
|
||||
|
||||
index(indexPattern) {
|
||||
const state = this._state;
|
||||
|
||||
const hasSource = state.source;
|
||||
const sourceCameFromIp = hasSource && state.source.hasOwnProperty(forIp);
|
||||
const sourceIsForOurIp = sourceCameFromIp && state.source[forIp] === state.index;
|
||||
if (sourceIsForOurIp) {
|
||||
delete state.source;
|
||||
}
|
||||
|
||||
if (indexPattern === undefined) return state.index;
|
||||
if (indexPattern === null) return delete state.index;
|
||||
if (!isIndexPattern(indexPattern)) {
|
||||
throw new TypeError('expected indexPattern to be an IndexPattern duck.');
|
||||
}
|
||||
|
||||
state.index = indexPattern;
|
||||
if (!state.source) {
|
||||
// imply source filtering based on the index pattern, but allow overriding
|
||||
// it by simply setting another value for "source". When index is changed
|
||||
state.source = function () {
|
||||
return indexPattern.getSourceFiltering();
|
||||
};
|
||||
state.source[forIp] = indexPattern;
|
||||
}
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
state.index = indexPattern;
|
||||
if (!state.source) {
|
||||
// imply source filtering based on the index pattern, but allow overriding
|
||||
// it by simply setting another value for "source". When index is changed
|
||||
state.source = function () {
|
||||
return indexPattern.getSourceFiltering();
|
||||
};
|
||||
state.source[forIp] = indexPattern;
|
||||
/**
|
||||
* Set a searchSource that this source should inherit from
|
||||
* @param {SearchSource} searchSource - the parent searchSource
|
||||
* @return {this} - chainable
|
||||
*/
|
||||
inherits(parent) {
|
||||
this._parent = parent;
|
||||
return this;
|
||||
}
|
||||
|
||||
return this;
|
||||
};
|
||||
/**
|
||||
* Get the parent of this SearchSource
|
||||
* @return {undefined|searchSource}
|
||||
*/
|
||||
getParent(onlyHardLinked) {
|
||||
const self = this;
|
||||
if (self._parent === false) return;
|
||||
if (self._parent) return self._parent;
|
||||
return onlyHardLinked ? undefined : Private(RootSearchSourceProvider).get();
|
||||
}
|
||||
|
||||
SearchSource.prototype.extend = function () {
|
||||
return (new SearchSource()).inherits(this);
|
||||
};
|
||||
/**
|
||||
* Temporarily prevent this Search from being fetched... not a fan but it's easy
|
||||
*/
|
||||
disable() {
|
||||
this._fetchDisabled = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set a searchSource that this source should inherit from
|
||||
* @param {SearchSource} searchSource - the parent searchSource
|
||||
* @return {this} - chainable
|
||||
*/
|
||||
SearchSource.prototype.inherits = function (parent) {
|
||||
this._parent = parent;
|
||||
return this;
|
||||
};
|
||||
/**
|
||||
* Reverse of SearchSource#disable(), only need to call this if source was previously disabled
|
||||
*/
|
||||
enable() {
|
||||
this._fetchDisabled = false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the parent of this SearchSource
|
||||
* @return {undefined|searchSource}
|
||||
*/
|
||||
SearchSource.prototype.getParent = function (onlyHardLinked) {
|
||||
const self = this;
|
||||
if (self._parent === false) return;
|
||||
if (self._parent) return self._parent;
|
||||
return onlyHardLinked ? undefined : Private(RootSearchSourceProvider).get();
|
||||
};
|
||||
onBeginSegmentedFetch(initFunction) {
|
||||
const self = this;
|
||||
return new Promise((resolve, reject) => {
|
||||
function addRequest() {
|
||||
const defer = Promise.defer();
|
||||
const req = new SegmentedRequest(self, defer, initFunction);
|
||||
|
||||
/**
|
||||
* Temporarily prevent this Search from being fetched... not a fan but it's easy
|
||||
*/
|
||||
SearchSource.prototype.disable = function () {
|
||||
this._fetchDisabled = true;
|
||||
};
|
||||
req.setErrorHandler((request, error) => {
|
||||
reject(error);
|
||||
request.abort();
|
||||
});
|
||||
|
||||
/**
|
||||
* Reverse of SourceAbstract#disable(), only need to call this if source was previously disabled
|
||||
*/
|
||||
SearchSource.prototype.enable = function () {
|
||||
this._fetchDisabled = false;
|
||||
};
|
||||
// Return promises created by the completion handler so that
|
||||
// errors will bubble properly
|
||||
return req.getCompletePromise().then(addRequest);
|
||||
}
|
||||
|
||||
SearchSource.prototype.onBeginSegmentedFetch = function (initFunction) {
|
||||
const self = this;
|
||||
return new Promise((resolve, reject) => {
|
||||
function addRequest() {
|
||||
addRequest();
|
||||
});
|
||||
}
|
||||
|
||||
addFilterPredicate(predicate) {
|
||||
this._filterPredicates.push(predicate);
|
||||
}
|
||||
|
||||
clone() {
|
||||
const clone = new SearchSource(this.toString());
|
||||
// when serializing the internal state with .toString() we lose the internal classes used in the index
|
||||
// pattern, so we have to set it again to workaround this behavior
|
||||
clone.set('index', this.get('index'));
|
||||
clone.inherits(this.getParent());
|
||||
return clone;
|
||||
}
|
||||
|
||||
async getSearchRequestBody() {
|
||||
const searchRequest = await this._flatten();
|
||||
return searchRequest.body;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called by requests of this search source when they are done
|
||||
* @param {Courier.Request} request
|
||||
* @return {undefined}
|
||||
*/
|
||||
requestIsStopped() {
|
||||
this.activeFetchCount -= 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get values from the state
|
||||
* @param {string} name - The name of the property desired
|
||||
* @return {any} - the value found
|
||||
*/
|
||||
get(name) {
|
||||
let self = this;
|
||||
while (self) {
|
||||
if (self._state[name] !== void 0) return self._state[name];
|
||||
self = self.getParent();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the value from our own state, don't traverse up the chain
|
||||
* @param {string} name - The name of the property desired
|
||||
* @return {any} - the value found
|
||||
*/
|
||||
getOwn(name) {
|
||||
if (this._state[name] !== void 0) return this._state[name];
|
||||
}
|
||||
|
||||
/**
|
||||
* Change the entire state of a SearchSource
|
||||
* @param {object|string} state - The SearchSource's new state, or a
|
||||
* string of the state value to set
|
||||
*/
|
||||
set(state, val) {
|
||||
const self = this;
|
||||
|
||||
if (typeof state === 'string') {
|
||||
// the getter and setter methods check for undefined explicitly
|
||||
// to identify getters and null to identify deletion
|
||||
if (val === undefined) {
|
||||
val = null;
|
||||
}
|
||||
self[state](val);
|
||||
} else {
|
||||
self._state = state;
|
||||
}
|
||||
return self;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new dataSource object of the same type
|
||||
* as this, which inherits this dataSource's properties
|
||||
* @return {SearchSource}
|
||||
*/
|
||||
extend() {
|
||||
return new SearchSource().inherits(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* return a simple, encodable object representing the state of the SearchSource
|
||||
* @return {[type]} [description]
|
||||
*/
|
||||
toJSON = function () {
|
||||
return _.clone(this._state);
|
||||
};
|
||||
|
||||
/**
|
||||
* Create a string representation of the object
|
||||
* @return {[type]} [description]
|
||||
*/
|
||||
toString() {
|
||||
return angular.toJson(this.toJSON());
|
||||
}
|
||||
|
||||
/**
|
||||
* Put a request in to the courier that this Source should
|
||||
* be fetched on the next run of the courier
|
||||
* @return {Promise}
|
||||
*/
|
||||
onResults(handler) {
|
||||
const self = this;
|
||||
|
||||
return new PromiseEmitter(function (resolve, reject) {
|
||||
const defer = Promise.defer();
|
||||
const req = new SegmentedRequest(self, defer, initFunction);
|
||||
defer.promise.then(resolve, reject);
|
||||
|
||||
req.setErrorHandler((request, error) => {
|
||||
const request = self._createRequest(defer);
|
||||
|
||||
request.setErrorHandler((request, error) => {
|
||||
reject(error);
|
||||
request.abort();
|
||||
});
|
||||
|
||||
// Return promises created by the completion handler so that
|
||||
// errors will bubble properly
|
||||
return req.getCompletePromise().then(addRequest);
|
||||
}
|
||||
addRequest();
|
||||
});
|
||||
};
|
||||
|
||||
SearchSource.prototype.addFilterPredicate = function (predicate) {
|
||||
this._filterPredicates.push(predicate);
|
||||
};
|
||||
|
||||
/******
|
||||
* PRIVATE APIS
|
||||
******/
|
||||
|
||||
/**
|
||||
* Gets the type of the DataSource
|
||||
* @return {string}
|
||||
*/
|
||||
SearchSource.prototype._getType = function () {
|
||||
return 'search';
|
||||
};
|
||||
|
||||
/**
|
||||
* Create a common search request object, which should
|
||||
* be put into the pending request queye, for this search
|
||||
* source
|
||||
*
|
||||
* @param {Deferred} defer - the deferred object that should be resolved
|
||||
* when the request is complete
|
||||
* @return {SearchRequest}
|
||||
*/
|
||||
SearchSource.prototype._createRequest = function (defer) {
|
||||
return new SearchRequest(this, defer);
|
||||
};
|
||||
|
||||
/**
|
||||
* Used to merge properties into the state within ._flatten().
|
||||
* The state is passed in and modified by the function
|
||||
*
|
||||
* @param {object} state - the current merged state
|
||||
* @param {*} val - the value at `key`
|
||||
* @param {*} key - The key of `val`
|
||||
* @return {undefined}
|
||||
*/
|
||||
SearchSource.prototype._mergeProp = function (state, val, key) {
|
||||
if (typeof val === 'function') {
|
||||
const source = this;
|
||||
return Promise.cast(val(this))
|
||||
.then(function (newVal) {
|
||||
return source._mergeProp(state, newVal, key);
|
||||
});
|
||||
}
|
||||
|
||||
if (val == null || !key || !_.isString(key)) return;
|
||||
|
||||
switch (key) {
|
||||
case 'filter':
|
||||
let filters = Array.isArray(val) ? val : [val];
|
||||
|
||||
filters = filters.filter(filter => {
|
||||
return this._filterPredicates.every(predicate => predicate(filter, state));
|
||||
});
|
||||
|
||||
state.filters = [ ...state.filters || [], ...filters];
|
||||
return;
|
||||
case 'index':
|
||||
case 'type':
|
||||
case 'id':
|
||||
case 'highlightAll':
|
||||
if (key && state[key] == null) {
|
||||
state[key] = val;
|
||||
}
|
||||
return;
|
||||
case 'searchAfter':
|
||||
key = 'search_after';
|
||||
addToBody();
|
||||
break;
|
||||
case 'source':
|
||||
key = '_source';
|
||||
addToBody();
|
||||
break;
|
||||
case 'sort':
|
||||
val = normalizeSortRequest(val, this.get('index'));
|
||||
addToBody();
|
||||
break;
|
||||
case 'query':
|
||||
state.query = (state.query || []).concat(val);
|
||||
break;
|
||||
case 'fields':
|
||||
state[key] = _.uniq([...(state[key] || []), ...val]);
|
||||
break;
|
||||
default:
|
||||
addToBody();
|
||||
}, handler);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add the key and val to the body of the request
|
||||
* Fetch just this source ASAP
|
||||
*
|
||||
* ONLY USE IF YOU WILL BE USING THE RESULTS
|
||||
* provided by the returned promise, otherwise
|
||||
* call #fetchQueued()
|
||||
*
|
||||
* @async
|
||||
*/
|
||||
function addToBody() {
|
||||
state.body = state.body || {};
|
||||
// ignore if we already have a value
|
||||
if (state.body[key] == null) {
|
||||
state.body[key] = val;
|
||||
fetch() {
|
||||
const self = this;
|
||||
let req = _.first(self._myStartableQueued());
|
||||
|
||||
if (!req) {
|
||||
req = self._createRequest();
|
||||
}
|
||||
|
||||
fetchSoon.these([req]);
|
||||
|
||||
return req.getCompletePromise();
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch this source and reject the returned Promise on error
|
||||
*
|
||||
* Otherwise behaves like #fetch()
|
||||
*
|
||||
* @async
|
||||
*/
|
||||
fetchAsRejectablePromise() {
|
||||
const self = this;
|
||||
let req = _.first(self._myStartableQueued());
|
||||
|
||||
if (!req) {
|
||||
req = self._createRequest();
|
||||
}
|
||||
|
||||
req.setErrorHandler((request, error) => {
|
||||
request.defer.reject(error);
|
||||
request.abort();
|
||||
});
|
||||
|
||||
fetchSoon.these([req]);
|
||||
|
||||
return req.getCompletePromise();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Fetch all pending requests for this source ASAP
|
||||
* @async
|
||||
*/
|
||||
fetchQueued() {
|
||||
return fetchSoon.these(this._myStartableQueued());
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancel all pending requests for this dataSource
|
||||
* @return {undefined}
|
||||
*/
|
||||
cancelQueued() {
|
||||
requestQueue
|
||||
.filter(req => req.source === this)
|
||||
.forEach(req => req.abort());
|
||||
}
|
||||
|
||||
/**
|
||||
* Completely destroy the SearchSource.
|
||||
* @return {undefined}
|
||||
*/
|
||||
destroy() {
|
||||
this.cancelQueued();
|
||||
this._requestStartHandlers.length = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a handler that will be notified whenever requests start
|
||||
* @param {Function} handler
|
||||
* @return {undefined}
|
||||
*/
|
||||
onRequestStart(handler) {
|
||||
this._requestStartHandlers.push(handler);
|
||||
}
|
||||
|
||||
/**
|
||||
* Called by requests of this search source when they are started
|
||||
* @param {Courier.Request} request
|
||||
* @return {Promise<undefined>}
|
||||
*/
|
||||
requestIsStarting(request) {
|
||||
this.activeFetchCount = (this.activeFetchCount || 0) + 1;
|
||||
this.history = [request];
|
||||
|
||||
return Promise
|
||||
.map(this._requestStartHandlers, fn => fn(this, request))
|
||||
.then(_.noop);
|
||||
}
|
||||
|
||||
|
||||
/******
|
||||
* PRIVATE APIS
|
||||
******/
|
||||
|
||||
_myStartableQueued() {
|
||||
return requestQueue
|
||||
.getStartable()
|
||||
.filter(req => req.source === this);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the type of the DataSource
|
||||
* @return {string}
|
||||
*/
|
||||
_getType() {
|
||||
return 'search';
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a common search request object, which should
|
||||
* be put into the pending request queye, for this search
|
||||
* source
|
||||
*
|
||||
* @param {Deferred} defer - the deferred object that should be resolved
|
||||
* when the request is complete
|
||||
* @return {SearchRequest}
|
||||
*/
|
||||
_createRequest(defer) {
|
||||
return new SearchRequest(this, defer);
|
||||
}
|
||||
|
||||
/**
|
||||
* Used to merge properties into the state within ._flatten().
|
||||
* The state is passed in and modified by the function
|
||||
*
|
||||
* @param {object} state - the current merged state
|
||||
* @param {*} val - the value at `key`
|
||||
* @param {*} key - The key of `val`
|
||||
* @return {undefined}
|
||||
*/
|
||||
_mergeProp(state, val, key) {
|
||||
if (typeof val === 'function') {
|
||||
const source = this;
|
||||
return Promise.cast(val(this))
|
||||
.then(function (newVal) {
|
||||
return source._mergeProp(state, newVal, key);
|
||||
});
|
||||
}
|
||||
|
||||
if (val == null || !key || !_.isString(key)) return;
|
||||
|
||||
switch (key) {
|
||||
case 'filter':
|
||||
let filters = Array.isArray(val) ? val : [val];
|
||||
|
||||
filters = filters.filter(filter => {
|
||||
return this._filterPredicates.every(predicate => predicate(filter, state));
|
||||
});
|
||||
|
||||
state.filters = [...state.filters || [], ...filters];
|
||||
return;
|
||||
case 'index':
|
||||
case 'type':
|
||||
case 'id':
|
||||
case 'highlightAll':
|
||||
if (key && state[key] == null) {
|
||||
state[key] = val;
|
||||
}
|
||||
return;
|
||||
case 'searchAfter':
|
||||
key = 'search_after';
|
||||
addToBody();
|
||||
break;
|
||||
case 'source':
|
||||
key = '_source';
|
||||
addToBody();
|
||||
break;
|
||||
case 'sort':
|
||||
val = normalizeSortRequest(val, this.get('index'));
|
||||
addToBody();
|
||||
break;
|
||||
case 'query':
|
||||
state.query = (state.query || []).concat(val);
|
||||
break;
|
||||
case 'fields':
|
||||
state[key] = _.uniq([...(state[key] || []), ...val]);
|
||||
break;
|
||||
default:
|
||||
addToBody();
|
||||
}
|
||||
|
||||
/**
|
||||
* Add the key and val to the body of the request
|
||||
*/
|
||||
function addToBody() {
|
||||
state.body = state.body || {};
|
||||
// ignore if we already have a value
|
||||
if (state.body[key] == null) {
|
||||
state.body[key] = val;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
SearchSource.prototype._filterPredicates = [
|
||||
(filter) => {
|
||||
// remove null/undefined filters
|
||||
return filter;
|
||||
},
|
||||
(filter) => {
|
||||
const disabled = _.get(filter, 'meta.disabled');
|
||||
return disabled === undefined || disabled === false;
|
||||
},
|
||||
(filter, state) => {
|
||||
if (!config.get('courier:ignoreFilterIfFieldNotInIndex')) {
|
||||
return true;
|
||||
}
|
||||
/**
|
||||
* Walk the inheritance chain of a source and return it's
|
||||
* flat representaion (taking into account merging rules)
|
||||
* @returns {Promise}
|
||||
* @resolved {Object|null} - the flat state of the SearchSource
|
||||
*/
|
||||
_flatten() {
|
||||
const type = this._getType();
|
||||
|
||||
if ('meta' in filter && 'index' in state) {
|
||||
const field = state.index.fields.byName[filter.meta.key];
|
||||
if (!field) return false;
|
||||
}
|
||||
return true;
|
||||
// the merged state of this dataSource and it's ancestors
|
||||
const flatState = {};
|
||||
|
||||
// function used to write each property from each state object in the chain to flat state
|
||||
const root = this;
|
||||
|
||||
// start the chain at this source
|
||||
let current = this;
|
||||
|
||||
// call the ittr and return it's promise
|
||||
return (function ittr() {
|
||||
// itterate the _state object (not array) and
|
||||
// pass each key:value pair to source._mergeProp. if _mergeProp
|
||||
// returns a promise, then wait for it to complete and call _mergeProp again
|
||||
return Promise.all(_.map(current._state, function ittr(value, key) {
|
||||
if (Promise.is(value)) {
|
||||
return value.then(function (value) {
|
||||
return ittr(value, key);
|
||||
});
|
||||
}
|
||||
|
||||
const prom = root._mergeProp(flatState, value, key);
|
||||
return Promise.is(prom) ? prom : null;
|
||||
}))
|
||||
.then(function () {
|
||||
// move to this sources parent
|
||||
const parent = current.getParent();
|
||||
// keep calling until we reach the top parent
|
||||
if (parent) {
|
||||
current = parent;
|
||||
return ittr();
|
||||
}
|
||||
});
|
||||
}())
|
||||
.then(function () {
|
||||
if (type === 'search') {
|
||||
// This is down here to prevent the circular dependency
|
||||
flatState.body = flatState.body || {};
|
||||
|
||||
const computedFields = flatState.index.getComputedFields();
|
||||
flatState.body.stored_fields = computedFields.storedFields;
|
||||
flatState.body.script_fields = flatState.body.script_fields || {};
|
||||
flatState.body.docvalue_fields = flatState.body.docvalue_fields || [];
|
||||
|
||||
_.extend(flatState.body.script_fields, computedFields.scriptFields);
|
||||
flatState.body.docvalue_fields = _.union(flatState.body.docvalue_fields, computedFields.docvalueFields);
|
||||
|
||||
if (flatState.body._source) {
|
||||
// exclude source fields for this index pattern specified by the user
|
||||
const filter = fieldWildcardFilter(flatState.body._source.excludes);
|
||||
flatState.body.docvalue_fields = flatState.body.docvalue_fields.filter(filter);
|
||||
}
|
||||
|
||||
// if we only want to search for certain fields
|
||||
const fields = flatState.fields;
|
||||
if (fields) {
|
||||
// filter out the docvalue_fields, and script_fields to only include those that we are concerned with
|
||||
flatState.body.docvalue_fields = _.intersection(flatState.body.docvalue_fields, fields);
|
||||
flatState.body.script_fields = _.pick(flatState.body.script_fields, fields);
|
||||
|
||||
// request the remaining fields from both stored_fields and _source
|
||||
const remainingFields = _.difference(fields, _.keys(flatState.body.script_fields));
|
||||
flatState.body.stored_fields = remainingFields;
|
||||
_.set(flatState.body, '_source.includes', remainingFields);
|
||||
}
|
||||
|
||||
flatState.body.query = buildESQuery(flatState.index, flatState.query, flatState.filters);
|
||||
|
||||
if (flatState.highlightAll != null) {
|
||||
if (flatState.highlightAll && flatState.body.query) {
|
||||
flatState.body.highlight = getHighlightRequest(flatState.body.query, getConfig);
|
||||
}
|
||||
delete flatState.highlightAll;
|
||||
}
|
||||
|
||||
/**
|
||||
* Translate a filter into a query to support es 3+
|
||||
* @param {Object} filter - The filter to translate
|
||||
* @return {Object} the query version of that filter
|
||||
*/
|
||||
const translateToQuery = function (filter) {
|
||||
if (!filter) return;
|
||||
|
||||
if (filter.query) {
|
||||
return filter.query;
|
||||
}
|
||||
|
||||
return filter;
|
||||
};
|
||||
|
||||
// re-write filters within filter aggregations
|
||||
(function recurse(aggBranch) {
|
||||
if (!aggBranch) return;
|
||||
Object.keys(aggBranch).forEach(function (id) {
|
||||
const agg = aggBranch[id];
|
||||
|
||||
if (agg.filters) {
|
||||
// translate filters aggregations
|
||||
const filters = agg.filters.filters;
|
||||
|
||||
Object.keys(filters).forEach(function (filterId) {
|
||||
filters[filterId] = translateToQuery(filters[filterId]);
|
||||
});
|
||||
}
|
||||
|
||||
recurse(agg.aggs || agg.aggregations);
|
||||
});
|
||||
}(flatState.body.aggs || flatState.body.aggregations));
|
||||
}
|
||||
|
||||
return flatState;
|
||||
});
|
||||
}
|
||||
];
|
||||
|
||||
SearchSource.prototype.clone = function () {
|
||||
const clone = new SearchSource(this.toString());
|
||||
// when serializing the internal state with .toString() we lose the internal classes used in the index
|
||||
// pattern, so we have to set it again to workaround this behavior
|
||||
clone.set('index', this.get('index'));
|
||||
clone.inherits(this.getParent());
|
||||
return clone;
|
||||
};
|
||||
|
||||
SearchSource.prototype.getSearchRequestBody = async function () {
|
||||
const searchRequest = await this._flatten();
|
||||
return searchRequest.body;
|
||||
};
|
||||
}
|
||||
|
||||
return SearchSource;
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue