Move proc_runner from x-pack-kibana into kibana (#17120)

* [tests] add proc_runner from x-pack-kibana

* [tests] implement proc_runner deps

* [tests] fix proc_runner dep

* [packages] move proc_runner into kbn-proc-runner

* [kbn-proc-runner] Package it

* [kbn-proc-runner] Chalk it up

* [kbn-proc-runner] Package.json

* Both kbn-proc-runner and kbn-utils depending on review. Sigh.

* Rename to kbn-dev-util, remove kbn-proc-runner

* Test proc-runner

* Add babel

* [proc-runner] build fixes

* Recreate yarn.locks, add mocha tests to simplemocha config
This commit is contained in:
archana 2018-03-14 19:55:50 -05:00 committed by GitHub
parent 0059180736
commit 409af00d4d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
27 changed files with 2709 additions and 4 deletions

View file

@ -0,0 +1,3 @@
{
"presets": ["@kbn/babel-preset/node"]
}

View file

@ -0,0 +1,8 @@
# dev utils
## proc runner
For running integration tests in Kibana
## tooling log

View file

@ -0,0 +1,24 @@
{
"name": "@kbn/dev-utils",
"main": "./target/index.js",
"version": "1.0.0",
"license": "Apache-2.0",
"private": true,
"scripts": {
"build": "babel src --out-dir target",
"kbn:bootstrap": "yarn build"
},
"dependencies": {
"chalk": "^2.3.0",
"moment": "^2.20.1",
"tree-kill": "1.1.0"
},
"devDependencies": {
"babel-cli": "^6.26.0",
"@kbn/babel-preset": "link:../kbn-babel-preset",
"chance": "1.0.6",
"expect.js": "0.3.1"
}
}

View file

@ -0,0 +1,2 @@
export { withProcRunner } from './proc-runner';
export { createToolingLog, pickLevelFromFlags } from './tooling-log';

View file

@ -0,0 +1,3 @@
#! /usr/bin/node
console.log(process.argv.join(' '));

View file

@ -0,0 +1,22 @@
import { withProcRunner } from '../with_proc_runner';
describe('proc-runner', () => {
function runProc({ thing = '', procs }) {
return new Promise(resolve => {
setTimeout(() => {
procs.run('proc', {
cmd: './proc',
args: ['these', 'are', 'words'],
});
resolve(thing);
}, 500);
});
}
it('passes procs to a function', async () => {
await withProcRunner(async procs => {
await runProc({ procs });
await procs.stop('proc');
});
});
});

View file

@ -0,0 +1,11 @@
const $isCliError = Symbol('isCliError');
export function createCliError(message) {
const error = new Error(message);
error[$isCliError] = true;
return error;
}
export function isCliError(error) {
return error && !!error[$isCliError];
}

View file

@ -0,0 +1 @@
export { withProcRunner } from './with_proc_runner';

View file

@ -0,0 +1,4 @@
import { createToolingLog } from '../tooling-log';
export const log = createToolingLog('debug');
log.pipe(process.stdout);

View file

@ -0,0 +1,38 @@
import Rx from 'rxjs/Rx';
import { createCliError } from './errors';
/**
* Creates an Observable from a childProcess that:
* - provides the exit code as it's only value (which may be null)
* as soon as the process exits
* - completes once all stdio streams for the child process have closed
* - fails if the childProcess emits an error event
*
* @param {ChildProcess} childProcess
* @return {Rx.Observable}
*/
export function observeChildProcess(name, childProcess) {
// observe first exit event
const exit$ = Rx.Observable.fromEvent(childProcess, 'exit')
.first()
.map(code => {
// JVM exits with 143 on SIGTERM and 130 on SIGINT, dont' treat then as errors
if (code > 0 && !(code === 143 || code === 130)) {
throw createCliError(`[${name}] exitted with code ${code}`);
}
return code;
});
// observe first close event
const close$ = Rx.Observable.fromEvent(childProcess, 'close').first();
// observe first error event until there is a close event
const error$ = Rx.Observable.fromEvent(childProcess, 'error')
.first()
.mergeMap(err => Rx.Observable.throw(err))
.takeUntil(close$);
return Rx.Observable.merge(exit$, close$.ignoreElements(), error$);
}

View file

@ -0,0 +1,51 @@
import Rx from 'rxjs/Rx';
const SEP = /\r?\n/;
import { observeReadable } from './observe_readable';
/**
* Creates an Observable from a Readable Stream that:
* - splits data from `readable` into lines
* - completes when `readable` emits "end"
* - fails if `readable` emits "errors"
*
* @param {ReadableStream} readable
* @return {Rx.Observable}
*/
export function observeLines(readable) {
const done$ = observeReadable(readable).share();
const scan$ = Rx.Observable
.fromEvent(readable, 'data')
.scan(({ buffer }, chunk) => {
buffer += chunk;
let match;
const lines = [];
while (match = buffer.match(SEP)) {
lines.push(buffer.slice(0, match.index));
buffer = buffer.slice(match.index + match[0].length);
}
return { buffer, lines };
}, { buffer: '' })
// stop if done completes or errors
.takeUntil(done$.materialize());
return Rx.Observable.merge(
// use done$ to provide completion/errors
done$,
// merge in the "lines" from each step
scan$
.mergeMap(({ lines }) => lines),
// inject the "unsplit" data at the end
scan$
.last()
.mergeMap(({ buffer }) => (buffer ? [buffer] : []))
// if there were no lines, last() will error, so catch and complete
.catch(() => Rx.Observable.empty())
);
}

View file

@ -0,0 +1,24 @@
import Rx from 'rxjs/Rx';
/**
* Produces an Observable from a ReadableSteam that:
* - completes on the first "end" event
* - fails on the first "error" event
*
* @param {ReadableStream} readable
* @return {Rx.Observable}
*/
export function observeReadable(readable) {
return Rx.Observable
.race(
Rx.Observable
.fromEvent(readable, 'end')
.first()
.ignoreElements(),
Rx.Observable
.fromEvent(readable, 'error')
.first()
.map(err => Rx.Observable.throw(err))
);
}

View file

@ -0,0 +1,16 @@
import Rx from 'rxjs/Rx';
/**
* Creates an Observable from a Process object that:
* - emits "exit", "SIGINT", or "SIGTERM" events that occur
*
* @param {ReadableStream} readable
* @return {Rx.Observable}
*/
export function observeSignals(process) {
return Rx.Observable.merge(
Rx.Observable.fromEvent(process, 'exit').mapTo('exit'),
Rx.Observable.fromEvent(process, 'SIGINT').mapTo('SIGINT'),
Rx.Observable.fromEvent(process, 'SIGTERM').mapTo('SIGTERM'),
);
}

View file

@ -0,0 +1,65 @@
import { spawn } from 'child_process';
import { statSync } from 'fs';
import Rx from 'rxjs/Rx';
import { gray } from 'chalk';
import treeKill from 'tree-kill';
import { promisify } from 'util';
const treeKillAsync = promisify(treeKill);
import { log } from './log';
import { observeLines } from './observe_lines';
import { observeChildProcess } from './observe_child_process';
export function createProc(name, { cmd, args, cwd, env, stdin }) {
log.info('[%s] > %s', name, cmd, args.join(' '));
// spawn fails with ENOENT when either the
// cmd or cwd don't exist, so we check for the cwd
// ahead of time so that the error is less ambiguous
try {
if (!statSync(cwd).isDirectory()) {
throw new Error(`cwd "${cwd}" exists but is not a directory`);
}
} catch (err) {
if (err.code === 'ENOENT') {
throw new Error(`cwd "${cwd}" does not exist`);
}
}
const childProcess = spawn(cmd, args, {
cwd,
env,
stdio: [stdin ? 'pipe' : 'ignore', 'pipe', 'pipe'],
});
if (stdin) {
childProcess.stdin.end(stdin, 'utf8');
}
return new class Proc {
name = name;
lines$ = Rx.Observable.merge(
observeLines(childProcess.stdout),
observeLines(childProcess.stderr)
)
.do(line => log.write(` ${gray('proc')} [${gray(name)}] ${line}`))
.share();
outcome$ = observeChildProcess(name, childProcess).share();
outcomePromise = Rx.Observable.merge(
this.lines$.ignoreElements(),
this.outcome$
).toPromise();
closedPromise = this.outcomePromise.then(() => {}, () => {});
async stop(signal) {
await treeKillAsync(childProcess.pid, signal);
await this.closedPromise;
}
}();
}

View file

@ -0,0 +1,184 @@
import moment from 'moment';
import { log } from './log';
import { createCliError } from './errors';
import { createProc } from './proc';
import { observeSignals } from './observe_signals';
const noop = () => {};
/**
* Helper for starting and managing processes. In many ways it resembles the
* API from `grunt_run`, processes are named and can be started, waited for,
* backgrounded once they log something matching a RegExp...
*
* @class ProcRunner
*/
export class ProcRunner {
constructor() {
this._closing = false;
this._procs = [];
this._signalSubscription = observeSignals(process).subscribe({
next: async (signal) => {
await this.teardown(signal);
if (signal !== 'exit') {
// resend the signal
process.kill(process.pid, signal);
}
}
});
}
/**
* Start a process, tracking it by `name`
* @param {String} name
* @param {Object} options
* @property {String} options.cmd executable to run
* @property {Array<String>?} options.args arguments to provide the executable
* @property {String?} options.cwd current working directory for the process
* @property {RegExp|Boolean} options.wait Should start() wait for some time? Use
* `true` will wait until the proc exits,
* a `RegExp` will wait until that log line
* is found
* @return {Promise<undefined>}
*/
async run(name, options) {
const {
cmd,
args = [],
cwd = process.cwd(),
stdin = null,
wait = false,
env = process.env
} = options;
if (this._closing) {
throw new Error('ProcRunner is closing');
}
if (wait && !(wait instanceof RegExp) && wait !== true) {
throw new TypeError('wait param should either be a RegExp or `true`');
}
if (!!this._getProc(name)) {
throw new Error(`Process with name "${name}" already running`);
}
const proc = this._createProc(name, { cmd, args, cwd, env, stdin });
try {
// wait for process to log matching line
if (wait instanceof RegExp) {
await proc.lines$
.filter(line => wait.test(line))
.first()
.catch(err => {
if (err.name !== 'EmptyError') {
throw createCliError(`[${name}] exitted without matching pattern: ${wait}`);
} else {
throw err;
}
})
.toPromise();
}
// wait for process to complete
if (wait === true) {
await proc.outcomePromise;
}
} finally {
// while the procRunner closes promises will resolve/reject because
// processes and stopping, but consumers of run() shouldn't have to
// prepare for that, so just return a never-resolving promise
if (this._closing) {
await new Promise(noop);
}
}
}
/**
* Stop a named proc
* @param {String} name
* @param {String} [signal='SIGTERM']
* @return {Promise<undefined>}
*/
async stop(name, signal = 'SIGTERM') {
const proc = this._getProc(name);
if (proc) {
await proc.stop(signal);
} else {
log.warning('[%s] already stopped', name);
}
}
/**
* Wait for all running processes to stop naturally
* @return {Promise<undefined>}
*/
async waitForAllToStop() {
await Promise.all(
this._procs.map(proc => proc.closedPromise)
);
}
/**
* Close the ProcRunner and stop all running
* processes with `signal`
*
* @param {String} [signal=undefined]
* @return {Promise}
*/
async teardown(signal) {
if (this._closing) return;
this._closing = true;
this._signalSubscription.unsubscribe();
this._signalSubscription = null;
if (!signal && this._procs.length > 0) {
log.warning(
'%d processes left running, stop them with procs.stop(name):',
this._procs.length,
this._procs.map(proc => proc.name)
);
}
const stopWith = signal === 'exit' ? 'SIGKILL' : signal;
await Promise.all(
this._procs.map(proc => proc.stop(stopWith))
);
}
_getProc(name) {
return this._procs.find(proc => proc.name === name);
}
_createProc(name, options) {
const startMs = Date.now();
const proc = createProc(name, options);
this._procs.push(proc);
const remove = () => {
this._procs.splice(this._procs.indexOf(proc), 1);
};
// tie into proc outcome$, remove from _procs on compete
proc.outcome$.subscribe({
next: (code) => {
const duration = moment.duration(Date.now() - startMs);
log.info('[%s] exitted with %s after %s', name, code, duration.humanize());
},
complete: () => {
remove();
},
error: (error) => {
if (this._closing) {
log.error(error);
}
remove();
},
});
return proc;
}
}

View file

@ -0,0 +1,18 @@
import { ProcRunner } from './proc_runner';
/**
* Create a ProcRunner and pass it to an async function. When
* the async function finishes the ProcRunner is torn-down
* automatically
*
* @param {async Function} fn
* @return {Promise<undefined>}
*/
export async function withProcRunner(fn) {
const procs = new ProcRunner();
try {
await fn(procs);
} finally {
await procs.teardown();
}
}

View file

@ -0,0 +1,28 @@
import { createReduceStream } from './reduce_stream';
/**
* Creates a Transform stream that consumes all provided
* values and concatenates them using each values `concat`
* method.
*
* Concatenate strings:
* createListStream(['f', 'o', 'o'])
* .pipe(createConcatStream())
* .on('data', console.log)
* // logs "foo"
*
* Concatenate values into an array:
* createListStream([1,2,3])
* .pipe(createConcatStream([]))
* .pipe(createJsonStringifyStream())
* .on('data', console.log)
* // logs "[1,2,3]"
*
*
* @param {any} initial The initial value that subsequent
* items will concat with
* @return {Transform}
*/
export function createConcatStream(initial) {
return createReduceStream((acc, chunk) => acc.concat(chunk), initial);
}

View file

@ -0,0 +1,2 @@
export { createConcatStream } from './concat_stream';
export { createPromiseFromStreams } from './promise_from_streams';

View file

@ -0,0 +1,67 @@
/**
* Take an array of streams, pipe the output
* from each one into the next, listening for
* errors from any of the streams, and then resolve
* the promise once the final stream has finished
* writing/reading.
*
* If the last stream is readable, it's final value
* will be provided as the promise value.
*
* Errors emmitted from any stream will cause
* the promise to be rejected with that error.
*
* @param {Array<Stream>} streams
* @return {Promise<any>}
*/
export async function createPromiseFromStreams(streams) {
const last = streams[streams.length - 1];
// reject if any of the streams emits an error
const anyStreamFailure = new Promise((resolve, reject) => {
streams.forEach((stream, i) => {
if (i > 0) streams[i - 1].pipe(stream);
stream.on('error', reject);
return stream;
});
});
// resolve when the last stream has finished writing, or
// immediately if the last stream is not writable
const lastFinishedWriting = new Promise(resolve => {
if (typeof last.write !== 'function') {
resolve();
return;
}
last.on('finish', resolve);
});
// resolve with the final value provided by the last stream
// after the last stream has provided it, or immediately if the
// stream is not readable
const lastFinishedReading = new Promise(resolve => {
if (typeof last.read !== 'function') {
resolve();
return;
}
let finalChunk;
last.on('data', (chunk) => {
finalChunk = chunk;
});
last.on('end', () => {
resolve(finalChunk);
});
});
// wait (and rethrow) the first error, or for the last stream
// to both finish writing and providing values to read
await Promise.race([
anyStreamFailure,
Promise.all([lastFinishedWriting, lastFinishedReading])
]);
// return the final chunk read from the last stream
return await lastFinishedReading;
}

View file

@ -0,0 +1,61 @@
import { Transform } from 'stream';
/**
* Create a transform stream that consumes each chunk it receives
* and passes it to the reducer, which will return the new value
* for the stream. Once all chunks have been received the reduce
* stream provides the result of final call to the reducer to
* subscribers.
*
* @param {Function}
* @param {any} initial Initial value for the stream, if undefined
* then the first chunk provided is used as the
* initial value.
* @return {Transform}
*/
export function createReduceStream(reducer, initial) {
let i = -1;
let value = initial;
// if the reducer throws an error then the value is
// considered invalid and the stream will never provide
// it to subscribers. We will also stop calling the
// reducer for any new data that is provided to us
let failed = false;
if (typeof reducer !== 'function') {
throw new TypeError('reducer must be a function');
}
return new Transform({
readableObjectMode: true,
writableObjectMode: true,
async transform(chunk, enc, callback) {
try {
if (failed) {
return callback();
}
i += 1;
if (i === 0 && initial === undefined) {
value = chunk;
} else {
value = await reducer(value, chunk, enc);
}
callback();
} catch (err) {
failed = true;
callback(err);
}
},
flush(callback) {
if (!failed) {
this.push(value);
}
callback();
}
});
}

View file

@ -0,0 +1,83 @@
import expect from 'expect.js';
import Chance from 'chance';
import { createConcatStream, createPromiseFromStreams } from '../../streams';
import { createToolingLog } from '../tooling_log';
const chance = new Chance();
const capture = (level, block) => {
const log = createToolingLog(level);
block(log);
log.end();
return createPromiseFromStreams([log, createConcatStream('')]);
};
const nothingTest = (logLevel, method) => {
describe(`#${method}(...any)`, () => {
it('logs nothing', async () => {
const output = await capture(logLevel, log => log[method]('foo'));
expect(output).to.be('');
});
});
};
const somethingTest = (logLevel, method) => {
describe(`#${method}(...any)`, () => {
it('logs to output stream', async () => {
const output = await capture(logLevel, log => log[method]('foo'));
expect(output).to.contain('foo');
});
});
};
describe('utils: createToolingLog(logLevel, output)', () => {
it('is a readable stream', async () => {
const log = createToolingLog('debug');
log.info('Foo');
log.info('Bar');
log.info('Baz');
log.end();
const output = await createPromiseFromStreams([
log,
createConcatStream(''),
]);
expect(output).to.contain('Foo');
expect(output).to.contain('Bar');
expect(output).to.contain('Baz');
});
describe('log level', () => {
describe('logLevel=silent', () => {
nothingTest('silent', 'debug');
nothingTest('silent', 'info');
nothingTest('silent', 'error');
});
describe('logLevel=error', () => {
nothingTest('error', 'debug');
nothingTest('error', 'info');
somethingTest('error', 'error');
});
describe('logLevel=info', () => {
nothingTest('info', 'debug');
somethingTest('info', 'info');
somethingTest('info', 'error');
});
describe('logLevel=debug', () => {
somethingTest('debug', 'debug');
somethingTest('debug', 'info');
somethingTest('debug', 'error');
});
describe('invalid logLevel', () => {
it('throw error', () => {
// avoid the impossiblity that a valid level is generated
// by specifying a long length
const level = chance.word({ length: 10 });
expect(() => createToolingLog(level)).to.throwError(level);
});
});
});
});

View file

@ -0,0 +1,96 @@
import expect from 'expect.js';
import Chance from 'chance';
import { parseLogLevel } from '../log_levels';
const chance = new Chance();
describe('parseLogLevel(logLevel).flags', () => {
describe('logLevel=silent', () => {
it('produces correct map', () => {
expect(parseLogLevel('silent').flags).to.eql({
silent: true,
error: false,
warning: false,
info: false,
debug: false,
verbose: false,
});
});
});
describe('logLevel=error', () => {
it('produces correct map', () => {
expect(parseLogLevel('error').flags).to.eql({
silent: true,
error: true,
warning: false,
info: false,
debug: false,
verbose: false,
});
});
});
describe('logLevel=warning', () => {
it('produces correct map', () => {
expect(parseLogLevel('warning').flags).to.eql({
silent: true,
error: true,
warning: true,
info: false,
debug: false,
verbose: false,
});
});
});
describe('logLevel=info', () => {
it('produces correct map', () => {
expect(parseLogLevel('info').flags).to.eql({
silent: true,
error: true,
warning: true,
info: true,
debug: false,
verbose: false,
});
});
});
describe('logLevel=debug', () => {
it('produces correct map', () => {
expect(parseLogLevel('debug').flags).to.eql({
silent: true,
error: true,
warning: true,
info: true,
debug: true,
verbose: false,
});
});
});
describe('logLevel=verbose', () => {
it('produces correct map', () => {
expect(parseLogLevel('verbose').flags).to.eql({
silent: true,
error: true,
warning: true,
info: true,
debug: true,
verbose: true,
});
});
});
describe('invalid logLevel', () => {
it('throws error', () => {
// avoid the impossiblity that a valid level is generated
// by specifying a long length
const level = chance.word({ length: 10 });
expect(() => parseLogLevel(level))
.to.throwError(level);
});
});
});

View file

@ -0,0 +1,2 @@
export { createToolingLog } from './tooling_log';
export { pickLevelFromFlags } from './log_levels';

View file

@ -0,0 +1,35 @@
const LEVELS = [
'silent',
'error',
'warning',
'info',
'debug',
'verbose',
];
export function pickLevelFromFlags(flags) {
if (flags.verbose) return 'verbose';
if (flags.debug) return 'debug';
if (flags.quiet) return 'error';
if (flags.silent) return 'silent';
return 'info';
}
export function parseLogLevel(name) {
const i = LEVELS.indexOf(name);
if (i === -1) {
const msg = (
`Invalid log level "${name}" ` +
`(expected one of ${LEVELS.join(',')})`
);
throw new Error(msg);
}
const flags = {};
LEVELS.forEach((level, levelI) => {
flags[level] = levelI <= i;
});
return { name, flags };
}

View file

@ -0,0 +1,80 @@
import { format } from 'util';
import { PassThrough } from 'stream';
import { magentaBright, yellow, red, blue, green, dim } from 'chalk';
import { parseLogLevel } from './log_levels';
export function createToolingLog(initialLogLevelName = 'silent') {
// current log level (see logLevel.name and logLevel.flags) changed
// with ToolingLog#setLevel(newLogLevelName);
let logLevel = parseLogLevel(initialLogLevelName);
// current indentation level, changed with ToolingLog#indent(delta)
let indentString = '';
class ToolingLog extends PassThrough {
constructor() {
super({ objectMode: true });
}
verbose(...args) {
if (!logLevel.flags.verbose) return;
this.write(' %s ', magentaBright('sill'), format(...args));
}
debug(...args) {
if (!logLevel.flags.debug) return;
this.write(' %s ', dim('debg'), format(...args));
}
info(...args) {
if (!logLevel.flags.info) return;
this.write(' %s ', blue('info'), format(...args));
}
success(...args) {
if (!logLevel.flags.info) return;
this.write(' %s ', green('succ'), format(...args));
}
warning(...args) {
if (!logLevel.flags.warning) return;
this.write(' %s ', yellow('warn'), format(...args));
}
error(err) {
if (!logLevel.flags.error) return;
if (typeof err !== 'string' && !(err instanceof Error)) {
err = new Error(`"${err}" thrown`);
}
this.write('%s ', red('ERROR'), err.stack || err.message || err);
}
indent(delta = 0) {
const width = Math.max(0, indentString.length + delta);
indentString = ' '.repeat(width);
return indentString.length;
}
getLevel() {
return logLevel.name;
}
setLevel(newLogLevelName) {
logLevel = parseLogLevel(newLogLevelName);
}
write(...args) {
format(...args).split('\n').forEach((line, i) => {
const subLineIndent = i === 0 ? '' : ' ';
const indent = !indentString ? '' : indentString.slice(0, -1) + (i === 0 && line[0] === '-' ? '└' : '│');
super.write(`${indent}${subLineIndent}${line}\n`);
});
}
}
return new ToolingLog();
}

File diff suppressed because it is too large Load diff

View file

@ -6,7 +6,7 @@ export default {
slow: 5000,
ignoreLeaks: false,
reporter: createAutoJunitReporter({
reportName: 'Server Mocha Tests'
reportName: 'Server Mocha Tests',
}),
globals: ['nil'],
},
@ -16,11 +16,12 @@ export default {
'src/**/__tests__/**/*.js',
'packages/kbn-pm/**/__tests__/**/*.js',
'packages/kbn-datemath/test/**/*.js',
'packages/kbn-dev-utils/**/__tests__/**/*.js',
'tasks/**/__tests__/**/*.js',
'test/fixtures/__tests__/*.js',
'!**/__tests__/fixtures/**/*',
'!src/**/public/**',
'!**/_*.js'
]
}
'!**/_*.js',
],
},
};