Tribe node support (#9132)

* Adds support for Tribe nodes

Signed-off-by: Tyler Smalley <tyler.smalley@elastic.co>

* @spalger review feedback

* Close create{Admin,Data}Cluster handles closing the connection
* Remove callAsKibanaUser argument from tests
* ClientLogger uses ES6 properties for tags and logQueries
* Ensure were destructuring cluster to access callAsKibanaUser

Signed-off-by: Tyler Smalley <tyler.smalley@elastic.co>

* [tribe] Use class syntax on new data sources

* [tribe] Use includes instead of indexOf in call_client

* [tribe] DocRequest --> AbstractDocRequest

* [tribe] Fix AbstractDoc test rename

* Removes factory objects and adds addClientPlugin to Cluster (#9467)

Signed-off-by: Tyler Smalley <tyler.smalley@elastic.co>

* Resolves eslint error

Signed-off-by: Tyler Smalley <tyler.smalley@elastic.co>

* Use properties on the instance instead of class properties

Class properties are still in the very eary stages and not widely supported.

Signed-off-by: Tyler Smalley <tyler.smalley@elastic.co>

* [tribe] Remove disabled dev tools app, do not bundle console when tribe is enabled

* [tribe] Use destructuring, don't reassign args

* [tribe] Use class syntax for client request wrapper

* [tribe] callAsKibanaUser -> callWithInternalUser

* [tribe] Remove clients from module context, service is a singleton

* [tribe] Use instance property shorthand for admin and data DocRequests

* Removes questions

Signed-off-by: Tyler Smalley <tyler.smalley@elastic.co>

* Fixes typo in tests

Signed-off-by: Tyler Smalley <tyler.smalley@elastic.co>

* Correctly names test case

Signed-off-by: Tyler Smalley <tyler.smalley@elastic.co>

* Revert "Use properties on the instance instead of class properties"

This reverts commit ebd06ae591.

* Adds tests for create_{admin,data}_cluster

Signed-off-by: Tyler Smalley <tyler.smalley@elastic.co>

* Persists clusters to server

Signed-off-by: Tyler Smalley <tyler.smalley@elastic.co>

* [tribe] Move cluster config requests to distinct getters

* Adds getClient and removes addClientPlugin

Signed-off-by: Tyler Smalley <tyler.smalley@elastic.co>

* Expose createClient, consolidate config parsing

Signed-off-by: Tyler Smalley <tyler.smalley@elastic.co>

* Removes createClients from Cluster

Signed-off-by: Tyler Smalley <tyler.smalley@elastic.co>

* Prevent status change from red to red

Signed-off-by: Tyler Smalley <tyler.smalley@elastic.co>

* Updates esvm:tribe ports to be consistant with dev

9200 is admin
9201:9202 are both data clusters
9203 is a tribe node connecting to both data clusters

Signed-off-by: Tyler Smalley <tyler.smalley@elastic.co>

* [tribe] Get ssl.ca from serverConfig

* [tribe/esvm] Remove plugin configuration

* Removes unused variable

Signed-off-by: Tyler Smalley <tyler.smalley@elastic.co>

* [tribe] Named exports for creating clusters

* [tribe] Named exports for client logger, cluster

* [tribe] Named exports for health check

* Remove invalid comment

Signed-off-by: Tyler Smalley <tyler.smalley@elastic.co>

* [tribe] Comment explaining difference between admin and data browser clients

* Rename ES checks to be consistant with functionality

Signed-off-by: Tyler Smalley <tyler.smalley@elastic.co>

* Organize NOOP functions

Signed-off-by: Tyler Smalley <tyler.smalley@elastic.co>

* Removing function comments

Signed-off-by: Tyler Smalley <tyler.smalley@elastic.co>

* Explicitly check for presence of url in tribe

Signed-off-by: Tyler Smalley <tyler.smalley@elastic.co>
This commit is contained in:
Tyler Smalley 2016-12-30 07:46:55 -08:00 committed by Jonathan Budzenski
parent 5e0ae4c360
commit 024c81697e
73 changed files with 1545 additions and 672 deletions

View file

@ -1,8 +1,13 @@
import { trim, trimRight } from 'lodash';
import { trim, trimRight, bindKey, get } from 'lodash';
import { methodNotAllowed } from 'boom';
import healthCheck from './lib/health_check';
import exposeClient from './lib/expose_client';
import { createDataCluster } from './lib/create_data_cluster';
import { createAdminCluster } from './lib/create_admin_cluster';
import { clientLogger } from './lib/client_logger';
import { createClusters } from './lib/create_clusters';
import filterHeaders from './lib/filter_headers';
import createProxy, { createPath } from './lib/create_proxy';
const DEFAULT_REQUEST_HEADERS = [ 'authorization' ];
@ -26,6 +31,7 @@ module.exports = function ({ Plugin }) {
customHeaders: object().default({}),
pingTimeout: number().default(ref('requestTimeout')),
startupTimeout: number().default(5000),
logQueries: boolean().default(false),
ssl: object({
verify: boolean().default(true),
ca: array().single().items(string()),
@ -33,6 +39,29 @@ module.exports = function ({ Plugin }) {
key: string()
}).default(),
apiVersion: Joi.string().default('master'),
healthCheck: object({
delay: number().default(2500)
}).default(),
tribe: object({
url: string().uri({ scheme: ['http', 'https'] }),
preserveHost: boolean().default(true),
username: string(),
password: string(),
shardTimeout: number().default(0),
requestTimeout: number().default(30000),
requestHeadersWhitelist: array().items().single().default(DEFAULT_REQUEST_HEADERS),
customHeaders: object().default({}),
pingTimeout: number().default(ref('requestTimeout')),
startupTimeout: number().default(5000),
logQueries: boolean().default(false),
ssl: object({
verify: boolean().default(true),
ca: array().single().items(string()),
cert: string(),
key: string()
}).default(),
apiVersion: Joi.string().default('master'),
}).default()
}).default();
},
@ -42,15 +71,24 @@ module.exports = function ({ Plugin }) {
esRequestTimeout: options.requestTimeout,
esShardTimeout: options.shardTimeout,
esApiVersion: options.apiVersion,
esDataIsTribe: get(options, 'tribe.url') ? true : false,
};
}
},
init(server, options) {
const kibanaIndex = server.config().get('kibana.index');
const clusters = createClusters(server);
server.expose('getCluster', clusters.get);
server.expose('createCluster', clusters.create);
server.expose('filterHeaders', filterHeaders);
server.expose('ElasticsearchClientLogging', clientLogger(server));
createDataCluster(server);
createAdminCluster(server);
// Expose the client to the server
exposeClient(server);
createProxy(server, 'GET', '/{paths*}');
createProxy(server, 'POST', '/_mget');
createProxy(server, 'POST', '/{index}/_search');
@ -69,7 +107,7 @@ module.exports = function ({ Plugin }) {
function noDirectIndex({ path }, reply) {
const requestPath = trimRight(trim(path), '/');
const matchPath = createPath(kibanaIndex);
const matchPath = createPath('/elasticsearch', kibanaIndex);
if (requestPath === matchPath) {
return reply(methodNotAllowed('You cannot modify the primary kibana index through this interface.'));

View file

@ -0,0 +1,134 @@
import expect from 'expect.js';
import { Cluster } from '../cluster';
import sinon from 'sinon';
import { errors as esErrors } from 'elasticsearch';
import { set, partial, cloneDeep } from 'lodash';
import Boom from 'boom';
describe('plugins/elasticsearch', function () {
describe('cluster', function () {
let cluster;
const config = {
url: 'http://localhost:9200',
ssl: { verify: false },
requestHeadersWhitelist: [ 'authorization' ]
};
beforeEach(() => {
cluster = new Cluster(config);
});
it('persists the config', () => {
expect(cluster._config).to.eql(config);
});
it('exposes error definitions', () => {
expect(cluster.errors).to.be(esErrors);
});
it('closes the clients', () => {
cluster._client.close = sinon.spy();
cluster._noAuthClient.close = sinon.spy();
cluster.close();
sinon.assert.calledOnce(cluster._client.close);
sinon.assert.calledOnce(cluster._noAuthClient.close);
});
it('protects the config from changes', () => {
const localRequestHeadersWhitelist = cluster.getRequestHeadersWhitelist();
expect(localRequestHeadersWhitelist.length).to.not.equal(config.requestHeadersWhitelist);
});
describe('callWithInternalUser', () => {
let client;
beforeEach(() => {
client = cluster._client = sinon.stub();
set(client, 'nodes.info', sinon.stub().returns(Promise.resolve()));
});
it('should return a function', () => {
expect(cluster.callWithInternalUser).to.be.a('function');
});
it('throws an error for an invalid endpoint', () => {
const fn = partial(cluster.callWithInternalUser, 'foo');
expect(fn).to.throwException(/called with an invalid endpoint: foo/);
});
it('calls the client with params', () => {
const params = { foo: 'Foo' };
cluster.callWithInternalUser('nodes.info', params);
sinon.assert.calledOnce(client.nodes.info);
expect(client.nodes.info.getCall(0).args[0]).to.eql(params);
});
});
describe('callWithRequest', () => {
let client;
beforeEach(() => {
client = cluster._noAuthClient = sinon.stub();
set(client, 'nodes.info', sinon.stub().returns(Promise.resolve()));
});
it('should return a function', () => {
expect(cluster.callWithRequest).to.be.a('function');
});
it('throws an error for an invalid endpoint', () => {
const fn = partial(cluster.callWithRequest, {}, 'foo');
expect(fn).to.throwException(/called with an invalid endpoint: foo/);
});
it('calls the client with params', () => {
const params = { foo: 'Foo' };
cluster.callWithRequest({}, 'nodes.info', params);
sinon.assert.calledOnce(client.nodes.info);
expect(client.nodes.info.getCall(0).args[0]).to.eql(params);
});
it('passes only whitelisted headers', () => {
const headers = { authorization: 'Basic TEST' };
const request = { headers: Object.assign({}, headers, { foo: 'Foo' }) };
cluster.callWithRequest(request, 'nodes.info');
sinon.assert.calledOnce(client.nodes.info);
expect(client.nodes.info.getCall(0).args[0]).to.eql({
headers: headers
});
});
describe('wrap401Errors', () => {
let handler;
const error = new Error('Authentication required');
error.statusCode = 401;
beforeEach(() => {
handler = sinon.stub();
});
it('ensures WWW-Authenticate header', async () => {
set(client, 'mock.401', sinon.stub().returns(Promise.reject(error)));
await cluster.callWithRequest({}, 'mock.401', {}, { wrap401Errors: true }).catch(handler);
sinon.assert.calledOnce(handler);
expect(handler.getCall(0).args[0].output.headers['WWW-Authenticate']).to.eql('Basic realm="Authorization Required"');
});
it('persists WWW-Authenticate header', async () => {
set(error, 'body.error.header[WWW-Authenticate]', 'Basic realm="Test"');
set(client, 'mock.401', sinon.stub().returns(Promise.reject(error)));
await cluster.callWithRequest({}, 'mock.401', {}, { wrap401Errors: true }).catch(handler);
sinon.assert.calledOnce(handler);
expect(handler.getCall(0).args[0].output.headers['WWW-Authenticate']).to.eql('Basic realm="Test"');
});
});
});
});
});

View file

@ -0,0 +1,66 @@
import expect from 'expect.js';
import sinon from 'sinon';
import { bindKey, set, get, partial } from 'lodash';
import { createAdminCluster } from '../create_admin_cluster';
describe('plugins/elasticsearch', function () {
describe('create_admin_cluster', function () {
let cluster;
let server;
beforeEach(() => {
const config = {
elasticsearch: {
url: 'http://localhost:9200',
logQueries: true
}
};
server = sinon.spy();
cluster = {
close: sinon.spy()
};
set(server, 'plugins.elasticsearch.createCluster', sinon.mock().returns(cluster));
set(server, 'on', sinon.spy());
server.config = () => {
return { get: partial(get, config) };
};
createAdminCluster(server);
});
it('creates the cluster', () => {
const { createCluster } = server.plugins.elasticsearch;
sinon.assert.calledOnce(createCluster);
expect(createCluster.getCall(0).args[0]).to.eql('admin');
expect(createCluster.getCall(0).args[1].url).to.eql('http://localhost:9200');
});
it('sets client logger for cluster options', () => {
const { createCluster } = server.plugins.elasticsearch;
const firstCall = createCluster.getCall(0);
const Log = firstCall.args[1].log;
const logger = new Log;
sinon.assert.calledOnce(createCluster);
expect(firstCall.args[0]).to.eql('admin');
expect(firstCall.args[1].url).to.eql('http://localhost:9200');
expect(logger.tags).to.eql(['admin']);
expect(logger.logQueries).to.eql(true);
});
it('close cluster of server close', () => {
const clusterClose = server.on.getCall(0).args[1];
clusterClose();
sinon.assert.calledOnce(cluster.close);
sinon.assert.calledOnce(server.on);
expect(server.on.getCall(0).args[0]).to.eql('close');
});
});
});

View file

@ -0,0 +1,50 @@
import expect from 'expect.js';
import { createClusters } from '../create_clusters';
import { Cluster } from '../cluster';
import sinon from 'sinon';
import { partial } from 'lodash';
describe('plugins/elasticsearch', function () {
describe('createClusters', function () {
let clusters;
let server;
beforeEach(() => {
server = {
plugins: {
elasticsearch: {}
},
expose: sinon.mock()
};
clusters = createClusters(server);
});
describe('createCluster', () => {
let cluster;
const config = {
url: 'http://localhost:9200',
ssl: {
verify: false
}
};
beforeEach(() => {
cluster = clusters.create('admin', config);
});
it('returns a cluster', () => {
expect(cluster).to.be.a(Cluster);
});
it('persists the cluster', () => {
expect(clusters.get('admin')).to.be.a(Cluster);
});
it('throws if cluster already exists', () => {
const fn = partial(clusters.create, 'admin', config);
expect(fn).to.throwException(/cluster \'admin\' already exists/);
});
});
});
});

View file

@ -0,0 +1,85 @@
import expect from 'expect.js';
import sinon from 'sinon';
import { bindKey, set, get, partial } from 'lodash';
import { createDataCluster } from '../create_data_cluster';
describe('plugins/elasticsearch', function () {
describe('create_data_cluster', function () {
let cluster;
let server;
let config;
beforeEach(() => {
config = {
elasticsearch: {
url: 'http://localhost:9200',
logQueries: true
}
};
server = sinon.spy();
cluster = {
close: sinon.spy()
};
set(server, 'plugins.elasticsearch.createCluster', sinon.mock().returns(cluster));
set(server, 'on', sinon.spy());
server.config = () => {
return { get: partial(get, config) };
};
});
it('creates the cluster with elasticsearch config', () => {
createDataCluster(server);
const { createCluster } = server.plugins.elasticsearch;
sinon.assert.calledOnce(createCluster);
expect(createCluster.getCall(0).args[0]).to.eql('data');
expect(createCluster.getCall(0).args[1].url).to.eql('http://localhost:9200');
});
it('creates the cluster with elasticsearch.tribe config', () => {
config.elasticsearch.tribe = {
url: 'http://localhost:9201'
};
createDataCluster(server);
const { createCluster } = server.plugins.elasticsearch;
sinon.assert.calledOnce(createCluster);
expect(createCluster.getCall(0).args[0]).to.eql('data');
expect(createCluster.getCall(0).args[1].url).to.eql('http://localhost:9201');
});
it('sets client logger for cluster options', () => {
createDataCluster(server);
const { createCluster } = server.plugins.elasticsearch;
const firstCall = createCluster.getCall(0);
const Log = firstCall.args[1].log;
const logger = new Log;
sinon.assert.calledOnce(createCluster);
expect(firstCall.args[0]).to.eql('data');
expect(firstCall.args[1].url).to.eql('http://localhost:9200');
expect(logger.tags).to.eql(['data']);
expect(logger.logQueries).to.eql(true);
});
it('close cluster of server close', () => {
createDataCluster(server);
const clusterClose = server.on.getCall(0).args[1];
clusterClose();
sinon.assert.calledOnce(cluster.close);
sinon.assert.calledOnce(server.on);
expect(server.on.getCall(0).args[0]).to.eql('close');
});
});
});

View file

@ -9,39 +9,45 @@ describe('plugins/elasticsearch', function () {
describe('lib/create_kibana_index', function () {
let server;
let client;
let callWithInternalUser;
let cluster;
beforeEach(function () {
server = {};
client = {};
let config = { kibana: { index: '.my-kibana' } };
const get = sinon.stub();
get.returns(config);
get.withArgs('kibana.index').returns(config.kibana.index);
config = function () { return { get: get }; };
_.set(client, 'indices.create', sinon.stub());
_.set(client, 'cluster.health', sinon.stub());
_.set(server, 'plugins.elasticsearch.client', client);
_.set(server, 'plugins.elasticsearch', {});
_.set(server, 'config', config);
callWithInternalUser = sinon.stub();
cluster = { callWithInternalUser: callWithInternalUser };
server.plugins.elasticsearch.getCluster = sinon.stub().withArgs('admin').returns(cluster);
});
describe('successful requests', function () {
beforeEach(function () {
client.indices.create.returns(Promise.resolve());
client.cluster.health.returns(Promise.resolve());
callWithInternalUser.withArgs('indices.create', sinon.match.any).returns(Promise.resolve());
callWithInternalUser.withArgs('cluster.health', sinon.match.any).returns(Promise.resolve());
});
it('should check cluster.health upon successful index creation', function () {
const fn = createKibanaIndex(server);
return fn.then(function () {
sinon.assert.calledOnce(client.cluster.health);
sinon.assert.calledOnce(callWithInternalUser.withArgs('cluster.health', sinon.match.any));
});
});
it('should be created with mappings for config.buildNum', function () {
const fn = createKibanaIndex(server);
return fn.then(function () {
const params = client.indices.create.args[0][0];
const params = callWithInternalUser.args[0][1];
expect(params)
.to.have.property('body');
expect(params.body)
@ -60,7 +66,7 @@ describe('plugins/elasticsearch', function () {
it('should be created with 1 shard and default replica', function () {
const fn = createKibanaIndex(server);
return fn.then(function () {
const params = client.indices.create.args[0][0];
const params = callWithInternalUser.args[0][1];
expect(params)
.to.have.property('body');
expect(params.body)
@ -75,19 +81,17 @@ describe('plugins/elasticsearch', function () {
it('should be created with index name set in the config', function () {
const fn = createKibanaIndex(server);
return fn.then(function () {
const params = client.indices.create.args[0][0];
const params = callWithInternalUser.args[0][1];
expect(params)
.to.have.property('index', '.my-kibana');
});
});
});
describe('failure requests', function () {
it('should reject with an Error', function () {
const error = new Error('Oops!');
client.indices.create.returns(Promise.reject(error));
callWithInternalUser.withArgs('indices.create', sinon.match.any).returns(Promise.reject(error));
const fn = createKibanaIndex(server);
return fn.catch(function (err) {
expect(err).to.be.a(Error);
@ -96,24 +100,21 @@ describe('plugins/elasticsearch', function () {
it('should reject with an error if index creation fails', function () {
const error = new Error('Oops!');
client.indices.create.returns(Promise.reject(error));
callWithInternalUser.withArgs('indices.create', sinon.match.any).returns(Promise.reject(error));
const fn = createKibanaIndex(server);
return fn.catch(function (err) {
expect(err.message).to.be('Unable to create Kibana index ".my-kibana"');
});
});
it('should reject with an error if health check fails', function () {
const error = new Error('Oops!');
client.indices.create.returns(Promise.resolve());
client.cluster.health.returns(Promise.reject(error));
callWithInternalUser.withArgs('indices.create', sinon.match.any).returns(Promise.resolve());
callWithInternalUser.withArgs('cluster.health', sinon.match.any).returns(Promise.reject(new Error()));
const fn = createKibanaIndex(server);
return fn.catch(function (err) {
expect(err.message).to.be('Waiting for Kibana index ".my-kibana" to come online failed.');
});
});
});
});
});

View file

@ -3,20 +3,21 @@ import createProxy from '../create_proxy';
describe('plugins/elasticsearch', function () {
describe('lib/create_proxy', function () {
describe('#createPath', function () {
it('prepends /elasticsearch to route', function () {
const path = createProxy.createPath('/wat');
expect(path).to.equal('/elasticsearch/wat');
const path = createProxy.createPath('/foobar', '/wat');
expect(path).to.equal('/foobar/wat');
});
context('when arg does not start with a slash', function () {
it('adds slash anyway', function () {
const path = createProxy.createPath('wat');
expect(path).to.equal('/elasticsearch/wat');
});
it('ensures leading slash for prefix', function () {
const path = createProxy.createPath('foobar', '/wat');
expect(path).to.equal('/foobar/wat');
});
it('ensures leading slash for path', function () {
const path = createProxy.createPath('/foobar', 'wat');
expect(path).to.equal('/foobar/wat');
});
});
});
});

View file

@ -5,23 +5,22 @@ import expect from 'expect.js';
import url from 'url';
import serverConfig from '../../../../../test/server_config';
import checkEsVersion from '../check_es_version';
import { ensureEsVersion } from '../ensure_es_version';
describe('plugins/elasticsearch', () => {
describe('lib/check_es_version', () => {
describe('lib/ensure_es_version', () => {
const KIBANA_VERSION = '5.1.0';
let server;
let plugin;
let callWithInternalUser;
beforeEach(function () {
server = {
log: sinon.stub(),
plugins: {
elasticsearch: {
client: {
nodes: {}
},
getCluster: sinon.stub().withArgs('admin').returns({ callWithInternalUser: sinon.stub() }),
status: {
red: sinon.stub()
},
@ -52,25 +51,27 @@ describe('plugins/elasticsearch', () => {
nodes[name] = node;
}
const client = server.plugins.elasticsearch.client;
client.nodes.info = sinon.stub().returns(Promise.resolve({ nodes: nodes }));
const cluster = server.plugins.elasticsearch.getCluster('admin');
cluster.callWithInternalUser.withArgs('nodes.info', sinon.match.any).returns(Promise.resolve({ nodes: nodes }));
callWithInternalUser = cluster.callWithInternalUser;
}
function setNodeWithoutHTTP(version) {
const nodes = { 'node-without-http': { version, ip: 'ip' } };
const client = server.plugins.elasticsearch.client;
client.nodes.info = sinon.stub().returns(Promise.resolve({ nodes: nodes }));
const cluster = server.plugins.elasticsearch.getCluster('admin');
cluster.callWithInternalUser.withArgs('nodes.info', sinon.match.any).returns(Promise.resolve({ nodes: nodes }));
callWithInternalUser = cluster.callWithInternalUser;
}
it('returns true with single a node that matches', async () => {
setNodes('5.1.0');
const result = await checkEsVersion(server, KIBANA_VERSION);
const result = await ensureEsVersion(server, KIBANA_VERSION);
expect(result).to.be(true);
});
it('returns true with multiple nodes that satisfy', async () => {
setNodes('5.1.0', '5.2.0', '5.1.1-Beta1');
const result = await checkEsVersion(server, KIBANA_VERSION);
const result = await ensureEsVersion(server, KIBANA_VERSION);
expect(result).to.be(true);
});
@ -78,7 +79,7 @@ describe('plugins/elasticsearch', () => {
// 5.0.0 ES is too old to work with a 5.1.0 version of Kibana.
setNodes('5.1.0', '5.2.0', '5.0.0');
try {
await checkEsVersion(server, KIBANA_VERSION);
await ensureEsVersion(server, KIBANA_VERSION);
} catch (e) {
expect(e).to.be.a(Error);
}
@ -91,7 +92,7 @@ describe('plugins/elasticsearch', () => {
{ version: '5.0.0', attributes: { client: 'true' } },
);
try {
await checkEsVersion(server, KIBANA_VERSION);
await ensureEsVersion(server, KIBANA_VERSION);
} catch (e) {
expect(e).to.be.a(Error);
}
@ -99,7 +100,7 @@ describe('plugins/elasticsearch', () => {
it('warns if a node is only off by a patch version', async () => {
setNodes('5.1.1');
await checkEsVersion(server, KIBANA_VERSION);
await ensureEsVersion(server, KIBANA_VERSION);
sinon.assert.callCount(server.log, 2);
expect(server.log.getCall(0).args[0]).to.contain('debug');
expect(server.log.getCall(1).args[0]).to.contain('warning');
@ -107,7 +108,7 @@ describe('plugins/elasticsearch', () => {
it('warns if a node is off by a patch version and without http publish address', async () => {
setNodeWithoutHTTP('5.1.1');
await checkEsVersion(server, KIBANA_VERSION);
await ensureEsVersion(server, KIBANA_VERSION);
sinon.assert.callCount(server.log, 2);
expect(server.log.getCall(0).args[0]).to.contain('debug');
expect(server.log.getCall(1).args[0]).to.contain('warning');
@ -116,7 +117,7 @@ describe('plugins/elasticsearch', () => {
it('errors if a node incompatible and without http publish address', async () => {
setNodeWithoutHTTP('6.1.1');
try {
await checkEsVersion(server, KIBANA_VERSION);
await ensureEsVersion(server, KIBANA_VERSION);
} catch (e) {
expect(e.message).to.contain('incompatible nodes');
expect(e).to.be.a(Error);
@ -126,12 +127,12 @@ describe('plugins/elasticsearch', () => {
it('only warns once per node list', async () => {
setNodes('5.1.1');
await checkEsVersion(server, KIBANA_VERSION);
await ensureEsVersion(server, KIBANA_VERSION);
sinon.assert.callCount(server.log, 2);
expect(server.log.getCall(0).args[0]).to.contain('debug');
expect(server.log.getCall(1).args[0]).to.contain('warning');
await checkEsVersion(server, KIBANA_VERSION);
await ensureEsVersion(server, KIBANA_VERSION);
sinon.assert.callCount(server.log, 3);
expect(server.log.getCall(2).args[0]).to.contain('debug');
});
@ -139,13 +140,13 @@ describe('plugins/elasticsearch', () => {
it('warns again if the node list changes', async () => {
setNodes('5.1.1');
await checkEsVersion(server, KIBANA_VERSION);
await ensureEsVersion(server, KIBANA_VERSION);
sinon.assert.callCount(server.log, 2);
expect(server.log.getCall(0).args[0]).to.contain('debug');
expect(server.log.getCall(1).args[0]).to.contain('warning');
setNodes('5.1.2');
await checkEsVersion(server, KIBANA_VERSION);
await ensureEsVersion(server, KIBANA_VERSION);
sinon.assert.callCount(server.log, 4);
expect(server.log.getCall(2).args[0]).to.contain('debug');
expect(server.log.getCall(3).args[0]).to.contain('warning');

View file

@ -0,0 +1,48 @@
import expect from 'expect.js';
import { noop } from 'lodash';
import sinon from 'sinon';
import { ensureNotTribe } from '../ensure_not_tribe';
describe('plugins/elasticsearch ensureNotTribe', () => {
const sandbox = sinon.sandbox.create();
afterEach(() => sandbox.restore());
const stubcallWithInternalUser = (nodesInfoResp = { nodes: {} }) => {
return sinon.stub().withArgs(
'nodes.info',
sinon.match.any
).returns(
Promise.resolve(nodesInfoResp)
);
};
it('fetches the local node stats of the node that the elasticsearch client is connected to', async () => {
const callWithInternalUser = stubcallWithInternalUser();
await ensureNotTribe(callWithInternalUser);
sinon.assert.calledOnce(callWithInternalUser);
});
it('throws a SetupError when the node info contains tribe settings', async () => {
const nodeInfo = {
nodes: {
__nodeId__: {
settings: {
tribe: {
t1: {},
t2: {},
}
}
}
}
};
try {
await ensureNotTribe(stubcallWithInternalUser(nodeInfo));
throw new Error('ensureNotTribe() should have thrown');
} catch (err) {
expect(err).to.be.a(Error);
}
});
});

View file

@ -13,10 +13,12 @@ const esPort = serverConfig.servers.elasticsearch.port;
const esUrl = url.format(serverConfig.servers.elasticsearch);
describe('plugins/elasticsearch', () => {
describe('lib/health_check', () => {
describe('lib/health_check', function () {
this.timeout(3000);
let health;
let plugin;
let client;
let cluster;
beforeEach(() => {
const COMPATIBLE_VERSION_NUMBER = '5.0.0';
@ -34,19 +36,11 @@ describe('plugins/elasticsearch', () => {
}
};
// set up the elasticsearch client stub
client = {
cluster: { health: sinon.stub() },
indices: { create: sinon.stub() },
nodes: { info: sinon.stub() },
ping: sinon.stub(),
create: sinon.stub(),
index: sinon.stub().returns(Promise.resolve()),
get: sinon.stub().returns(Promise.resolve({ found: false })),
search: sinon.stub().returns(Promise.resolve({ hits: { hits: [] } })),
};
client.nodes.info.returns(Promise.resolve({
cluster = { callWithInternalUser: sinon.stub() };
cluster.callWithInternalUser.withArgs('index', sinon.match.any).returns(Promise.resolve());
cluster.callWithInternalUser.withArgs('get', sinon.match.any).returns(Promise.resolve({ found: false }));
cluster.callWithInternalUser.withArgs('search', sinon.match.any).returns(Promise.resolve({ hits: { hits: [] } }));
cluster.callWithInternalUser.withArgs('nodes.info', sinon.match.any).returns(Promise.resolve({
nodes: {
'node-01': {
version: COMPATIBLE_VERSION_NUMBER,
@ -68,7 +62,11 @@ describe('plugins/elasticsearch', () => {
log: sinon.stub(),
info: { port: 5601 },
config: function () { return { get, set }; },
plugins: { elasticsearch: { client } }
plugins: {
elasticsearch: {
getCluster: sinon.stub().returns(cluster)
}
}
};
health = healthCheck(plugin, server);
@ -79,44 +77,59 @@ describe('plugins/elasticsearch', () => {
});
it('should set the cluster green if everything is ready', function () {
client.ping.returns(Promise.resolve());
client.cluster.health.returns(Promise.resolve({ timed_out: false, status: 'green' }));
cluster.callWithInternalUser.withArgs('ping').returns(Promise.resolve());
cluster.callWithInternalUser.withArgs('cluster.health', sinon.match.any).returns(
Promise.resolve({ timed_out: false, status: 'green' })
);
return health.run()
.then(function () {
sinon.assert.calledOnce(plugin.status.yellow);
expect(plugin.status.yellow.args[0][0]).to.be('Waiting for Elasticsearch');
sinon.assert.calledOnce(client.ping);
sinon.assert.calledOnce(client.nodes.info);
sinon.assert.calledOnce(client.cluster.health);
sinon.assert.calledOnce(cluster.callWithInternalUser.withArgs('ping'));
sinon.assert.calledTwice(cluster.callWithInternalUser.withArgs('nodes.info', sinon.match.any));
sinon.assert.calledOnce(cluster.callWithInternalUser.withArgs('cluster.health', sinon.match.any));
sinon.assert.calledOnce(plugin.status.green);
expect(plugin.status.green.args[0][0]).to.be('Kibana index ready');
});
});
it('should set the cluster red if the ping fails, then to green', function () {
client.ping.onCall(0).returns(Promise.reject(new NoConnections()));
client.ping.onCall(1).returns(Promise.resolve());
client.cluster.health.returns(Promise.resolve({ timed_out: false, status: 'green' }));
const ping = cluster.callWithInternalUser.withArgs('ping');
ping.onCall(0).returns(Promise.reject(new NoConnections()));
ping.onCall(1).returns(Promise.resolve());
cluster.callWithInternalUser.withArgs('cluster.health', sinon.match.any).returns(
Promise.resolve({ timed_out: false, status: 'green' })
);
return health.run()
.then(function () {
sinon.assert.calledOnce(plugin.status.yellow);
expect(plugin.status.yellow.args[0][0]).to.be('Waiting for Elasticsearch');
sinon.assert.calledOnce(plugin.status.red);
expect(plugin.status.red.args[0][0]).to.be(
`Unable to connect to Elasticsearch at ${esUrl}.`
);
sinon.assert.calledTwice(client.ping);
sinon.assert.calledOnce(client.nodes.info);
sinon.assert.calledOnce(client.cluster.health);
sinon.assert.calledTwice(ping);
sinon.assert.calledTwice(cluster.callWithInternalUser.withArgs('nodes.info', sinon.match.any));
sinon.assert.calledOnce(cluster.callWithInternalUser.withArgs('cluster.health', sinon.match.any));
sinon.assert.calledOnce(plugin.status.green);
expect(plugin.status.green.args[0][0]).to.be('Kibana index ready');
});
});
it('should set the cluster red if the health check status is red, then to green', function () {
client.ping.returns(Promise.resolve());
client.cluster.health.onCall(0).returns(Promise.resolve({ timed_out: false, status: 'red' }));
client.cluster.health.onCall(1).returns(Promise.resolve({ timed_out: false, status: 'green' }));
cluster.callWithInternalUser.withArgs('ping').returns(Promise.resolve());
const clusterHealth = cluster.callWithInternalUser.withArgs('cluster.health', sinon.match.any);
clusterHealth.onCall(0).returns(Promise.resolve({ timed_out: false, status: 'red' }));
clusterHealth.onCall(1).returns(Promise.resolve({ timed_out: false, status: 'green' }));
return health.run()
.then(function () {
sinon.assert.calledOnce(plugin.status.yellow);
@ -125,39 +138,45 @@ describe('plugins/elasticsearch', () => {
expect(plugin.status.red.args[0][0]).to.be(
'Elasticsearch is still initializing the kibana index.'
);
sinon.assert.calledOnce(client.ping);
sinon.assert.calledOnce(client.nodes.info);
sinon.assert.calledTwice(client.cluster.health);
sinon.assert.calledOnce(cluster.callWithInternalUser.withArgs('ping'));
sinon.assert.calledTwice(cluster.callWithInternalUser.withArgs('nodes.info', sinon.match.any));
sinon.assert.calledTwice(cluster.callWithInternalUser.withArgs('cluster.health', sinon.match.any));
sinon.assert.calledOnce(plugin.status.green);
expect(plugin.status.green.args[0][0]).to.be('Kibana index ready');
});
});
it('should set the cluster yellow if the health check timed_out and create index', function () {
client.ping.returns(Promise.resolve());
client.cluster.health.onCall(0).returns(Promise.resolve({ timed_out: true, status: 'red' }));
client.cluster.health.onCall(1).returns(Promise.resolve({ timed_out: false, status: 'green' }));
client.indices.create.returns(Promise.resolve());
cluster.callWithInternalUser.withArgs('ping').returns(Promise.resolve());
const clusterHealth = cluster.callWithInternalUser.withArgs('cluster.health', sinon.match.any);
clusterHealth.onCall(0).returns(Promise.resolve({ timed_out: true, status: 'red' }));
clusterHealth.onCall(1).returns(Promise.resolve({ timed_out: false, status: 'green' }));
cluster.callWithInternalUser.withArgs('indices.create', sinon.match.any).returns(Promise.resolve());
return health.run()
.then(function () {
sinon.assert.calledTwice(plugin.status.yellow);
expect(plugin.status.yellow.args[0][0]).to.be('Waiting for Elasticsearch');
expect(plugin.status.yellow.args[1][0]).to.be('No existing Kibana index found');
sinon.assert.calledOnce(client.ping);
sinon.assert.calledOnce(client.indices.create);
sinon.assert.calledOnce(client.nodes.info);
sinon.assert.calledTwice(client.cluster.health);
sinon.assert.calledOnce(cluster.callWithInternalUser.withArgs('ping'));
sinon.assert.calledOnce(cluster.callWithInternalUser.withArgs('indices.create', sinon.match.any));
sinon.assert.calledTwice(cluster.callWithInternalUser.withArgs('nodes.info', sinon.match.any));
sinon.assert.calledTwice(clusterHealth);
});
});
describe('#waitUntilReady', function () {
it('polls health until index is ready', function () {
client.cluster.health.onCall(0).returns(Promise.resolve({ timed_out: true })); // no index
client.cluster.health.onCall(1).returns(Promise.resolve({ status: 'red' })); // initializing
client.cluster.health.onCall(2).returns(Promise.resolve({ status: 'green' })); // ready
const clusterHealth = cluster.callWithInternalUser.withArgs('cluster.health', sinon.match.any);
clusterHealth.onCall(0).returns(Promise.resolve({ timed_out: true }));
clusterHealth.onCall(1).returns(Promise.resolve({ status: 'red' }));
clusterHealth.onCall(2).returns(Promise.resolve({ status: 'green' }));
return health.waitUntilReady().then(function () {
sinon.assert.calledThrice(client.cluster.health);
sinon.assert.calledThrice(clusterHealth);
});
});
});

View file

@ -2,20 +2,25 @@ import expect from 'expect.js';
import mapUri from '../map_uri';
import { get, defaults } from 'lodash';
import sinon from 'sinon';
import url from 'url';
describe('plugins/elasticsearch', function () {
describe('lib/map_uri', function () {
let request;
function stubServer(settings) {
const values = defaults(settings || {}, {
'elasticsearch.url': 'http://localhost:9200',
'elasticsearch.requestHeadersWhitelist': ['authorization'],
'elasticsearch.customHeaders': {}
function stubCluster(settings) {
settings = defaults(settings || {}, {
url: 'http://localhost:9200',
requestHeadersWhitelist: ['authorization'],
customHeaders: {}
});
const config = { get: (key, def) => get(values, key, def) };
return { config: () => config };
return {
getUrl: () => settings.url,
getCustomHeaders: () => settings.customHeaders,
getRequestHeadersWhitelist: () => settings.requestHeadersWhitelist
};
}
beforeEach(function () {
@ -34,34 +39,34 @@ describe('plugins/elasticsearch', function () {
});
it('sends custom headers if set', function () {
const server = stubServer({
'elasticsearch.customHeaders': { foo: 'bar' }
});
const settings = {
customHeaders: { foo: 'bar' }
};
mapUri(server)(request, function (err, upstreamUri, upstreamHeaders) {
mapUri(stubCluster(settings), '/elasticsearch')(request, function (err, upstreamUri, upstreamHeaders) {
expect(err).to.be(null);
expect(upstreamHeaders).to.have.property('foo', 'bar');
});
});
it('sends configured custom headers even if the same named header exists in request', function () {
const server = stubServer({
'elasticsearch.requestHeadersWhitelist': ['x-my-custom-header'],
'elasticsearch.customHeaders': { 'x-my-custom-header': 'asconfigured' }
});
const settings = {
requestHeadersWhitelist: ['x-my-custom-header'],
customHeaders: { 'x-my-custom-header': 'asconfigured' }
};
mapUri(server)(request, function (err, upstreamUri, upstreamHeaders) {
mapUri(stubCluster(settings), '/elasticsearch')(request, function (err, upstreamUri, upstreamHeaders) {
expect(err).to.be(null);
expect(upstreamHeaders).to.have.property('x-my-custom-header', 'asconfigured');
});
});
it('only proxies the whitelisted request headers', function () {
const server = stubServer({
'elasticsearch.requestHeadersWhitelist': ['x-my-custom-HEADER', 'Authorization'],
});
const settings = {
requestHeadersWhitelist: ['x-my-custom-HEADER', 'Authorization'],
};
mapUri(server)(request, function (err, upstreamUri, upstreamHeaders) {
mapUri(stubCluster(settings), '/elasticsearch')(request, function (err, upstreamUri, upstreamHeaders) {
expect(err).to.be(null);
expect(upstreamHeaders).to.have.property('authorization');
expect(upstreamHeaders).to.have.property('x-my-custom-header');
@ -70,24 +75,24 @@ describe('plugins/elasticsearch', function () {
});
it('proxies no headers if whitelist is set to []', function () {
const server = stubServer({
'elasticsearch.requestHeadersWhitelist': [],
});
const settings = {
requestHeadersWhitelist: [],
};
mapUri(server)(request, function (err, upstreamUri, upstreamHeaders) {
mapUri(stubCluster(settings), '/elasticsearch')(request, function (err, upstreamUri, upstreamHeaders) {
expect(err).to.be(null);
expect(Object.keys(upstreamHeaders).length).to.be(0);
});
});
it('proxies no headers if whitelist is set to no value', function () {
const server = stubServer({
const settings = {
// joi converts `elasticsearch.requestHeadersWhitelist: null` into
// an array with a null inside because of the `array().single()` rule.
'elasticsearch.requestHeadersWhitelist': [ null ],
});
requestHeadersWhitelist: [ null ],
};
mapUri(server)(request, function (err, upstreamUri, upstreamHeaders) {
mapUri(stubCluster(settings), '/elasticsearch')(request, function (err, upstreamUri, upstreamHeaders) {
expect(err).to.be(null);
expect(Object.keys(upstreamHeaders).length).to.be(0);
});
@ -95,9 +100,8 @@ describe('plugins/elasticsearch', function () {
it('strips the /elasticsearch prefix from the path', () => {
request.path = '/elasticsearch/es/path';
const server = stubServer();
mapUri(server)(request, function (err, upstreamUri, upstreamHeaders) {
mapUri(stubCluster(), '/elasticsearch')(request, function (err, upstreamUri, upstreamHeaders) {
expect(err).to.be(null);
expect(upstreamUri).to.be('http://localhost:9200/es/path');
});
@ -105,9 +109,9 @@ describe('plugins/elasticsearch', function () {
it('extends the es.url path', function () {
request.path = '/elasticsearch/index/type';
const server = stubServer({ 'elasticsearch.url': 'https://localhost:9200/base-path' });
const settings = { url: 'https://localhost:9200/base-path' };
mapUri(server)(request, function (err, upstreamUri, upstreamHeaders) {
mapUri(stubCluster(settings), '/elasticsearch')(request, function (err, upstreamUri, upstreamHeaders) {
expect(err).to.be(null);
expect(upstreamUri).to.be('https://localhost:9200/base-path/index/type');
});
@ -116,9 +120,9 @@ describe('plugins/elasticsearch', function () {
it('extends the es.url query string', function () {
request.path = '/elasticsearch/*';
request.query = { foo: 'bar' };
const server = stubServer({ 'elasticsearch.url': 'https://localhost:9200/?base=query' });
const settings = { url: 'https://localhost:9200/?base=query' };
mapUri(server)(request, function (err, upstreamUri, upstreamHeaders) {
mapUri(stubCluster(settings), '/elasticsearch')(request, function (err, upstreamUri, upstreamHeaders) {
expect(err).to.be(null);
expect(upstreamUri).to.be('https://localhost:9200/*?foo=bar&base=query');
});
@ -127,9 +131,8 @@ describe('plugins/elasticsearch', function () {
it('filters the _ querystring param', function () {
request.path = '/elasticsearch/*';
request.query = { _: Date.now() };
const server = stubServer();
mapUri(server)(request, function (err, upstreamUri, upstreamHeaders) {
mapUri(stubCluster(), '/elasticsearch')(request, function (err, upstreamUri, upstreamHeaders) {
expect(err).to.be(null);
expect(upstreamUri).to.be('http://localhost:9200/*');
});

View file

@ -9,7 +9,7 @@ describe('plugins/elasticsearch', function () {
describe('lib/upgrade_config', function () {
let get;
let server;
let client;
let callWithInternalUser;
let config;
let upgrade;
@ -18,7 +18,9 @@ describe('plugins/elasticsearch', function () {
get.withArgs('kibana.index').returns('.my-kibana');
get.withArgs('pkg.version').returns('4.0.1');
get.withArgs('pkg.buildNum').returns(Math.random());
client = { create: sinon.stub() };
callWithInternalUser = sinon.stub();
server = {
log: sinon.stub(),
config: function () {
@ -26,7 +28,13 @@ describe('plugins/elasticsearch', function () {
get: get
};
},
plugins: { elasticsearch: { client: client } }
plugins: {
elasticsearch: {
getCluster: sinon.stub().withArgs('admin').returns({
callWithInternalUser: callWithInternalUser
})
}
}
};
upgrade = upgradeConfig(server);
});
@ -35,7 +43,7 @@ describe('plugins/elasticsearch', function () {
const response = { hits: { hits:[] } };
beforeEach(function () {
client.create.returns(Promise.resolve());
callWithInternalUser.withArgs('create', sinon.match.any).returns(Promise.resolve());
});
describe('production', function () {
@ -47,15 +55,15 @@ describe('plugins/elasticsearch', function () {
it('should resolve buildNum to pkg.buildNum config', function () {
return upgrade(response).then(function (resp) {
sinon.assert.calledOnce(client.create);
const params = client.create.args[0][0];
sinon.assert.calledOnce(callWithInternalUser);
const params = callWithInternalUser.args[0][1];
expect(params.body).to.have.property('buildNum', get('pkg.buildNum'));
});
});
it('should resolve version to pkg.version config', function () {
return upgrade(response).then(function (resp) {
const params = client.create.args[0][0];
const params = callWithInternalUser.args[0][1];
expect(params).to.have.property('id', get('pkg.version'));
});
});
@ -70,14 +78,14 @@ describe('plugins/elasticsearch', function () {
it('should resolve buildNum to pkg.buildNum config', function () {
return upgrade(response).then(function (resp) {
const params = client.create.args[0][0];
const params = callWithInternalUser.args[0][1];
expect(params.body).to.have.property('buildNum', get('pkg.buildNum'));
});
});
it('should resolve version to pkg.version config', function () {
return upgrade(response).then(function (resp) {
const params = client.create.args[0][0];
const params = callWithInternalUser.args[0][1];
expect(params).to.have.property('id', get('pkg.version'));
});
});
@ -93,11 +101,12 @@ describe('plugins/elasticsearch', function () {
it('should create new config if the nothing is upgradeable', function () {
get.withArgs('pkg.buildNum').returns(9833);
client.create.returns(Promise.resolve());
callWithInternalUser.withArgs('create', sinon.match.any).returns(Promise.resolve());
const response = { hits: { hits: [ { _id: '4.0.1-alpha3' }, { _id: '4.0.1-beta1' }, { _id: '4.0.0-SNAPSHOT1' } ] } };
return upgrade(response).then(function (resp) {
sinon.assert.calledOnce(client.create);
const params = client.create.args[0][0];
sinon.assert.calledOnce(callWithInternalUser);
const params = callWithInternalUser.args[0][1];
expect(params).to.have.property('body');
expect(params.body).to.have.property('buildNum', 9833);
expect(params).to.have.property('index', '.my-kibana');
@ -108,11 +117,13 @@ describe('plugins/elasticsearch', function () {
it('should update the build number on the new config', function () {
get.withArgs('pkg.buildNum').returns(5801);
client.create.returns(Promise.resolve());
callWithInternalUser.withArgs('create', sinon.match.any).returns(Promise.resolve());
const response = { hits: { hits: [ { _id: '4.0.0', _source: { buildNum: 1 } } ] } };
return upgrade(response).then(function (resp) {
sinon.assert.calledOnce(client.create);
const params = client.create.args[0][0];
sinon.assert.calledOnce(callWithInternalUser);
const params = callWithInternalUser.args[0][1];
expect(params).to.have.property('body');
expect(params.body).to.have.property('buildNum', 5801);
expect(params).to.have.property('index', '.my-kibana');
@ -123,8 +134,10 @@ describe('plugins/elasticsearch', function () {
it('should log a message for upgrades', function () {
get.withArgs('pkg.buildNum').returns(5801);
client.create.returns(Promise.resolve());
callWithInternalUser.withArgs('create', sinon.match.any).returns(Promise.resolve());
const response = { hits: { hits: [ { _id: '4.0.0', _source: { buildNum: 1 } } ] } };
return upgrade(response).then(function (resp) {
sinon.assert.calledOnce(server.log);
expect(server.log.args[0][0]).to.eql(['plugin', 'elasticsearch']);
@ -137,11 +150,13 @@ describe('plugins/elasticsearch', function () {
it('should copy attributes from old config', function () {
get.withArgs('pkg.buildNum').returns(5801);
client.create.returns(Promise.resolve());
callWithInternalUser.withArgs('create', sinon.match.any).returns(Promise.resolve());
const response = { hits: { hits: [ { _id: '4.0.0', _source: { buildNum: 1, defaultIndex: 'logstash-*' } } ] } };
return upgrade(response).then(function (resp) {
sinon.assert.calledOnce(client.create);
const params = client.create.args[0][0];
sinon.assert.calledOnce(callWithInternalUser);
const params = callWithInternalUser.args[0][1];
expect(params).to.have.property('body');
expect(params.body).to.have.property('defaultIndex', 'logstash-*');
});

View file

@ -1,31 +0,0 @@
import _ from 'lodash';
import Promise from 'bluebird';
import Boom from 'boom';
import toPath from 'lodash/internal/toPath';
import filterHeaders from './filter_headers';
module.exports = (server, client) => {
return (req, endpoint, clientParams = {}, options = {}) => {
const wrap401Errors = options.wrap401Errors !== false;
const filteredHeaders = filterHeaders(req.headers, server.config().get('elasticsearch.requestHeadersWhitelist'));
_.set(clientParams, 'headers', filteredHeaders);
const path = toPath(endpoint);
const api = _.get(client, path);
let apiContext = _.get(client, path.slice(0, -1));
if (_.isEmpty(apiContext)) {
apiContext = client;
}
if (!api) throw new Error(`callWithRequest called with an invalid endpoint: ${endpoint}`);
return api.call(apiContext, clientParams)
.catch((err) => {
if (!wrap401Errors || err.statusCode !== 401) {
return Promise.reject(err);
}
const boomError = Boom.wrap(err, err.statusCode);
const wwwAuthHeader = _.get(err, 'body.error.header[WWW-Authenticate]');
boomError.output.headers['WWW-Authenticate'] = wwwAuthHeader || 'Basic realm="Authorization Required"';
throw boomError;
});
};
};

View file

@ -0,0 +1,39 @@
export function clientLogger(server) {
return class ElasticsearchClientLogging {
// additional tags to differentiate connection
tags = [];
logQueries = false;
error(err) {
server.log(['error', 'elasticsearch'].concat(this.tags), err);
}
warning(message) {
server.log(['warning', 'elasticsearch'].concat(this.tags), message);
}
trace(method, options, query, _response, statusCode) {
/* Check if query logging is enabled
* It requires Kibana to be configured with verbose logging turned on. */
if (this.logQueries) {
const methodAndPath = `${method} ${options.path}`;
const queryDsl = query ? query.trim() : '';
server.log(['elasticsearch', 'query', 'debug'].concat(this.tags), [
statusCode,
methodAndPath,
queryDsl
].join('\n'));
}
}
// elasticsearch-js expects the following functions to exist
info() {}
debug() {}
close() {}
};
}

View file

@ -0,0 +1,110 @@
import elasticsearch from 'elasticsearch';
import { get, set, isEmpty, cloneDeep, pick } from 'lodash';
import toPath from 'lodash/internal/toPath';
import Boom from 'boom';
import filterHeaders from './filter_headers';
import { parseConfig } from './parse_config';
export class Cluster {
constructor(config) {
this._config = Object.assign({}, config);
this.errors = elasticsearch.errors;
this._client = this.createClient();
this._noAuthClient = this.createClient({ auth: false });
return this;
}
callWithRequest = (req = {}, endpoint, clientParams = {}, options = {}) => {
if (req.headers) {
const filteredHeaders = filterHeaders(req.headers, this.getRequestHeadersWhitelist());
set(clientParams, 'headers', filteredHeaders);
}
return callAPI(this._noAuthClient, endpoint, clientParams, options);
}
callWithInternalUser = (endpoint, clientParams = {}, options = {}) => {
return callAPI(this._client, endpoint, clientParams, options);
}
getRequestHeadersWhitelist = () => getClonedProperty(this._config, 'requestHeadersWhitelist');
getCustomHeaders = () => getClonedProperty(this._config, 'customHeaders');
getRequestTimeout = () => getClonedProperty(this._config, 'requestTimeout');
getUrl = () => getClonedProperty(this._config, 'url');
getSsl = () => getClonedProperty(this._config, 'ssl');
getClient = () => this._client;
close() {
if (this._client) {
this._client.close();
}
if (this._noAuthClient) {
this._noAuthClient.close();
}
}
createClient = configOverrides => {
const config = Object.assign({}, this._getClientConfig(), configOverrides);
return new elasticsearch.Client(parseConfig(config));
}
_getClientConfig = () => {
return getClonedProperties(this._config, [
'url',
'ssl',
'username',
'password',
'customHeaders',
'plugins',
'apiVersion',
'keepAlive',
'pingTimeout',
'requestTimeout',
'log'
]);
}
}
function callAPI(client, endpoint, clientParams = {}, options = {}) {
const wrap401Errors = options.wrap401Errors !== false;
const clientPath = toPath(endpoint);
const api = get(client, clientPath);
let apiContext = get(client, clientPath.slice(0, -1));
if (isEmpty(apiContext)) {
apiContext = client;
}
if (!api) {
throw new Error(`called with an invalid endpoint: ${endpoint}`);
}
return api.call(apiContext, clientParams).catch((err) => {
if (!wrap401Errors || err.statusCode !== 401) {
return Promise.reject(err);
}
const boomError = Boom.wrap(err, err.statusCode);
const wwwAuthHeader = get(err, 'body.error.header[WWW-Authenticate]');
boomError.output.headers['WWW-Authenticate'] = wwwAuthHeader || 'Basic realm="Authorization Required"';
throw boomError;
});
}
function getClonedProperties(config, paths) {
return cloneDeep(paths ? pick(config, paths) : config);
}
function getClonedProperty(config, path) {
return cloneDeep(path ? get(config, path) : config);
}

View file

@ -0,0 +1,19 @@
import { bindKey } from 'lodash';
import { clientLogger } from './client_logger';
export function createAdminCluster(server) {
const config = server.config();
const ElasticsearchClientLogging = clientLogger(server);
class AdminClientLogging extends ElasticsearchClientLogging {
tags = ['admin'];
logQueries = config.get('elasticsearch.logQueries');
}
const adminCluster = server.plugins.elasticsearch.createCluster(
'admin',
Object.assign({ log: AdminClientLogging }, config.get('elasticsearch'))
);
server.on('close', bindKey(adminCluster, 'close'));
}

View file

@ -1,32 +1,15 @@
import url from 'url';
import _ from 'lodash';
import { get, size } from 'lodash';
const readFile = (file) => require('fs').readFileSync(file, 'utf8');
import http from 'http';
import https from 'https';
module.exports = _.memoize(function (server) {
const config = server.config();
const target = url.parse(config.get('elasticsearch.url'));
import { parseConfig } from './parse_config';
export default function (config) {
const target = url.parse(get(config, 'url'));
if (!/^https/.test(target.protocol)) return new http.Agent();
const agentOptions = {
rejectUnauthorized: config.get('elasticsearch.ssl.verify')
};
if (_.size(config.get('elasticsearch.ssl.ca'))) {
agentOptions.ca = config.get('elasticsearch.ssl.ca').map(readFile);
}
// Add client certificate and key if required by elasticsearch
if (config.get('elasticsearch.ssl.cert') && config.get('elasticsearch.ssl.key')) {
agentOptions.cert = readFile(config.get('elasticsearch.ssl.cert'));
agentOptions.key = readFile(config.get('elasticsearch.ssl.key'));
}
return new https.Agent(agentOptions);
});
// See https://lodash.com/docs#memoize: We use a Map() instead of the default, because we want the keys in the cache
// to be the server objects, and by default these would be coerced to strings as keys (which wouldn't be useful)
module.exports.cache = new Map();
return new https.Agent(parseConfig(config).ssl);
}

View file

@ -0,0 +1,25 @@
import { Cluster } from './cluster';
import { get, set } from 'lodash';
export function createClusters(server) {
const esPlugin = server.plugins.elasticsearch;
esPlugin._clusters = esPlugin._clusters || new Map();
return {
get(name) {
return esPlugin._clusters.get(name);
},
create(name, config) {
const cluster = new Cluster(config);
if (esPlugin._clusters.has(name)) {
throw new Error(`cluster '${name}' already exists`);
}
esPlugin._clusters.set(name, cluster);
return cluster;
}
};
}

View file

@ -0,0 +1,27 @@
import { bindKey } from 'lodash';
import { clientLogger } from './client_logger';
export function createDataCluster(server) {
const config = server.config();
const ElasticsearchClientLogging = clientLogger(server);
class DataClientLogging extends ElasticsearchClientLogging {
tags = ['data'];
logQueries = getConfig().logQueries;
}
function getConfig() {
if (Boolean(config.get('elasticsearch.tribe.url'))) {
return config.get('elasticsearch.tribe');
}
return config.get('elasticsearch');
}
const dataCluster = server.plugins.elasticsearch.createCluster(
'data',
Object.assign({ log: DataClientLogging }, getConfig())
);
server.on('close', bindKey(dataCluster, 'close'));
}

View file

@ -2,10 +2,10 @@ import { format } from 'util';
import { mappings } from './kibana_index_mappings';
module.exports = function (server) {
const client = server.plugins.elasticsearch.client;
const { callWithInternalUser } = server.plugins.elasticsearch.getCluster('admin');
const index = server.config().get('kibana.index');
return client.indices.create({
return callWithInternalUser('indices.create', {
index: index,
body: {
settings: {
@ -18,7 +18,7 @@ module.exports = function (server) {
throw new Error(`Unable to create Kibana index "${index}"`);
})
.then(function () {
return client.cluster.health({
return callWithInternalUser('cluster.health', {
waitForStatus: 'yellow',
index: index
})

View file

@ -3,48 +3,60 @@ import mapUri from './map_uri';
import { resolve } from 'url';
import { assign } from 'lodash';
function createProxy(server, method, route, config) {
function createProxy(server, method, path, config) {
const proxies = new Map([
['/elasticsearch', server.plugins.elasticsearch.getCluster('data')],
['/es_admin', server.plugins.elasticsearch.getCluster('admin')]
]);
const options = {
method: method,
path: createProxy.createPath(route),
config: {
timeout: {
socket: server.config().get('elasticsearch.requestTimeout')
}
},
handler: {
proxy: {
mapUri: mapUri(server),
agent: createAgent(server),
xforward: true,
timeout: server.config().get('elasticsearch.requestTimeout'),
onResponse: function (err, responseFromUpstream, request, reply) {
if (err) {
reply(err);
return;
}
const responseHandler = function (err, upstreamResponse, request, reply) {
if (err) {
reply(err);
return;
}
if (responseFromUpstream.headers.location) {
// TODO: Workaround for #8705 until hapi has been updated to >= 15.0.0
responseFromUpstream.headers.location = encodeURI(responseFromUpstream.headers.location);
}
if (upstreamResponse.headers.location) {
// TODO: Workaround for #8705 until hapi has been updated to >= 15.0.0
upstreamResponse.headers.location = encodeURI(upstreamResponse.headers.location);
}
reply(null, responseFromUpstream);
}
}
},
reply(null, upstreamResponse);
};
assign(options.config, config);
for (const [proxyPrefix, cluster] of proxies) {
const options = {
method,
path: createProxy.createPath(proxyPrefix, path),
config: {
timeout: {
socket: cluster.getRequestTimeout()
}
},
handler: {
proxy: {
mapUri: mapUri(cluster, proxyPrefix),
agent: createAgent({
url: cluster.getUrl(),
ssl: cluster.getSsl()
}),
xforward: true,
timeout: cluster.getRequestTimeout(),
onResponse: responseHandler
}
},
};
server.route(options);
assign(options.config, config);
server.route(options);
}
}
createProxy.createPath = function createPath(path) {
const pre = '/elasticsearch';
const sep = path[0] === '/' ? '' : '/';
return `${pre}${sep}${path}`;
createProxy.createPath = function createPath(prefix, path) {
path = path[0] === '/' ? path : `/${path}`;
prefix = prefix[0] === '/' ? prefix : `/${prefix}`;
return `${prefix}${path}`;
};
module.exports = createProxy;

View file

@ -3,25 +3,30 @@
* that defined in Kibana's package.json.
*/
import _ from 'lodash';
import { forEach, get } from 'lodash';
import isEsCompatibleWithKibana from './is_es_compatible_with_kibana';
/**
* tracks the node descriptions that get logged in warnings so
* that we don't spam the log with the same message over and over.
* tracks the node descriptions that get logged in warnings so
* that we don't spam the log with the same message over and over.
*
* There are situations, like in testing or multi-tenancy, where
* the server argument changes, so we must track the previous
* node warnings per server
* There are situations, like in testing or multi-tenancy, where
* the server argument changes, so we must track the previous
* node warnings per server
*/
const lastWarnedNodesForServer = new WeakMap();
module.exports = function checkEsVersion(server, kibanaVersion) {
export function ensureEsVersion(server, kibanaVersion) {
const { callWithInternalUser } = server.plugins.elasticsearch.getCluster('admin');
server.log(['plugin', 'debug'], 'Checking Elasticsearch version');
const client = server.plugins.elasticsearch.client;
return client.nodes.info()
return callWithInternalUser('nodes.info', {
filterPath: [
'nodes.*.version',
'nodes.*.http.publish_address',
'nodes.*.ip',
]
})
.then(function (info) {
// Aggregate incompatible ES nodes.
const incompatibleNodes = [];
@ -29,7 +34,7 @@ module.exports = function checkEsVersion(server, kibanaVersion) {
// Aggregate ES nodes which should prompt a Kibana upgrade.
const warningNodes = [];
_.forEach(info.nodes, esNode => {
forEach(info.nodes, esNode => {
if (!isEsCompatibleWithKibana(esNode.version, kibanaVersion)) {
// Exit early to avoid collecting ES nodes with newer major versions in the `warningNodes`.
return incompatibleNodes.push(esNode);
@ -44,7 +49,7 @@ module.exports = function checkEsVersion(server, kibanaVersion) {
function getHumanizedNodeNames(nodes) {
return nodes.map(node => {
const publishAddress = _.get(node, 'http.publish_address') ? (_.get(node, 'http.publish_address') + ' ') : '';
const publishAddress = get(node, 'http.publish_address') ? (get(node, 'http.publish_address') + ' ') : '';
return 'v' + node.version + ' @ ' + publishAddress + '(' + node.ip + ')';
});
}
@ -53,7 +58,7 @@ module.exports = function checkEsVersion(server, kibanaVersion) {
const simplifiedNodes = warningNodes.map(node => ({
version: node.version,
http: {
publish_address: _.get(node, 'http.publish_address')
publish_address: get(node, 'http.publish_address')
},
ip: node.ip,
}));
@ -85,4 +90,4 @@ module.exports = function checkEsVersion(server, kibanaVersion) {
return true;
});
};
}

View file

@ -0,0 +1,18 @@
import { get } from 'lodash';
export function ensureNotTribe(callWithInternalUser) {
return callWithInternalUser('nodes.info', {
nodeId: '_local',
filterPath: 'nodes.*.settings.tribe'
})
.then(function (info) {
const nodeId = Object.keys(info.nodes || {})[0];
const tribeSettings = get(info, ['nodes', nodeId, 'settings', 'tribe']);
if (tribeSettings) {
throw new Error('Kibana does not support using tribe nodes as the primary elasticsearch connection.');
}
return true;
});
}

View file

@ -1,99 +0,0 @@
import elasticsearch from 'elasticsearch';
import _ from 'lodash';
import Bluebird from 'bluebird';
const readFile = (file) => require('fs').readFileSync(file, 'utf8');
import util from 'util';
import url from 'url';
import callWithRequest from './call_with_request';
import filterHeaders from './filter_headers';
module.exports = function (server) {
const config = server.config();
class ElasticsearchClientLogging {
error(err) {
server.log(['error', 'elasticsearch'], err);
}
warning(message) {
server.log(['warning', 'elasticsearch'], message);
}
info() {}
debug() {}
trace() {}
close() {}
}
function createClient(options) {
options = _.defaults(options || {}, {
url: config.get('elasticsearch.url'),
username: config.get('elasticsearch.username'),
password: config.get('elasticsearch.password'),
verifySsl: config.get('elasticsearch.ssl.verify'),
clientCrt: config.get('elasticsearch.ssl.cert'),
clientKey: config.get('elasticsearch.ssl.key'),
ca: config.get('elasticsearch.ssl.ca'),
apiVersion: config.get('elasticsearch.apiVersion'),
pingTimeout: config.get('elasticsearch.pingTimeout'),
requestTimeout: config.get('elasticsearch.requestTimeout'),
keepAlive: true,
auth: true
});
const uri = url.parse(options.url);
let authorization;
if (options.auth && options.username && options.password) {
uri.auth = util.format('%s:%s', options.username, options.password);
}
const ssl = { rejectUnauthorized: options.verifySsl };
if (options.clientCrt && options.clientKey) {
ssl.cert = readFile(options.clientCrt);
ssl.key = readFile(options.clientKey);
}
if (options.ca) {
ssl.ca = options.ca.map(readFile);
}
const host = {
host: uri.hostname,
port: uri.port,
protocol: uri.protocol,
path: uri.pathname,
auth: uri.auth,
query: uri.query,
headers: config.get('elasticsearch.customHeaders')
};
return new elasticsearch.Client({
host,
ssl,
plugins: options.plugins,
apiVersion: options.apiVersion,
keepAlive: options.keepAlive,
pingTimeout: options.pingTimeout,
requestTimeout: options.requestTimeout,
defer: function () {
return Bluebird.defer();
},
log: ElasticsearchClientLogging
});
}
const client = createClient();
server.on('close', _.bindKey(client, 'close'));
const noAuthClient = createClient({ auth: false });
server.on('close', _.bindKey(noAuthClient, 'close'));
server.expose('ElasticsearchClientLogging', ElasticsearchClientLogging);
server.expose('client', client);
server.expose('createClient', createClient);
server.expose('callWithRequestFactory', _.partial(callWithRequest, server));
server.expose('callWithRequest', callWithRequest(server, noAuthClient));
server.expose('filterHeaders', filterHeaders);
server.expose('errors', elasticsearch.errors);
return client;
};

View file

@ -1,11 +1,11 @@
import _ from 'lodash';
import Promise from 'bluebird';
import elasticsearch from 'elasticsearch';
import exposeClient from './expose_client';
import migrateConfig from './migrate_config';
import createKibanaIndex from './create_kibana_index';
import checkEsVersion from './check_es_version';
import kibanaVersion from './kibana_version';
import { ensureEsVersion } from './ensure_es_version';
import { ensureNotTribe } from './ensure_not_tribe';
const NoConnections = elasticsearch.errors.NoConnections;
import util from 'util';
@ -15,27 +15,25 @@ const NO_INDEX = 'no_index';
const INITIALIZING = 'initializing';
const READY = 'ready';
const REQUEST_DELAY = 2500;
module.exports = function (plugin, server) {
const config = server.config();
const client = server.plugins.elasticsearch.client;
const callAdminAsKibanaUser = server.plugins.elasticsearch.getCluster('admin').callWithInternalUser;
const callDataAsKibanaUser = server.plugins.elasticsearch.getCluster('data').callWithInternalUser;
const REQUEST_DELAY = config.get('elasticsearch.healthCheck.delay');
plugin.status.yellow('Waiting for Elasticsearch');
function waitForPong() {
return client.ping().catch(function (err) {
function waitForPong(callWithInternalUser, url) {
return callWithInternalUser('ping').catch(function (err) {
if (!(err instanceof NoConnections)) throw err;
plugin.status.red(format('Unable to connect to Elasticsearch at %s.', url));
plugin.status.red(format('Unable to connect to Elasticsearch at %s.', config.get('elasticsearch.url')));
return Promise.delay(REQUEST_DELAY).then(waitForPong);
return Promise.delay(REQUEST_DELAY).then(waitForPong.bind(null, callWithInternalUser, url));
});
}
// just figure out the current "health" of the es setup
function getHealth() {
return client.cluster.health({
return callAdminAsKibanaUser('cluster.health', {
timeout: '5s', // tells es to not sit around and wait forever
index: config.get('kibana.index'),
ignore: [408]
@ -82,7 +80,7 @@ module.exports = function (plugin, server) {
}
function waitForEsVersion() {
return checkEsVersion(server, kibanaVersion.get()).catch(err => {
return ensureEsVersion(server, kibanaVersion.get()).catch(err => {
plugin.status.red(err);
return Promise.delay(REQUEST_DELAY).then(waitForEsVersion);
});
@ -93,14 +91,26 @@ module.exports = function (plugin, server) {
}
function check() {
return waitForPong()
.then(waitForEsVersion)
.then(waitForShards)
const healthCheck =
waitForPong(callAdminAsKibanaUser, config.get('elasticsearch.url'))
.then(waitForEsVersion)
.then(ensureNotTribe.bind(this, callAdminAsKibanaUser))
.then(waitForShards)
.then(_.partial(migrateConfig, server))
.then(() => {
const tribeUrl = config.get('elasticsearch.tribe.url');
if (tribeUrl) {
return waitForPong(callDataAsKibanaUser, tribeUrl)
.then(() => ensureEsVersion(server, kibanaVersion.get(), callDataAsKibanaUser));
}
});
return healthCheck
.then(setGreenStatus)
.then(_.partial(migrateConfig, server))
.catch(err => plugin.status.red(err));
}
let timeoutId = null;
function scheduleCheck(ms) {

View file

@ -3,9 +3,7 @@ import { parse as parseUrl, format as formatUrl, resolve } from 'url';
import filterHeaders from './filter_headers';
import setHeaders from './set_headers';
export default function mapUri(server, prefix) {
const config = server.config();
export default function mapUri(cluster, proxyPrefix) {
function joinPaths(pathA, pathB) {
return trimRight(pathA, '/') + '/' + trimLeft(pathB, '/');
}
@ -19,7 +17,7 @@ export default function mapUri(server, prefix) {
port: esUrlPort,
pathname: esUrlBasePath,
query: esUrlQuery
} = parseUrl(config.get('elasticsearch.url'), true);
} = parseUrl(cluster.getUrl(), true);
// copy most url components directly from the elasticsearch.url
const mappedUrlComponents = {
@ -31,17 +29,17 @@ export default function mapUri(server, prefix) {
};
// pathname
const reqSubPath = request.path.replace('/elasticsearch', '');
const reqSubPath = request.path.replace(proxyPrefix, '');
mappedUrlComponents.pathname = joinPaths(esUrlBasePath, reqSubPath);
// querystring
const mappedQuery = defaults(omit(request.query, '_'), esUrlQuery || {});
const mappedQuery = defaults(omit(request.query, '_'), esUrlQuery);
if (Object.keys(mappedQuery).length) {
mappedUrlComponents.query = mappedQuery;
}
const filteredHeaders = filterHeaders(request.headers, config.get('elasticsearch.requestHeadersWhitelist'));
const mappedHeaders = setHeaders(filteredHeaders, config.get('elasticsearch.customHeaders'));
const filteredHeaders = filterHeaders(request.headers, cluster.getRequestHeadersWhitelist());
const mappedHeaders = setHeaders(filteredHeaders, cluster.getCustomHeaders());
const mappedUrl = formatUrl(mappedUrlComponents);
done(null, mappedUrl, mappedHeaders);
};

View file

@ -3,7 +3,8 @@ import { mappings } from './kibana_index_mappings';
module.exports = function (server) {
const config = server.config();
const client = server.plugins.elasticsearch.client;
const { callWithInternalUser } = server.plugins.elasticsearch.getCluster('admin');
const options = {
index: config.get('kibana.index'),
type: 'config',
@ -20,5 +21,5 @@ module.exports = function (server) {
}
};
return client.search(options).then(upgrade(server));
return callWithInternalUser('search', options).then(upgrade(server));
};

View file

@ -0,0 +1,47 @@
import util from 'util';
import url from 'url';
import { get, size, pick } from 'lodash';
import { readFileSync } from 'fs';
import Bluebird from 'bluebird';
const readFile = (file) => readFileSync(file, 'utf8');
export function parseConfig(serverConfig = {}) {
const config = Object.assign({
keepAlive: true
}, pick(serverConfig, [
'plugins', 'apiVersion', 'keepAlive', 'pingTimeout',
'requestTimeout', 'log', 'logQueries'
]));
const uri = url.parse(serverConfig.url);
config.host = {
host: uri.hostname,
port: uri.port,
protocol: uri.protocol,
path: uri.pathname,
query: uri.query,
headers: serverConfig.customHeaders
};
// Auth
if (serverConfig.auth !== false && serverConfig.username && serverConfig.password) {
config.host.auth = util.format('%s:%s', serverConfig.username, serverConfig.password);
}
// SSL
config.ssl = { rejectUnauthorized: get(serverConfig, 'ssl.verify') };
if (get(serverConfig, 'ssl.cert') && get(serverConfig, 'ssl.key')) {
config.ssl.cert = readFile(serverConfig.ssl.cert);
config.ssl.key = readFile(serverConfig.ssl.key);
}
if (size(get(serverConfig, 'ssl.ca'))) {
config.ssl.ca = serverConfig.ssl.ca.map(readFile);
}
config.defer = () => Bluebird.defer();
return config;
}

View file

@ -6,11 +6,11 @@ import { format } from 'util';
module.exports = function (server) {
const MAX_INTEGER = Math.pow(2, 53) - 1;
const client = server.plugins.elasticsearch.client;
const { callWithInternalUser } = server.plugins.elasticsearch.getCluster('admin');
const config = server.config();
function createNewConfig() {
return client.create({
return callWithInternalUser('create', {
index: config.get('kibana.index'),
type: 'config',
body: { buildNum: config.get('pkg.buildNum') },
@ -31,7 +31,9 @@ module.exports = function (server) {
return hit._id !== '@@version' && hit._id === config.get('pkg.version');
});
if (devConfig) return Promise.resolve();
if (devConfig) {
return Promise.resolve();
}
// Look for upgradeable configs. If none of them are upgradeable
// then create a new one.
@ -50,7 +52,7 @@ module.exports = function (server) {
newVersion: config.get('pkg.version')
});
return client.create({
return callWithInternalUser('create', {
index: config.get('kibana.index'),
type: 'config',
body: body._source,

View file

@ -16,6 +16,6 @@ require('plugins/kibana/management/saved_object_registry').register({
});
// This is the only thing that gets injected into controllers
module.service('savedDashboards', function (SavedDashboard, kbnIndex, es, kbnUrl) {
return new SavedObjectLoader(SavedDashboard, kbnIndex, es, kbnUrl);
module.service('savedDashboards', function (SavedDashboard, kbnIndex, esAdmin, kbnUrl) {
return new SavedObjectLoader(SavedDashboard, kbnIndex, esAdmin, kbnUrl);
});

View file

@ -4,7 +4,6 @@ import 'plugins/kibana/discover/saved_searches/_saved_search';
import 'ui/notify';
import uiModules from 'ui/modules';
import { SavedObjectLoader } from 'ui/courier/saved_object/saved_object_loader';
const module = uiModules.get('discover/saved_searches', [
'kibana/notify'
]);
@ -16,8 +15,8 @@ require('plugins/kibana/management/saved_object_registry').register({
title: 'searches'
});
module.service('savedSearches', function (Promise, config, kbnIndex, es, createNotifier, SavedSearch, kbnUrl) {
const savedSearchLoader = new SavedObjectLoader(SavedSearch, kbnIndex, es, kbnUrl);
module.service('savedSearches', function (Promise, config, kbnIndex, esAdmin, createNotifier, SavedSearch, kbnUrl) {
const savedSearchLoader = new SavedObjectLoader(SavedSearch, kbnIndex, esAdmin, kbnUrl);
// Customize loader properties since adding an 's' on type doesn't work for type 'search' .
savedSearchLoader.loaderProperties = {
name: 'searches',
@ -28,5 +27,6 @@ module.service('savedSearches', function (Promise, config, kbnIndex, es, createN
savedSearchLoader.urlFor = function (id) {
return kbnUrl.eval('#/discover/{{id}}', { id: id });
};
return savedSearchLoader;
});

View file

@ -1,6 +1,6 @@
export default function RefreshKibanaIndexFn(es, kbnIndex) {
export default function RefreshKibanaIndexFn(esAdmin, kbnIndex) {
return function () {
return es.indices.refresh({
return esAdmin.indices.refresh({
index: kbnIndex
});
};

View file

@ -19,7 +19,7 @@ uiModules.get('apps/management')
return {
restrict: 'E',
controllerAs: 'managementObjectsController',
controller: function ($scope, $injector, $q, AppState, es) {
controller: function ($scope, $injector, $q, AppState, esAdmin) {
const notify = new Notifier({ location: 'Saved Objects' });
// TODO: Migrate all scope variables to the controller.
@ -125,7 +125,7 @@ uiModules.get('apps/management')
function retrieveAndExportDocs(objs) {
if (!objs.length) return notify.error('No saved objects to export.');
es.mget({
esAdmin.mget({
index: kbnIndex,
body: { docs: objs.map(transformToMget) }
})
@ -167,7 +167,7 @@ uiModules.get('apps/management')
};
function refreshIndex() {
return es.indices.refresh({
return esAdmin.indices.refresh({
index: kbnIndex
});
}

View file

@ -16,7 +16,7 @@ uiModules.get('apps/management')
.directive('kbnManagementObjectsView', function (kbnIndex, Notifier) {
return {
restrict: 'E',
controller: function ($scope, $injector, $routeParams, $location, $window, $rootScope, es, Private) {
controller: function ($scope, $injector, $routeParams, $location, $window, $rootScope, esAdmin, Private) {
const notify = new Notifier({ location: 'SavedObject view' });
const castMappingType = Private(IndexPatternsCastMappingTypeProvider);
const serviceObj = registry.get($routeParams.service);
@ -104,7 +104,7 @@ uiModules.get('apps/management')
$scope.title = service.type;
es.get({
esAdmin.get({
index: kbnIndex,
type: service.type,
id: $routeParams.id
@ -163,7 +163,7 @@ uiModules.get('apps/management')
* @returns {type} description
*/
$scope.delete = function () {
es.delete({
esAdmin.delete({
index: kbnIndex,
type: service.type,
id: $routeParams.id
@ -191,7 +191,7 @@ uiModules.get('apps/management')
_.set(source, field.name, value);
});
es.index({
esAdmin.index({
index: kbnIndex,
type: service.type,
id: $routeParams.id,
@ -204,7 +204,7 @@ uiModules.get('apps/management')
};
function redirectHandler(action) {
return es.indices.refresh({
return esAdmin.indices.refresh({
index: kbnIndex
})
.then(function (resp) {

View file

@ -15,13 +15,13 @@ require('plugins/kibana/management/saved_object_registry').register({
title: 'visualizations'
});
app.service('savedVisualizations', function (Promise, es, kbnIndex, SavedVis, Private, Notifier, kbnUrl) {
app.service('savedVisualizations', function (Promise, esAdmin, kbnIndex, SavedVis, Private, Notifier, kbnUrl) {
const visTypes = Private(RegistryVisTypesProvider);
const notify = new Notifier({
location: 'Saved Visualization Service'
});
const saveVisualizationLoader = new SavedObjectLoader(SavedVis, kbnIndex, es, kbnUrl);
const saveVisualizationLoader = new SavedObjectLoader(SavedVis, kbnIndex, esAdmin, kbnUrl);
saveVisualizationLoader.mapHits = function (hit) {
const source = hit._source;
source.id = hit._id;

View file

@ -6,7 +6,7 @@ export function registerFieldCapabilities(server) {
path: '/api/kibana/{indices}/field_capabilities',
method: ['GET'],
handler: function (req, reply) {
const callWithRequest = server.plugins.elasticsearch.callWithRequest;
const { callWithRequest } = server.plugins.elasticsearch.getCluster('data');
const indices = req.params.indices || '';
return callWithRequest(req, 'fieldStats', {

View file

@ -6,7 +6,7 @@ export function registerLanguages(server) {
path: '/api/kibana/scripts/languages',
method: 'GET',
handler: function (request, reply) {
const callWithRequest = server.plugins.elasticsearch.callWithRequest;
const { callWithRequest } = server.plugins.elasticsearch.getCluster('data');
return callWithRequest(request, 'cluster.getSettings', {
include_defaults: true,

View file

@ -6,7 +6,8 @@ export default function registerCount(server) {
path: '/api/kibana/{id}/_count',
method: ['POST', 'GET'],
handler: function (req, reply) {
const boundCallWithRequest = _.partial(server.plugins.elasticsearch.callWithRequest, req);
const { callWithRequest } = server.plugins.elasticsearch.getCluster('data');
const boundCallWithRequest = _.partial(callWithRequest, req);
boundCallWithRequest('count', {
allowNoIndices: false,

View file

@ -15,8 +15,8 @@ define(function (require) {
});
// This is the only thing that gets injected into controllers
module.service('savedSheets', function (Promise, SavedSheet, kbnIndex, es, kbnUrl) {
const savedSheetLoader = new SavedObjectLoader(SavedSheet, kbnIndex, es, kbnUrl);
module.service('savedSheets', function (Promise, SavedSheet, kbnIndex, esAdmin, kbnUrl) {
const savedSheetLoader = new SavedObjectLoader(SavedSheet, kbnIndex, esAdmin, kbnUrl);
savedSheetLoader.urlFor = function (id) {
return kbnUrl.eval('#/{{id}}', { id: id });
};

View file

@ -3,9 +3,8 @@ module.exports = function (server) {
method: 'GET',
path: '/api/timelion/validate/es',
handler: function (request, reply) {
return server.uiSettings().getAll(request).then((uiSettings) => {
const callWithRequest = server.plugins.elasticsearch.callWithRequest;
const { callWithRequest } = server.plugins.elasticsearch.getCluster('data');
const timefield = uiSettings['timelion:es.timefield'];

View file

@ -10,17 +10,22 @@ import esResponse from './fixtures/es_response';
import Promise from 'bluebird';
import _ from 'lodash';
import { expect } from 'chai';
import sinon from 'sinon';
import invoke from './helpers/invoke_series_fn.js';
function stubResponse(response) {
return {
server: { plugins:{
elasticsearch: {
callWithRequest: function () {
return Promise.resolve(response);
server: {
plugins:{
elasticsearch: {
getCluster: sinon.stub().withArgs('data').returns({
callWithRequest: function () {
return Promise.resolve(response);
}
})
}
}
} }
}
};
}

View file

@ -61,9 +61,10 @@ module.exports = new Datasource('es', {
fit: 'nearest'
});
const callWithRequest = tlConfig.server.plugins.elasticsearch.callWithRequest;
const { callWithRequest } = tlConfig.server.plugins.elasticsearch.getCluster('data');
const body = buildRequest(config, tlConfig);
return callWithRequest(tlConfig.request, 'search', body).then(function (resp) {
if (!resp._shards.total) throw new Error('Elasticsearch index not found: ' + config.index);
return {

View file

@ -2,7 +2,7 @@ import crypto from 'crypto';
export default function (server) {
async function updateMetadata(urlId, urlDoc, req) {
const callWithRequest = server.plugins.elasticsearch.callWithRequest;
const { callWithRequest } = server.plugins.elasticsearch.getCluster('admin');
const kibanaIndex = server.config().get('kibana.index');
try {
@ -25,7 +25,7 @@ export default function (server) {
async function getUrlDoc(urlId, req) {
const urlDoc = await new Promise((resolve, reject) => {
const callWithRequest = server.plugins.elasticsearch.callWithRequest;
const { callWithRequest } = server.plugins.elasticsearch.getCluster('admin');
const kibanaIndex = server.config().get('kibana.index');
callWithRequest(req, 'get', {
@ -46,7 +46,7 @@ export default function (server) {
async function createUrlDoc(url, urlId, req) {
const newUrlId = await new Promise((resolve, reject) => {
const callWithRequest = server.plugins.elasticsearch.callWithRequest;
const { callWithRequest } = server.plugins.elasticsearch.getCluster('admin');
const kibanaIndex = server.config().get('kibana.index');
callWithRequest(req, 'index', {

View file

@ -1,12 +1,22 @@
import toPath from 'lodash/internal/toPath';
export default async function (kbnServer, server, config) {
const forcedOverride = {
console: function (enabledInConfig) {
return !config.get('elasticsearch.tribe.url') && enabledInConfig;
}
};
const { plugins } = kbnServer;
for (const plugin of plugins) {
const enabledInConfig = config.get([...toPath(plugin.configPrefix), 'enabled']);
if (!enabledInConfig) {
const hasOveride = forcedOverride.hasOwnProperty(plugin.id);
if (hasOveride) {
if (!forcedOverride[plugin.id](enabledInConfig)) {
plugins.disable(plugin);
}
} else if (!enabledInConfig) {
plugins.disable(plugin);
}
}

View file

@ -31,6 +31,11 @@ module.exports = function (chrome, internals) {
a.href = chrome.addBasePath('/elasticsearch');
return a.href;
}()))
.value('esAdminUrl', (function () {
const a = document.createElement('a');
a.href = chrome.addBasePath('/es_admin');
return a.href;
}()))
.config(chrome.$setupXsrfRequestInterceptor)
.config(['$compileProvider', function ($compileProvider) {
if (!internals.devMode) {

View file

@ -4,7 +4,7 @@ import sinon from 'auto-release-sinon';
import RequestQueueProv from '../_request_queue';
import SearchStrategyProv from '../fetch/strategy/search';
import DocStrategyProv from '../fetch/strategy/doc';
import DocStrategyProv from '../fetch/strategy/doc_data';
describe('Courier Request Queue', function () {
let docStrategy;

View file

@ -9,7 +9,7 @@ import BluebirdPromise from 'bluebird';
import SavedObjectFactory from '../saved_object/saved_object';
import IndexPatternFactory from 'ui/index_patterns/_index_pattern';
import DocSourceProvider from '../data_source/doc_source';
import DocSourceProvider from '../data_source/admin_doc_source';
import { stubMapper } from 'test_utils/stub_mapper';
@ -19,7 +19,8 @@ describe('Saved Object', function () {
let SavedObject;
let IndexPattern;
let esStub;
let esAdminStub;
let esDataStub;
let DocSource;
/**
@ -29,7 +30,7 @@ describe('Saved Object', function () {
// Allows the type 'dashboard' to be used.
// Unfortunately we need to use bluebird here instead of native promises because there is
// a call to finally.
sinon.stub(esStub.indices, 'getFieldMapping').returns(BluebirdPromise.resolve({
sinon.stub(esAdminStub.indices, 'getFieldMapping').returns(BluebirdPromise.resolve({
'.kibana' : {
'mappings': {
'dashboard': {}
@ -38,8 +39,8 @@ describe('Saved Object', function () {
}));
// Necessary to avoid a timeout condition.
sinon.stub(esStub.indices, 'putMapping').returns(BluebirdPromise.resolve());
sinon.stub(esStub.indices, 'refresh').returns(BluebirdPromise.resolve());
sinon.stub(esAdminStub.indices, 'putMapping').returns(BluebirdPromise.resolve());
sinon.stub(esAdminStub.indices, 'refresh').returns(BluebirdPromise.resolve());
}
/**
@ -66,8 +67,10 @@ describe('Saved Object', function () {
* @param {Object} mockDocResponse
*/
function stubESResponse(mockDocResponse) {
sinon.stub(esStub, 'mget').returns(BluebirdPromise.resolve({ docs: [mockDocResponse] }));
sinon.stub(esStub, 'index').returns(BluebirdPromise.resolve(mockDocResponse));
sinon.stub(esDataStub, 'mget').returns(BluebirdPromise.resolve({ docs: [mockDocResponse] }));
sinon.stub(esDataStub, 'index').returns(BluebirdPromise.resolve(mockDocResponse));
sinon.stub(esAdminStub, 'mget').returns(BluebirdPromise.resolve({ docs: [mockDocResponse] }));
sinon.stub(esAdminStub, 'index').returns(BluebirdPromise.resolve(mockDocResponse));
}
/**
@ -84,10 +87,11 @@ describe('Saved Object', function () {
}
beforeEach(ngMock.module('kibana'));
beforeEach(ngMock.inject(function (es, Private) {
beforeEach(ngMock.inject(function (es, esAdmin, Private) {
SavedObject = Private(SavedObjectFactory);
IndexPattern = Private(IndexPatternFactory);
esStub = es;
esAdminStub = esAdmin;
esDataStub = es;
DocSource = Private(DocSourceProvider);
mockEsService();

View file

@ -14,7 +14,8 @@ import SearchStrategyProvider from './fetch/strategy/search';
import RequestQueueProvider from './_request_queue';
import ErrorHandlersProvider from './_error_handlers';
import FetchProvider from './fetch';
import DocLooperProvider from './looper/doc';
import DocDataLooperProvider from './looper/doc_data';
import DocAdminLooperProvider from './looper/doc_admin';
import SearchLooperProvider from './looper/search';
import RootSearchSourceProvider from './data_source/_root_search_source';
import SavedObjectProvider from './saved_object';
@ -34,7 +35,8 @@ uiModules.get('kibana/courier')
const errorHandlers = Private(ErrorHandlersProvider);
const fetch = Private(FetchProvider);
const docLooper = self.docLooper = Private(DocLooperProvider);
const docDataLooper = self.docLooper = Private(DocDataLooperProvider);
const docAdminLooper = self.docLooper = Private(DocAdminLooperProvider);
const searchLooper = self.searchLooper = Private(SearchLooperProvider);
// expose some internal modules
@ -65,7 +67,8 @@ uiModules.get('kibana/courier')
*/
self.start = function () {
searchLooper.start();
docLooper.start();
docDataLooper.start();
docAdminLooper.start();
return this;
};
@ -124,7 +127,8 @@ uiModules.get('kibana/courier')
*/
self.close = function () {
searchLooper.stop();
docLooper.stop();
docAdminLooper.stop();
docDataLooper.stop();
_.invoke(requestQueue, 'abort');

View file

@ -0,0 +1,168 @@
/**
* @name AbstractDocSource
*
* NOTE: This class is tightly coupled with _doc_send_to_es. Its primary
* methods (`doUpdate`, `doIndex`, `doCreate`) are all proxies for methods
* exposed by _doc_send_to_es (`update`, `index`, `create`). These methods are
* called with AbstractDocSource as the context. When called, they depend on private
* AbstractDocSource methods within their execution.
*/
import _ from 'lodash';
import 'ui/es';
import 'ui/storage';
import DocSendToEsProvider from './_doc_send_to_es';
import AbstractDataSourceProvider from './_abstract';
import DocRequestProvider from '../fetch/request/_abstract_doc';
export default function AbstractDocSourceFactory(Private, Promise, es, sessionStorage) {
const sendToEs = Private(DocSendToEsProvider);
const SourceAbstract = Private(AbstractDataSourceProvider);
const DocRequest = Private(DocRequestProvider);
_.class(AbstractDocSource).inherits(SourceAbstract);
function AbstractDocSource(initialState, strategy) {
AbstractDocSource.Super.call(this, initialState, strategy);
}
AbstractDocSource.prototype.onUpdate = SourceAbstract.prototype.onResults;
AbstractDocSource.prototype.onResults = void 0;
/*****
* PUBLIC API
*****/
AbstractDocSource.prototype._createRequest = function (defer) {
return new DocRequest(this, defer);
};
/**
* List of methods that is turned into a chainable API in the constructor
* @type {Array}
*/
AbstractDocSource.prototype._methods = [
'index',
'type',
'id',
'sourceInclude',
'sourceExclude'
];
/**
* Applies a partial update to the document
* @param {object} fields - The fields to change and their new values (es doc field)
* @return {undefined}
*/
AbstractDocSource.prototype.doUpdate = function (fields) {
if (!this._state.id) return this.doIndex(fields);
return sendToEs.call(this, 'update', false, { doc: fields });
};
/**
* Update the document stored
* @param {[type]} body [description]
* @return {[type]} [description]
*/
AbstractDocSource.prototype.doIndex = function (body) {
return sendToEs.call(this, 'index', false, body);
};
AbstractDocSource.prototype.doCreate = function (body) {
return sendToEs.call(this, 'create', false, body, []);
};
/*****
* PRIVATE API
*****/
/**
* Get the type of this SourceAbstract
* @return {string} - 'doc'
*/
AbstractDocSource.prototype._getType = function () {
return 'doc';
};
/**
* Used to merge properties into the state within ._flatten().
* The state is passed in and modified by the function
*
* @param {object} state - the current merged state
* @param {*} val - the value at `key`
* @param {*} key - The key of `val`
* @return {undefined}
*/
AbstractDocSource.prototype._mergeProp = function (state, val, key) {
const flatKey = '_' + key;
if (val != null && state[flatKey] == null) {
state[flatKey] = val;
}
};
/**
* Creates a key based on the doc's index/type/id
* @return {string}
*/
AbstractDocSource.prototype._versionKey = function () {
const state = this._state;
if (!state.index || !state.type || !state.id) return;
return 'DocVersion:' + (
[ state.index, state.type, state.id ]
.map(encodeURIComponent)
.join('/')
);
};
/**
* Get the cached version number, not the version that is
* stored/shared with other tabs
*
* @return {number} - the version number, or undefined
*/
AbstractDocSource.prototype._getVersion = function () {
if (this._version) return this._version;
else return this._getStoredVersion();
};
/**
* Fetches the stored version from storage
* @return {[type]} [description]
*/
AbstractDocSource.prototype._getStoredVersion = function () {
const key = this._versionKey();
if (!key) return;
const v = sessionStorage.get(key);
this._version = v ? _.parseInt(v) : void 0;
return this._version;
};
/**
* Stores the version into storage
* @param {number, NaN} version - the current version number, NaN works well forcing a refresh
* @return {undefined}
*/
AbstractDocSource.prototype._storeVersion = function (version) {
if (!version) return this._clearVersion();
const key = this._versionKey();
if (!key) return;
this._version = version;
sessionStorage.set(key, version);
};
/**
* Clears the stored version for a AbstractDocSource
*/
AbstractDocSource.prototype._clearVersion = function () {
const key = this._versionKey();
if (!key) return;
sessionStorage.remove(key);
};
return AbstractDocSource;
}

View file

@ -11,7 +11,7 @@ import errors from 'ui/errors';
import RequestQueueProvider from 'ui/courier/_request_queue';
import FetchProvider from 'ui/courier/fetch/fetch';
export default function (Promise, Private, es) {
export default function (Promise, Private, es, esAdmin, kbnIndex) {
const requestQueue = Private(RequestQueueProvider);
const courierFetch = Private(FetchProvider);
@ -33,7 +33,8 @@ export default function (Promise, Private, es) {
params.version = doc._getVersion();
}
return es[method](params)
const client = [].concat(params.index).includes(kbnIndex) ? esAdmin : es;
return client[method](params)
.then(function (resp) {
if (resp.status === 409) throw new errors.VersionConflict(resp);

View file

@ -0,0 +1,27 @@
/**
* @name AdminDocSource
*/
import _ from 'lodash';
import AbstractDocSourceProvider from './_abstract_doc_source';
import DocStrategyProvider from '../fetch/strategy/doc_admin';
import DocRequestProvider from '../fetch/request/doc_admin';
export default function DocSourceFactory(Private) {
const AbstractDocSource = Private(AbstractDocSourceProvider);
const docStrategy = Private(DocStrategyProvider);
const DocRequest = Private(DocRequestProvider);
class AdminDocSource extends AbstractDocSource {
constructor(initialState) {
super(initialState, docStrategy);
}
_createRequest(defer) {
return new DocRequest(this, defer);
}
}
return AdminDocSource;
}

View file

@ -1,170 +1,27 @@
/**
* @name DocSource
*
* NOTE: This class is tightly coupled with _doc_send_to_es. Its primary
* methods (`doUpdate`, `doIndex`, `doCreate`) are all proxies for methods
* exposed by _doc_send_to_es (`update`, `index`, `create`). These methods are
* called with DocSource as the context. When called, they depend on private
* DocSource methods within their execution.
*/
import _ from 'lodash';
import 'ui/es';
import 'ui/storage';
import AbstractDocSourceProvider from './_abstract_doc_source';
import DocStrategyProvider from '../fetch/strategy/doc_data';
import DocRequestProvider from '../fetch/request/doc_data';
import DocSendToEsProvider from './_doc_send_to_es';
import AbstractDataSourceProvider from './_abstract';
import DocRequestProvider from '../fetch/request/doc';
import DocStrategyProvider from '../fetch/strategy/doc';
export default function DocSourceFactory(Private, Promise, es, sessionStorage) {
const sendToEs = Private(DocSendToEsProvider);
const SourceAbstract = Private(AbstractDataSourceProvider);
const DocRequest = Private(DocRequestProvider);
export default function DocSourceFactory(Private) {
const AbstractDocSource = Private(AbstractDocSourceProvider);
const docStrategy = Private(DocStrategyProvider);
const DocRequest = Private(DocRequestProvider);
_.class(DocSource).inherits(SourceAbstract);
function DocSource(initialState) {
DocSource.Super.call(this, initialState, docStrategy);
}
DocSource.prototype.onUpdate = SourceAbstract.prototype.onResults;
DocSource.prototype.onResults = void 0;
/*****
* PUBLIC API
*****/
DocSource.prototype._createRequest = function (defer) {
return new DocRequest(this, defer);
};
/**
* List of methods that is turned into a chainable API in the constructor
* @type {Array}
*/
DocSource.prototype._methods = [
'index',
'type',
'id',
'sourceInclude',
'sourceExclude'
];
/**
* Applies a partial update to the document
* @param {object} fields - The fields to change and their new values (es doc field)
* @return {undefined}
*/
DocSource.prototype.doUpdate = function (fields) {
if (!this._state.id) return this.doIndex(fields);
return sendToEs.call(this, 'update', false, { doc: fields });
};
/**
* Update the document stored
* @param {[type]} body [description]
* @return {[type]} [description]
*/
DocSource.prototype.doIndex = function (body) {
return sendToEs.call(this, 'index', false, body);
};
DocSource.prototype.doCreate = function (body) {
return sendToEs.call(this, 'create', false, body, []);
};
/*****
* PRIVATE API
*****/
/**
* Get the type of this SourceAbstract
* @return {string} - 'doc'
*/
DocSource.prototype._getType = function () {
return 'doc';
};
/**
* Used to merge properties into the state within ._flatten().
* The state is passed in and modified by the function
*
* @param {object} state - the current merged state
* @param {*} val - the value at `key`
* @param {*} key - The key of `val`
* @return {undefined}
*/
DocSource.prototype._mergeProp = function (state, val, key) {
key = '_' + key;
if (val != null && state[key] == null) {
state[key] = val;
class DocSource extends AbstractDocSource {
constructor(initialState) {
super(initialState, docStrategy);
}
};
/**
* Creates a key based on the doc's index/type/id
* @return {string}
*/
DocSource.prototype._versionKey = function () {
const state = this._state;
if (!state.index || !state.type || !state.id) return;
return 'DocVersion:' + (
[ state.index, state.type, state.id ]
.map(encodeURIComponent)
.join('/')
);
};
/**
* Get the cached version number, not the version that is
* stored/shared with other tabs
*
* @return {number} - the version number, or undefined
*/
DocSource.prototype._getVersion = function () {
if (this._version) return this._version;
else return this._getStoredVersion();
};
/**
* Fetches the stored version from storage
* @return {[type]} [description]
*/
DocSource.prototype._getStoredVersion = function () {
const key = this._versionKey();
if (!key) return;
const v = sessionStorage.get(key);
this._version = v ? _.parseInt(v) : void 0;
return this._version;
};
/**
* Stores the version into storage
* @param {number, NaN} version - the current version number, NaN works well forcing a refresh
* @return {undefined}
*/
DocSource.prototype._storeVersion = function (version) {
if (!version) return this._clearVersion();
const key = this._versionKey();
if (!key) return;
this._version = version;
sessionStorage.set(key, version);
};
/**
* Clears the stored version for a DocSource
*/
DocSource.prototype._clearVersion = function () {
const key = this._versionKey();
if (!key) return;
sessionStorage.remove(key);
};
_createRequest(defer) {
return new DocRequest(this, defer);
}
}
return DocSource;
}

View file

@ -3,7 +3,7 @@ import expect from 'expect.js';
import ngMock from 'ng_mock';
import DocSourceProvider from '../../data_source/doc_source';
import DocRequestProvider from '../request/doc';
import DocRequestProvider from '../request/doc_data';
describe('Courier DocFetchRequest class', function () {
let storage;

View file

@ -4,7 +4,7 @@ import IsRequestProvider from './is_request';
import MergeDuplicatesRequestProvider from './merge_duplicate_requests';
import ReqStatusProvider from './req_status';
export default function CourierFetchCallClient(Private, Promise, es) {
export default function CourierFetchCallClient(Private, Promise, esAdmin, es) {
const isRequest = Private(IsRequestProvider);
const mergeDuplicateRequests = Private(MergeDuplicatesRequestProvider);
@ -94,7 +94,9 @@ export default function CourierFetchCallClient(Private, Promise, es) {
throw ABORTED;
}
return (esPromise = es[strategy.clientMethod]({ body }));
const id = strategy.id;
const client = (id && id.includes('admin')) ? esAdmin : es;
return (esPromise = client[strategy.clientMethod]({ body }));
})
.then(function (clientResp) {
return strategy.getResponses(clientResp);

View file

@ -1,17 +1,14 @@
import DocStrategyProvider from '../strategy/doc';
import AbstractRequestProvider from './request';
export default function DocRequestProvider(Private) {
const docStrategy = Private(DocStrategyProvider);
const AbstractRequest = Private(AbstractRequestProvider);
class DocRequest extends AbstractRequest {
class AbstractDocRequest extends AbstractRequest {
constructor(...args) {
super(...args);
this.type = 'doc';
this.strategy = docStrategy;
}
canStart() {
@ -39,5 +36,5 @@ export default function DocRequestProvider(Private) {
}
}
return DocRequest;
return AbstractDocRequest;
}

View file

@ -0,0 +1,14 @@
import DocStrategyProvider from '../strategy/doc_admin';
import AbstractDocRequestProvider from './_abstract_doc';
export default function DocRequestProvider(Private) {
const docStrategy = Private(DocStrategyProvider);
const AbstractDocRequest = Private(AbstractDocRequestProvider);
class AdminDocRequest extends AbstractDocRequest {
strategy = docStrategy;
}
return AdminDocRequest;
}

View file

@ -0,0 +1,14 @@
import DocStrategyProvider from '../strategy/doc_data';
import AbstractDocRequestProvider from './_abstract_doc';
export default function DocRequestProvider(Private) {
const docStrategy = Private(DocStrategyProvider);
const AbstractDocRequest = Private(AbstractDocRequestProvider);
class DataDocRequest extends AbstractDocRequest {
strategy = docStrategy;
}
return DataDocRequest;
}

View file

@ -0,0 +1,26 @@
export default function FetchStrategyForDoc(Promise) {
return {
id: 'doc_admin',
clientMethod: 'mget',
/**
* Flatten a series of requests into as ES request body
* @param {array} requests - an array of flattened requests
* @return {Promise} - a promise that is fulfilled by the request body
*/
reqsFetchParamsToBody: function (reqsFetchParams) {
return Promise.resolve({
docs: reqsFetchParams
});
},
/**
* Fetch the multiple responses from the ES Response
* @param {object} resp - The response sent from Elasticsearch
* @return {array} - the list of responses
*/
getResponses: function (resp) {
return resp.docs;
}
};
}

View file

@ -0,0 +1,19 @@
import FetchProvider from '../fetch';
import LooperProvider from './_looper';
import DocStrategyProvider from '../fetch/strategy/doc_admin';
export default function DocLooperService(Private) {
const fetch = Private(FetchProvider);
const Looper = Private(LooperProvider);
const DocStrategy = Private(DocStrategyProvider);
/**
* The Looper which will manage the doc fetch interval
* @type {Looper}
*/
const docLooper = new Looper(1500, function () {
fetch.fetchQueued(DocStrategy);
});
return docLooper;
}

View file

@ -1,6 +1,6 @@
import FetchProvider from '../fetch';
import LooperProvider from './_looper';
import DocStrategyProvider from '../fetch/strategy/doc';
import DocStrategyProvider from '../fetch/strategy/doc_data';
export default function DocLooperService(Private) {
const fetch = Private(FetchProvider);

View file

@ -16,10 +16,10 @@ import errors from 'ui/errors';
import uuid from 'node-uuid';
import MappingSetupProvider from 'ui/utils/mapping_setup';
import DocSourceProvider from '../data_source/doc_source';
import DocSourceProvider from '../data_source/admin_doc_source';
import SearchSourceProvider from '../data_source/search_source';
export default function SavedObjectFactory(es, kbnIndex, Promise, Private, Notifier, safeConfirm, indexPatterns) {
export default function SavedObjectFactory(esAdmin, kbnIndex, Promise, Private, Notifier, safeConfirm, indexPatterns) {
const DocSource = Private(DocSourceProvider);
const SearchSource = Private(SearchSourceProvider);
@ -255,7 +255,7 @@ export default function SavedObjectFactory(es, kbnIndex, Promise, Private, Notif
* @returns {Promise}
*/
function refreshIndex() {
return es.indices.refresh({ index: kbnIndex });
return esAdmin.indices.refresh({ index: kbnIndex });
}
/**
@ -312,7 +312,7 @@ export default function SavedObjectFactory(es, kbnIndex, Promise, Private, Notif
* @return {promise}
*/
this.delete = () => {
return es.delete(
return esAdmin.delete(
{
index: kbnIndex,
type: type,

View file

@ -3,15 +3,15 @@ import Scanner from 'ui/utils/scanner';
import { StringUtils } from 'ui/utils/string_utils';
export class SavedObjectLoader {
constructor(SavedObjectClass, kbnIndex, es, kbnUrl) {
constructor(SavedObjectClass, kbnIndex, esAdmin, kbnUrl) {
this.type = SavedObjectClass.type;
this.Class = SavedObjectClass;
this.lowercaseType = this.type.toLowerCase();
this.kbnIndex = kbnIndex;
this.kbnUrl = kbnUrl;
this.es = es;
this.esAdmin = esAdmin;
this.scanner = new Scanner(es, {
this.scanner = new Scanner(esAdmin, {
index: kbnIndex,
type: this.lowercaseType
});
@ -92,7 +92,7 @@ export class SavedObjectLoader {
body = { query: { match_all: {} } };
}
return this.es.search({
return this.esAdmin.search({
index: this.kbnIndex,
type: this.type.toLowerCase(),
body,

View file

@ -9,38 +9,45 @@ import 'elasticsearch-browser';
import _ from 'lodash';
import uiModules from 'ui/modules';
let es; // share the client amongst all apps
const plugins = [function (Client, config) {
// esFactory automatically injects the AngularConnector to the config
// https://github.com/elastic/elasticsearch-js/blob/master/src/lib/connectors/angular.js
class CustomAngularConnector extends config.connectionClass {
request = _.wrap(this.request, function (request, params, cb) {
if (String(params.method).toUpperCase() === 'GET') {
params.query = _.defaults({ _: Date.now() }, params.query);
}
return request.call(this, params, cb);
});
}
config.connectionClass = CustomAngularConnector;
}];
uiModules
.get('kibana', ['elasticsearch', 'kibana/config'])
.service('es', function (esFactory, esUrl, $q, esApiVersion, esRequestTimeout) {
if (es) return es;
es = esFactory({
//Elasticsearch client used for requesting data. Connects to the /elasticsearch proxy,
//Uses a tribe node if configured, otherwise uses the base elasticsearch configuration
.service('es', function (esFactory, esUrl, esApiVersion, esRequestTimeout) {
return esFactory({
host: esUrl,
log: 'info',
requestTimeout: esRequestTimeout,
apiVersion: esApiVersion,
plugins: [function (Client, config) {
// esFactory automatically injects the AngularConnector to the config
// https://github.com/elastic/elasticsearch-js/blob/master/src/lib/connectors/angular.js
_.class(CustomAngularConnector).inherits(config.connectionClass);
function CustomAngularConnector(host, config) {
CustomAngularConnector.Super.call(this, host, config);
this.request = _.wrap(this.request, function (request, params, cb) {
if (String(params.method).toUpperCase() === 'GET') {
params.query = _.defaults({ _: Date.now() }, params.query);
}
return request.call(this, params, cb);
});
}
config.connectionClass = CustomAngularConnector;
}]
plugins
});
})
return es;
//Elasticsearch client used for managing Kibana's state. Connects to the /es-admin proxy,
//Always uses the base elasticsearch configuartion
.service('esAdmin', function (esFactory, esAdminUrl, esApiVersion, esRequestTimeout) {
return esFactory({
host: esAdminUrl,
log: 'info',
requestTimeout: esRequestTimeout,
apiVersion: esApiVersion,
plugins
});
});

View file

@ -7,7 +7,7 @@ import errors from 'ui/errors';
import IndexedArray from 'ui/indexed_array';
import FixturesLogstashFieldsProvider from 'fixtures/logstash_fields';
import FixturesStubbedDocSourceResponseProvider from 'fixtures/stubbed_doc_source_response';
import DocSourceProvider from 'ui/courier/data_source/doc_source';
import DocSourceProvider from 'ui/courier/data_source/admin_doc_source';
import UtilsMappingSetupProvider from 'ui/utils/mapping_setup';
import IndexPatternsIntervalsProvider from 'ui/index_patterns/_intervals';
import IndexPatternsIndexPatternProvider from 'ui/index_patterns/_index_pattern';

View file

@ -1,5 +1,5 @@
import _ from 'lodash';
export default function GetIndexPatternIdsFn(es, kbnIndex) {
export default function GetIndexPatternIdsFn(esAdmin, kbnIndex) {
// many places may require the id list, so we will cache it seperately
// didn't incorportate with the indexPattern cache to prevent id collisions.
@ -13,7 +13,7 @@ export default function GetIndexPatternIdsFn(es, kbnIndex) {
});
}
cachedPromise = es.search({
cachedPromise = esAdmin.search({
index: kbnIndex,
type: 'index-pattern',
storedFields: [],

View file

@ -7,7 +7,7 @@ import RegistryFieldFormatsProvider from 'ui/registry/field_formats';
import IndexPatternsGetIdsProvider from 'ui/index_patterns/_get_ids';
import IndexPatternsMapperProvider from 'ui/index_patterns/_mapper';
import IndexPatternsIntervalsProvider from 'ui/index_patterns/_intervals';
import DocSourceProvider from 'ui/courier/data_source/doc_source';
import DocSourceProvider from 'ui/courier/data_source/admin_doc_source';
import UtilsMappingSetupProvider from 'ui/utils/mapping_setup';
import IndexPatternsFieldListProvider from 'ui/index_patterns/_field_list';
import IndexPatternsFlattenHitProvider from 'ui/index_patterns/_flatten_hit';

View file

@ -6,7 +6,7 @@ import IndexPatternsTransformMappingIntoFieldsProvider from 'ui/index_patterns/_
import IndexPatternsIntervalsProvider from 'ui/index_patterns/_intervals';
import IndexPatternsPatternToWildcardProvider from 'ui/index_patterns/_pattern_to_wildcard';
import IndexPatternsLocalCacheProvider from 'ui/index_patterns/_local_cache';
export default function MapperService(Private, Promise, es, config, kbnIndex) {
export default function MapperService(Private, Promise, es, esAdmin, config, kbnIndex) {
const enhanceFieldsWithCapabilities = Private(EnhanceFieldsWithCapabilitiesProvider);
const transformMappingIntoFields = Private(IndexPatternsTransformMappingIntoFieldsProvider);
@ -37,7 +37,7 @@ export default function MapperService(Private, Promise, es, config, kbnIndex) {
if (cache) return Promise.resolve(cache);
if (!skipIndexPatternCache) {
return es.get({
return esAdmin.get({
index: kbnIndex,
type: 'index-pattern',
id: id,

View file

@ -11,7 +11,7 @@ import RegistryFieldFormatsProvider from 'ui/registry/field_formats';
import uiModules from 'ui/modules';
const module = uiModules.get('kibana/index_patterns');
function IndexPatternsProvider(es, Notifier, Private, Promise, kbnIndex) {
function IndexPatternsProvider(esAdmin, Notifier, Private, Promise, kbnIndex) {
const self = this;
const IndexPattern = Private(IndexPatternsIndexPatternProvider);
@ -34,7 +34,7 @@ function IndexPatternsProvider(es, Notifier, Private, Promise, kbnIndex) {
self.getIds.clearCache();
pattern.destroy();
return es.delete({
return esAdmin.delete({
index: kbnIndex,
type: 'index-pattern',
id: pattern.id

View file

@ -1,7 +1,7 @@
import angular from 'angular';
import _ from 'lodash';
define(function () {
return function MappingSetupService(kbnIndex, es) {
return function MappingSetupService(kbnIndex, esAdmin) {
const mappingSetup = this;
const json = {
@ -23,7 +23,7 @@ define(function () {
* @return {[type]} [description]
*/
const getKnownKibanaTypes = _.once(function () {
return es.indices.getFieldMapping({
return esAdmin.indices.getFieldMapping({
// only concerned with types in this kibana index
index: kbnIndex,
// check all types
@ -83,7 +83,7 @@ define(function () {
properties: mapping
};
return es.indices.putMapping({
return esAdmin.indices.putMapping({
index: kbnIndex,
type: type,
body: body

View file

@ -343,7 +343,7 @@ describe('ui settings', function () {
});
function expectElasticsearchGetQuery(server, req, configGet) {
const { callWithRequest } = server.plugins.elasticsearch;
const { callWithRequest } = server.plugins.elasticsearch.getCluster('admin');
sinon.assert.calledOnce(callWithRequest);
const [reqPassed, method, params] = callWithRequest.args[0];
expect(reqPassed).to.be(req);
@ -356,7 +356,7 @@ function expectElasticsearchGetQuery(server, req, configGet) {
}
function expectElasticsearchUpdateQuery(server, req, configGet, doc) {
const { callWithRequest } = server.plugins.elasticsearch;
const { callWithRequest } = server.plugins.elasticsearch.getCluster('admin');
sinon.assert.calledOnce(callWithRequest);
const [reqPassed, method, params] = callWithRequest.args[0];
expect(reqPassed).to.be(req);
@ -388,27 +388,37 @@ function instantiate({ getResult, callWithRequest, settingsStatusOverrides } = {
},
ready: sinon.stub().returns(Promise.resolve())
};
const req = { __stubHapiRequest: true, path: '', headers: {} };
const adminCluster = {
errors: esErrors,
callWithInternalUser: sinon.stub(),
callWithRequest: sinon.spy((withReq, method, params) => {
if (callWithRequest) {
return callWithRequest(withReq, method, params);
}
expect(withReq).to.be(req);
switch (method) {
case 'get':
return Promise.resolve({ _source: getResult });
case 'update':
return Promise.resolve();
default:
throw new Error(`callWithRequest() is using unexpected method "${method}"`);
}
})
};
adminCluster.callWithInternalUser.withArgs('get', sinon.match.any).returns(Promise.resolve({ _source: getResult }));
adminCluster.callWithInternalUser.withArgs('update', sinon.match.any).returns(Promise.resolve());
const server = {
decorate: (_, key, value) => server[key] = value,
plugins: {
elasticsearch: {
errors: esErrors,
callWithRequest: sinon.spy((withReq, method, params) => {
if (callWithRequest) {
return callWithRequest(withReq, method, params);
}
expect(withReq).to.be(req);
switch (method) {
case 'get':
return Promise.resolve({ _source: getResult });
case 'update':
return Promise.resolve();
default:
throw new Error(`callWithRequest() is using unexpected method "${method}"`);
}
})
getCluster: sinon.stub().withArgs('admin').returns(adminCluster)
}
}
};

View file

@ -66,7 +66,7 @@ export default function setupSettings(kbnServer, server, config) {
async function getUserProvided(req, { ignore401Errors = false } = {}) {
assertRequest(req);
const { callWithRequest, errors } = server.plugins.elasticsearch;
const { callWithRequest, errors } = server.plugins.elasticsearch.getCluster('admin');
// If the ui settings status isn't green, we shouldn't be attempting to get
// user settings, since we can't be sure that all the necessary conditions
@ -87,7 +87,7 @@ export default function setupSettings(kbnServer, server, config) {
async function setMany(req, changes) {
assertRequest(req);
const { callWithRequest } = server.plugins.elasticsearch;
const { callWithRequest } = server.plugins.elasticsearch.getCluster('admin');
const clientParams = {
...getClientSettings(config),
body: { doc: changes }

View file

@ -14,6 +14,7 @@ module.exports = function (grunt) {
}
}
},
dev: {
options: {
directory: resolve(directory, 'dev'),
@ -27,6 +28,60 @@ module.exports = function (grunt) {
}
}
},
tribe: {
options: {
directory: resolve(directory, 'tribe'),
config: {
path: {
data: dataDir
}
},
nodes: [{
cluster: { name: 'data-01' },
http: { port: 9201 },
node: { name: 'node-01', data: true, master: true, max_local_storage_nodes: 5 }
}, {
cluster: { name: 'data-02' },
http: { port: 9202 },
node: { name: 'node-02', data: true, master: true, max_local_storage_nodes: 5 }
}, {
cluster: { name: 'admin' },
http: { port: 9200 },
node: { name: 'node-03', data: true, master: true, max_local_storage_nodes: 5 }
}, {
cluster: { name: 'tribe' },
http: { port: 9203 },
node: { name: 'node-04', max_local_storage_nodes: 5 },
tribe: {
c1: {
cluster: {
name: 'data-01'
}
},
c2: {
cluster: {
name: 'data-02'
}
},
on_conflict: 'prefer_c1',
blocks: {
write: true
}
},
discovery: {
zen: {
ping: {
unicast: {
hosts: [ 'localhost:9201', 'localhost:9202' ]
}
}
}
}
}]
},
},
test: {
options: {
directory: resolve(directory, 'test'),
@ -37,10 +92,20 @@ module.exports = function (grunt) {
},
cluster: {
name: 'esvm-test'
},
discovery: {
zen: {
ping: {
unicast: {
hosts: [ `localhost:${serverConfig.servers.elasticsearch.port}` ]
}
}
}
}
}
}
},
ui: {
options: {
directory: resolve(directory, 'test'),
@ -51,10 +116,20 @@ module.exports = function (grunt) {
},
cluster: {
name: 'esvm-ui'
},
discovery: {
zen: {
ping: {
unicast: {
hosts: [ `localhost:${serverConfig.servers.elasticsearch.port}` ]
}
}
}
}
}
}
},
withPlugins: {
options: {
version: '2.1.0',