Revert "[kbn-es] await native realm setup, error if there are failures (#36290)"

This reverts commit 6ef9fbe423.
This commit is contained in:
spalger 2019-05-10 12:52:37 -07:00
parent 9f54841095
commit a102ca1f97
4 changed files with 129 additions and 216 deletions

View file

@ -33,19 +33,6 @@ const { createCliError } = require('./errors');
const { promisify } = require('util');
const treeKillAsync = promisify(require('tree-kill'));
// listen to data on stream until map returns anything but undefined
const first = (stream, map) =>
new Promise(resolve => {
const onData = data => {
const result = map(data);
if (result !== undefined) {
resolve(result);
stream.removeListener('data', onData);
}
};
stream.on('data', onData);
});
exports.Cluster = class Cluster {
constructor(log = defaultLog) {
this._log = log;
@ -171,15 +158,14 @@ exports.Cluster = class Cluster {
this._exec(installPath, options);
await Promise.race([
// wait for native realm to be setup and es to be started
Promise.all([
first(this._process.stdout, data => {
// await the "started" log message
new Promise(resolve => {
this._process.stdout.on('data', data => {
if (/started/.test(data)) {
return true;
resolve();
}
}),
this._nativeRealmSetup,
]),
});
}),
// await the outcome of the process in case it exits before starting
this._outcome.then(() => {
@ -199,12 +185,6 @@ exports.Cluster = class Cluster {
async run(installPath, options = {}) {
this._exec(installPath, options);
// log native realm setup errors so they aren't uncaught
this._nativeRealmSetup.catch(error => {
this._log.error(error);
this.stop();
});
// await the final outcome of the process
await this._outcome;
}
@ -261,43 +241,42 @@ exports.Cluster = class Cluster {
stdio: ['ignore', 'pipe', 'pipe'],
});
// parse log output to find http port
const httpPort = first(this._process.stdout, data => {
const match = data.toString('utf8').match(/HttpServer.+publish_address {[0-9.]+:([0-9]+)/);
if (match) {
return match[1];
}
});
// once the http port is available setup the native realm
this._nativeRealmSetup = httpPort.then(async port => {
const nativeRealm = new NativeRealm(options.password, port, this._log);
await nativeRealm.setPasswords(options);
});
// parse and forward es stdout to the log
this._process.stdout.on('data', data => {
const lines = parseEsLog(data.toString());
lines.forEach(line => {
this._log.info(line.formattedMessage);
// once we have the port we can stop checking for it
if (this.httpPort) {
return;
}
const httpAddressMatch = line.message.match(
/HttpServer.+publish_address {[0-9.]+:([0-9]+)/
);
if (httpAddressMatch) {
this.httpPort = httpAddressMatch[1];
new NativeRealm(options.password, this.httpPort, this._log).setPasswords(options);
}
});
});
// forward es stderr to the log
this._process.stderr.on('data', data => this._log.error(chalk.red(data.toString())));
// observe the exit code of the process and reflect in _outcome promies
const exitCode = new Promise(resolve => this._process.once('exit', resolve));
this._outcome = exitCode.then(code => {
if (this._stopCalled) {
return;
}
// JVM exits with 143 on SIGTERM and 130 on SIGINT, dont' treat them as errors
if (code > 0 && !(code === 143 || code === 130)) {
throw createCliError(`ES exited with code ${code}`);
}
this._outcome = new Promise((resolve, reject) => {
this._process.once('exit', code => {
if (this._stopCalled) {
resolve();
return;
}
// JVM exits with 143 on SIGTERM and 130 on SIGINT, dont' treat them as errors
if (code > 0 && !(code === 143 || code === 130)) {
reject(createCliError(`ES exited with code ${code}`));
} else {
resolve();
}
});
});
}
};

View file

@ -19,61 +19,10 @@
* under the License.
*/
const { createServer } = require('http');
const { format: formatUrl } = require('url');
const { exitCode, start } = JSON.parse(process.argv[2]);
process.exitCode = exitCode;
if (!start) {
return;
if (start) {
console.log('started');
}
let serverUrl;
const server = createServer((req, res) => {
const url = new URL(req.url, serverUrl);
const send = (code, body) => {
res.writeHead(code, { 'content-type': 'application/json' });
res.end(JSON.stringify(body));
};
if (url.pathname === '/_xpack') {
return send(400, {
error: {
reason: 'foo bar',
},
});
}
return send(404, {
error: {
reason: 'not found',
},
});
});
// setup server auto close after 1 second of silence
let serverCloseTimer;
const delayServerClose = () => {
clearTimeout(serverCloseTimer);
serverCloseTimer = setTimeout(() => server.close(), 1000);
};
server.on('request', delayServerClose);
server.on('listening', delayServerClose);
server.listen(0, '127.0.0.1', function() {
const { port, address: hostname } = server.address();
serverUrl = new URL(
formatUrl({
protocol: 'http:',
port,
hostname,
})
);
console.log(
`[o.e.h.AbstractHttpServerTransport] [computer] publish_address {127.0.0.1:${port}}, bound_addresses {[::1]:${port}}, {127.0.0.1:${port}}`
);
console.log('started');
});
process.exitCode = exitCode;

View file

@ -1,83 +1,82 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
const { Client } = require('@elastic/elasticsearch');
const chalk = require('chalk');
const { log: defaultLog } = require('./log');
exports.NativeRealm = class NativeRealm {
constructor(elasticPassword, port, log = defaultLog) {
this._client = new Client({ node: `http://elastic:${elasticPassword}@localhost:${port}` });
this._elasticPassword = elasticPassword;
this._log = log;
}
async setPassword(username, password = this._elasticPassword) {
this._log.info(`setting ${chalk.bold(username)} password to ${chalk.bold(password)}`);
await this._client.security.changePassword({
username,
refresh: 'wait_for',
body: {
password,
},
});
}
async setPasswords(options) {
if (!(await this.isSecurityEnabled())) {
this._log.info('security is not enabled, unable to set native realm passwords');
return;
}
const reservedUsers = await this.getReservedUsers();
await Promise.all(
reservedUsers.map(async user => {
await this.setPassword(user, options[`password.${user}`]);
})
);
}
async getReservedUsers() {
const users = await this._client.security.getUser();
return Object.keys(users.body).reduce((acc, user) => {
if (users.body[user].metadata._reserved === true) {
acc.push(user);
}
return acc;
}, []);
}
async isSecurityEnabled() {
try {
const {
body: { features },
} = await this._client.xpack.info({ categories: 'features' });
return features.security && features.security.enabled && features.security.available;
} catch (error) {
if (error.meta && error.meta.statusCode === 400) {
return false;
}
throw error;
}
}
};
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
const { Client } = require('@elastic/elasticsearch');
const chalk = require('chalk');
const { log: defaultLog } = require('./log');
exports.NativeRealm = class NativeRealm {
constructor(elasticPassword, port, log = defaultLog) {
this._client = new Client({ node: `http://elastic:${elasticPassword}@localhost:${port}` });
this._elasticPassword = elasticPassword;
this._log = log;
}
async setPassword(username, password = this._elasticPassword) {
this._log.info(`setting ${chalk.bold(username)} password to ${chalk.bold(password)}`);
try {
await this._client.security.changePassword({
username,
refresh: 'wait_for',
body: {
password,
},
});
} catch (e) {
this._log.error(
chalk.red(`unable to set password for ${chalk.bold(username)}: ${e.message}`)
);
}
}
async setPasswords(options) {
if (!(await this.isSecurityEnabled())) {
this._log.info('security is not enabled, unable to set native realm passwords');
return;
}
(await this.getReservedUsers()).forEach(user => {
this.setPassword(user, options[`password.${user}`]);
});
}
async getReservedUsers() {
const users = await this._client.security.getUser();
return Object.keys(users.body).reduce((acc, user) => {
if (users.body[user].metadata._reserved === true) {
acc.push(user);
}
return acc;
}, []);
}
async isSecurityEnabled() {
try {
const {
body: { features },
} = await this._client.xpack.info({ categories: 'features' });
return features.security && features.security.enabled && features.security.available;
} catch (e) {
return false;
}
}
};

View file

@ -79,31 +79,12 @@ describe('isSecurityEnabled', () => {
expect(await nativeRealm.isSecurityEnabled()).toBe(false);
});
test('returns false if 400 error returned', async () => {
test('logs exception and returns false', async () => {
mockClient.xpack.info.mockImplementation(() => {
const error = new Error('ResponseError');
error.meta = {
statusCode: 400,
};
throw error;
throw new Error('ResponseError');
});
expect(await nativeRealm.isSecurityEnabled()).toBe(false);
});
test('rejects if unexpected error is thrown', async () => {
mockClient.xpack.info.mockImplementation(() => {
const error = new Error('ResponseError');
error.meta = {
statusCode: 500,
};
throw error;
});
await expect(nativeRealm.isSecurityEnabled()).rejects.toThrowErrorMatchingInlineSnapshot(
`"ResponseError"`
);
});
});
describe('setPasswords', () => {
@ -223,13 +204,18 @@ describe('setPassword', () => {
});
});
it('rejects with errors', async () => {
it('logs error', async () => {
mockClient.security.changePassword.mockImplementation(() => {
throw new Error('SomeError');
});
await expect(
nativeRealm.setPassword('kibana', 'foo')
).rejects.toThrowErrorMatchingInlineSnapshot(`"SomeError"`);
await nativeRealm.setPassword('kibana', 'foo');
expect(log.error.mock.calls).toMatchInlineSnapshot(`
Array [
Array [
"unable to set password for kibana: SomeError",
],
]
`);
});
});