mirror of
https://github.com/elastic/kibana.git
synced 2025-04-23 17:28:26 -04:00
[@kbn/es] fix promise and CLI error handling (#17596)
* [kbn-es/cluster] fix promise handling to properly route success/failure * [kbn-es/cli] catch errors that bubble to the CLI and log with a bit of style * [kbn-es] fix promise handling when building from source * [kbn-es] check for inverse of .stop() condition * [kbn-es/cluster] resolve promise is cluster stops cleanly * [kbn-es/cluster/start] reject if ES exits before starting
This commit is contained in:
parent
bb250b8dd5
commit
f0d01928db
5 changed files with 170 additions and 102 deletions
|
@ -2,6 +2,8 @@ const chalk = require('chalk');
|
|||
const getopts = require('getopts');
|
||||
const dedent = require('dedent');
|
||||
const commands = require('./cli_commands');
|
||||
const { isCliError } = require('./errors');
|
||||
const { log } = require('./utils');
|
||||
|
||||
function help() {
|
||||
const availableCommands = Object.keys(commands).map(
|
||||
|
@ -24,41 +26,55 @@ function help() {
|
|||
}
|
||||
|
||||
exports.run = async (defaults = {}) => {
|
||||
const argv = process.argv.slice(2);
|
||||
const options = getopts(argv, {
|
||||
alias: {
|
||||
h: 'help',
|
||||
},
|
||||
try {
|
||||
const argv = process.argv.slice(2);
|
||||
const options = getopts(argv, {
|
||||
alias: {
|
||||
h: 'help',
|
||||
},
|
||||
|
||||
default: defaults,
|
||||
});
|
||||
const args = options._;
|
||||
const commandName = args[0];
|
||||
default: defaults,
|
||||
});
|
||||
const args = options._;
|
||||
const commandName = args[0];
|
||||
|
||||
if (args.length === 0 || (!commandName && options.help)) {
|
||||
help();
|
||||
return;
|
||||
if (args.length === 0 || (!commandName && options.help)) {
|
||||
help();
|
||||
return;
|
||||
}
|
||||
|
||||
const command = commands[commandName];
|
||||
|
||||
if (command === undefined) {
|
||||
log.error(
|
||||
chalk.red(`[${commandName}] is not a valid command, see 'es --help'`)
|
||||
);
|
||||
process.exitCode = 1;
|
||||
return;
|
||||
}
|
||||
|
||||
if (commandName && options.help) {
|
||||
log.write(dedent`
|
||||
usage: ${command.usage || `es ${commandName} [<args>]`}
|
||||
|
||||
${command.description}
|
||||
|
||||
${command.help(defaults).replace(/\n/g, '\n ')}
|
||||
`);
|
||||
return;
|
||||
}
|
||||
|
||||
await command.run(defaults);
|
||||
} catch (error) {
|
||||
if (isCliError(error)) {
|
||||
// only log the message, the CLI explicitly threw this message
|
||||
// and it doesn't need a stack trace
|
||||
log.error(error.message);
|
||||
} else {
|
||||
log.error('Unhandled error');
|
||||
log.error(error);
|
||||
}
|
||||
|
||||
process.exitCode = 1;
|
||||
}
|
||||
|
||||
const command = commands[commandName];
|
||||
|
||||
if (command === undefined) {
|
||||
console.log(
|
||||
chalk.red(`[${commandName}] is not a valid command, see 'es --help'`)
|
||||
);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
if (commandName && options.help) {
|
||||
console.log(dedent`
|
||||
usage: ${command.usage || `es ${commandName} [<args>]`}
|
||||
|
||||
${command.description}
|
||||
|
||||
${command.help(defaults).replace(/\n/g, '\n ')}
|
||||
`);
|
||||
return;
|
||||
}
|
||||
|
||||
await command.run(defaults);
|
||||
};
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
const dedent = require('dedent');
|
||||
const getopts = require('getopts');
|
||||
const { Cluster } = require('../cluster');
|
||||
const { createCliError } = require('../errors');
|
||||
|
||||
exports.description = 'Install and run from an Elasticsearch tar';
|
||||
|
||||
|
@ -38,8 +39,7 @@ exports.run = async (defaults = {}) => {
|
|||
const [, path] = options._;
|
||||
|
||||
if (!path || !path.endsWith('tar.gz')) {
|
||||
console.warn('you must provide a path to an ES tar file');
|
||||
return;
|
||||
throw createCliError('you must provide a path to an ES tar file');
|
||||
}
|
||||
|
||||
const { installPath } = await cluster.installArchive(path, options);
|
||||
|
|
|
@ -3,6 +3,7 @@ const chalk = require('chalk');
|
|||
const { installSnapshot, installSource, installArchive } = require('./install');
|
||||
const { ES_BIN } = require('./paths');
|
||||
const { log: defaultLog, parseEsLog, extractConfigFiles } = require('./utils');
|
||||
const { createCliError } = require('./errors');
|
||||
|
||||
exports.Cluster = class Cluster {
|
||||
constructor(log = defaultLog) {
|
||||
|
@ -15,17 +16,17 @@ exports.Cluster = class Cluster {
|
|||
* @param {Object} options
|
||||
* @property {Array} options.installPath
|
||||
* @property {Array} options.sourcePath
|
||||
* @returns {Promise}
|
||||
* @returns {Promise<{installPath}>}
|
||||
*/
|
||||
async installSource(options = {}) {
|
||||
this._log.info(chalk.bold('Installing from source'));
|
||||
this._log.indent(4);
|
||||
|
||||
const install = await installSource({ log: this._log, ...options });
|
||||
const { installPath } = await installSource({ log: this._log, ...options });
|
||||
|
||||
this._log.indent(-4);
|
||||
|
||||
return install;
|
||||
return { installPath };
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -34,17 +35,20 @@ exports.Cluster = class Cluster {
|
|||
* @param {Object} options
|
||||
* @property {Array} options.installPath
|
||||
* @property {Array} options.sourcePath
|
||||
* @returns {Promise}
|
||||
* @returns {Promise<{installPath}>}
|
||||
*/
|
||||
async installSnapshot(options = {}) {
|
||||
this._log.info(chalk.bold('Installing from snapshot'));
|
||||
this._log.indent(4);
|
||||
|
||||
const install = await installSnapshot({ log: this._log, ...options });
|
||||
const { installPath } = await installSnapshot({
|
||||
log: this._log,
|
||||
...options,
|
||||
});
|
||||
|
||||
this._log.indent(-4);
|
||||
|
||||
return install;
|
||||
return { installPath };
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -53,17 +57,20 @@ exports.Cluster = class Cluster {
|
|||
* @param {String} path
|
||||
* @param {Object} options
|
||||
* @property {Array} options.installPath
|
||||
* @returns {Promise}
|
||||
* @returns {Promise<{installPath}>}
|
||||
*/
|
||||
async installArchive(path, options = {}) {
|
||||
this._log.info(chalk.bold('Installing from an archive'));
|
||||
this._log.indent(4);
|
||||
|
||||
const install = await installArchive(path, { log: this._log, ...options });
|
||||
const { installPath } = await installArchive(path, {
|
||||
log: this._log,
|
||||
...options,
|
||||
});
|
||||
|
||||
this._log.indent(-4);
|
||||
|
||||
return install;
|
||||
return { installPath };
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -75,26 +82,72 @@ exports.Cluster = class Cluster {
|
|||
* @returns {Promise}
|
||||
*/
|
||||
async start(installPath, options = {}) {
|
||||
await this.run(installPath, options);
|
||||
this._exec(installPath, options);
|
||||
|
||||
return new Promise(resolve => {
|
||||
this._process.stdout.on('data', data => {
|
||||
if (/started/.test(data)) {
|
||||
return resolve(process);
|
||||
}
|
||||
});
|
||||
});
|
||||
await Promise.race([
|
||||
// await the "started" log message
|
||||
new Promise(resolve => {
|
||||
this._process.stdout.on('data', data => {
|
||||
if (/started/.test(data)) {
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
}),
|
||||
|
||||
// await the outcome of the process in case it exits before starting
|
||||
this._outcome.then(() => {
|
||||
throw createCliError('ES exited without starting');
|
||||
}),
|
||||
]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts Elasticsearch and immediately returns with process
|
||||
* Starts Elasticsearch and waits for Elasticsearch to exit
|
||||
*
|
||||
* @param {String} installPath
|
||||
* @param {Object} options
|
||||
* @property {Array} options.esArgs
|
||||
* @returns {Process}
|
||||
* @returns {Promise<undefined>}
|
||||
*/
|
||||
run(installPath, { esArgs = [] }) {
|
||||
async run(installPath, options = {}) {
|
||||
this._exec(installPath, options);
|
||||
|
||||
// await the final outcome of the process
|
||||
await this._outcome;
|
||||
}
|
||||
|
||||
/**
|
||||
* Stops ES process, if it's running
|
||||
*
|
||||
* @returns {Promise}
|
||||
*/
|
||||
async stop() {
|
||||
if (!this._process || !this._outcome) {
|
||||
throw new Error('ES has not been started');
|
||||
}
|
||||
|
||||
this._process.kill();
|
||||
await this._outcome;
|
||||
}
|
||||
|
||||
/**
|
||||
* Common logic from this.start() and this.run()
|
||||
*
|
||||
* Start the elasticsearch process (stored at `this._process`)
|
||||
* and "pipe" its stdio to `this._log`. Also create `this._outcome`
|
||||
* which will be resolved/rejected when the process exits.
|
||||
*
|
||||
* @private
|
||||
* @param {String} installPath
|
||||
* @param {Object} options
|
||||
* @property {Array} options.esArgs
|
||||
* @return {undefined}
|
||||
*/
|
||||
_exec(installPath, { esArgs = [] }) {
|
||||
if (this._process || this._outcome) {
|
||||
throw new Error('ES has already been started');
|
||||
}
|
||||
|
||||
this._log.info(chalk.bold('Starting'));
|
||||
this._log.indent(4);
|
||||
|
||||
|
@ -120,30 +173,14 @@ exports.Cluster = class Cluster {
|
|||
);
|
||||
|
||||
this._outcome = new Promise((resolve, reject) => {
|
||||
this._process.on('exit', code => {
|
||||
// JVM exits with 143 on SIGTERM and 130 on SIGINT, dont' treat then as errors
|
||||
this._process.once('exit', code => {
|
||||
// JVM exits with 143 on SIGTERM and 130 on SIGINT, dont' treat them as errors
|
||||
if (code > 0 && !(code === 143 || code === 130)) {
|
||||
return reject(`ES exitted with code ${code}`);
|
||||
reject(createCliError(`ES exitted with code ${code}`));
|
||||
} else {
|
||||
resolve();
|
||||
}
|
||||
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
|
||||
return process;
|
||||
}
|
||||
|
||||
/**
|
||||
* Stops ES process, if it's running
|
||||
*
|
||||
* @returns {Promise}
|
||||
*/
|
||||
stop() {
|
||||
if (!this._process || !this._outcome) {
|
||||
return Promise.reject('ES has not been started');
|
||||
}
|
||||
|
||||
this._process.kill();
|
||||
return this._outcome;
|
||||
}
|
||||
};
|
||||
|
|
9
packages/kbn-es/src/errors.js
Normal file
9
packages/kbn-es/src/errors.js
Normal file
|
@ -0,0 +1,9 @@
|
|||
exports.createCliError = function(message) {
|
||||
const error = new Error(message);
|
||||
error.isCliError = true;
|
||||
return error;
|
||||
};
|
||||
|
||||
exports.isCliError = function(error) {
|
||||
return error && error.isCliError;
|
||||
};
|
|
@ -6,9 +6,13 @@ const chalk = require('chalk');
|
|||
const crypto = require('crypto');
|
||||
const simpleGit = require('simple-git/promise');
|
||||
const { installArchive } = require('./archive');
|
||||
const { createCliError } = require('../errors');
|
||||
const { findMostRecentlyChanged, log: defaultLog, cache } = require('../utils');
|
||||
const { GRADLE_BIN, ES_ARCHIVE_PATTERN, BASE_PATH } = require('../paths');
|
||||
|
||||
const onceEvent = (emitter, event) =>
|
||||
new Promise(resolve => emitter.once(event, resolve));
|
||||
|
||||
/**
|
||||
* Installs ES from source
|
||||
*
|
||||
|
@ -96,33 +100,35 @@ async function sourceInfo(cwd, log = defaultLog) {
|
|||
* @property {ToolingLog} options.log
|
||||
* @returns {Object} containing archive and optional plugins
|
||||
*/
|
||||
function createSnapshot({ sourcePath, log = defaultLog }) {
|
||||
async function createSnapshot({ sourcePath, log = defaultLog }) {
|
||||
const buildArgs = [':distribution:archives:tar:assemble'];
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
log.info('%s %s', GRADLE_BIN, buildArgs.join(' '));
|
||||
log.info('%s %s', GRADLE_BIN, buildArgs.join(' '));
|
||||
|
||||
const build = execa(GRADLE_BIN, buildArgs, {
|
||||
cwd: sourcePath,
|
||||
stdio: ['ignore', 'pipe', 'pipe'],
|
||||
});
|
||||
|
||||
const stdout = readline.createInterface({ input: build.stdout });
|
||||
const stderr = readline.createInterface({ input: build.stderr });
|
||||
|
||||
stdout.on('line', line => log.debug(line));
|
||||
stderr.on('line', line => log.error(line));
|
||||
|
||||
build.stdout.on('end', () => {
|
||||
if (build.exitCode > 0) {
|
||||
reject(new Error('unable to build ES'));
|
||||
} else {
|
||||
const esTarballPath = findMostRecentlyChanged(
|
||||
path.resolve(sourcePath, ES_ARCHIVE_PATTERN)
|
||||
);
|
||||
|
||||
resolve(esTarballPath);
|
||||
}
|
||||
});
|
||||
const build = execa(GRADLE_BIN, buildArgs, {
|
||||
cwd: sourcePath,
|
||||
stdio: ['ignore', 'pipe', 'pipe'],
|
||||
});
|
||||
|
||||
const stdout = readline.createInterface({ input: build.stdout });
|
||||
const stderr = readline.createInterface({ input: build.stderr });
|
||||
|
||||
stdout.on('line', line => log.debug(line));
|
||||
stderr.on('line', line => log.error(line));
|
||||
|
||||
const [exitCode] = await Promise.all([
|
||||
onceEvent(build, 'exit'),
|
||||
onceEvent(stdout, 'close'),
|
||||
onceEvent(stderr, 'close'),
|
||||
]);
|
||||
|
||||
if (exitCode > 0) {
|
||||
throw createCliError('unable to build ES');
|
||||
}
|
||||
|
||||
const esTarballPath = findMostRecentlyChanged(
|
||||
path.resolve(sourcePath, ES_ARCHIVE_PATTERN)
|
||||
);
|
||||
|
||||
return esTarballPath;
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue