[7.x] ts-ify @kbn/dev-utils (#43383) (#43401)

This commit is contained in:
Spencer 2019-08-16 07:12:42 -07:00 committed by GitHub
parent 4eaaf74e0a
commit d1fdb9e4f9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
44 changed files with 473 additions and 813 deletions

View file

@ -371,7 +371,7 @@
"eslint-plugin-prettier": "3.1.0",
"eslint-plugin-react": "7.13.0",
"eslint-plugin-react-hooks": "1.6.0",
"exit-hook": "^2.1.0",
"exit-hook": "^2.2.0",
"faker": "1.1.0",
"fetch-mock": "7.3.3",
"geckodriver": "1.16.2",

View file

@ -1,3 +0,0 @@
{
"presets": ["@kbn/babel-preset/node_preset"]
}

View file

@ -1,26 +1,25 @@
{
"name": "@kbn/dev-utils",
"main": "./target/index.js",
"types": "./index.d.ts",
"version": "1.0.0",
"license": "Apache-2.0",
"private": true,
"scripts": {
"build": "babel src --out-dir target --copy-files",
"kbn:bootstrap": "yarn build --quiet",
"build": "tsc",
"kbn:bootstrap": "yarn build",
"kbn:watch": "yarn build --watch"
},
"dependencies": {
"chalk": "^2.4.1",
"execa": "^1.0.0",
"exit-hook": "^2.2.0",
"moment": "^2.20.1",
"rxjs": "^6.2.1",
"tree-kill": "^1.2.0",
"tslib": "^1.9.3"
},
"devDependencies": {
"@babel/cli": "7.4.4",
"@kbn/babel-preset": "1.0.0",
"typescript": "3.5.3",
"@kbn/expect": "1.0.0",
"chance": "1.0.18"
}

View file

@ -19,6 +19,6 @@
import { resolve } from 'path';
export const CA_CERT_PATH = resolve(__dirname, 'ca.crt');
export const ES_KEY_PATH = resolve(__dirname, 'elasticsearch.key');
export const ES_CERT_PATH = resolve(__dirname, 'elasticsearch.crt');
export const CA_CERT_PATH = resolve(__dirname, '../certs/ca.crt');
export const ES_KEY_PATH = resolve(__dirname, '../certs/elasticsearch.key');
export const ES_CERT_PATH = resolve(__dirname, '../certs/elasticsearch.crt');

View file

@ -19,12 +19,16 @@
const $isCliError = Symbol('isCliError');
export function createCliError(message) {
const error = new Error(message);
error[$isCliError] = true;
return error;
interface CliError extends Error {
[$isCliError]: boolean;
}
export function isCliError(error) {
export function createCliError(message: string) {
const error: Partial<CliError> = new Error(message);
error[$isCliError] = true;
return error as CliError;
}
export function isCliError(error: any): error is CliError {
return error && !!error[$isCliError];
}

View file

@ -17,6 +17,8 @@
* under the License.
*/
import { Readable } from 'stream';
import * as Rx from 'rxjs';
import { scan, takeUntil, share, materialize, mergeMap, last, catchError } from 'rxjs/operators';
@ -33,17 +35,25 @@ import { observeReadable } from './observe_readable';
* @param {ReadableStream} readable
* @return {Rx.Observable}
*/
export function observeLines(readable) {
export function observeLines(readable: Readable): Rx.Observable<string> {
const done$ = observeReadable(readable).pipe(share());
const scan$ = Rx.fromEvent(readable, 'data').pipe(
const scan$: Rx.Observable<{ buffer: string; lines?: string[] }> = Rx.fromEvent(
readable,
'data'
).pipe(
scan(
({ buffer }, chunk) => {
buffer += chunk;
let match;
const lines = [];
while ((match = buffer.match(SEP))) {
while (true) {
const match = buffer.match(SEP);
if (!match || match.index === undefined) {
break;
}
lines.push(buffer.slice(0, match.index));
buffer = buffer.slice(match.index + match[0].length);
}
@ -54,7 +64,9 @@ export function observeLines(readable) {
),
// stop if done completes or errors
takeUntil(done$.pipe(materialize()))
takeUntil(done$.pipe(materialize())),
share()
);
return Rx.merge(
@ -62,7 +74,7 @@ export function observeLines(readable) {
done$,
// merge in the "lines" from each step
scan$.pipe(mergeMap(({ lines }) => lines)),
scan$.pipe(mergeMap(({ lines }) => lines || [])),
// inject the "unsplit" data at the end
scan$.pipe(

View file

@ -17,18 +17,17 @@
* under the License.
*/
import { Readable } from 'stream';
import * as Rx from 'rxjs';
import { first, ignoreElements, map } from 'rxjs/operators';
import { first, ignoreElements, mergeMap } from 'rxjs/operators';
/**
* Produces an Observable from a ReadableSteam that:
* - completes on the first "end" event
* - fails on the first "error" event
*
* @param {ReadableStream} readable
* @return {Rx.Observable}
*/
export function observeReadable(readable) {
export function observeReadable(readable: Readable): Rx.Observable<never> {
return Rx.race(
Rx.fromEvent(readable, 'end').pipe(
first(),
@ -37,7 +36,7 @@ export function observeReadable(readable) {
Rx.fromEvent(readable, 'error').pipe(
first(),
map(err => Rx.throwError(err))
mergeMap(err => Rx.throwError(err))
)
);
}

View file

@ -1,36 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import * as Rx from 'rxjs';
import { mapTo } from 'rxjs/operators';
/**
* Creates an Observable from a Process object that:
* - emits "exit", "SIGINT", or "SIGTERM" events that occur
*
* @param {ReadableStream} readable
* @return {Rx.Observable}
*/
export function observeSignals(process) {
return Rx.merge(
Rx.fromEvent(process, 'exit').pipe(mapTo('exit')),
Rx.fromEvent(process, 'SIGINT').pipe(mapTo('SIGINT')),
Rx.fromEvent(process, 'SIGTERM').pipe(mapTo('SIGTERM'))
);
}

View file

@ -1,160 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import execa from 'execa';
import { statSync } from 'fs';
import * as Rx from 'rxjs';
import { tap, share, take, mergeMap, map, ignoreElements } from 'rxjs/operators';
import { gray } from 'chalk';
import treeKill from 'tree-kill';
import { promisify } from 'util';
const treeKillAsync = promisify(treeKill);
import { observeLines } from './observe_lines';
import { createCliError } from './errors';
const SECOND = 1000;
const STOP_TIMEOUT = 30 * SECOND;
async function withTimeout(attempt, ms, onTimeout) {
const TIMEOUT = Symbol('timeout');
try {
await Promise.race([
attempt(),
new Promise((resolve, reject) => setTimeout(() => reject(TIMEOUT), STOP_TIMEOUT)),
]);
} catch (error) {
if (error === TIMEOUT) {
await onTimeout();
} else {
throw error;
}
}
}
export function createProc(name, { cmd, args, cwd, env, stdin, log }) {
log.info('[%s] > %s', name, cmd, args.join(' '));
// spawn fails with ENOENT when either the
// cmd or cwd don't exist, so we check for the cwd
// ahead of time so that the error is less ambiguous
try {
if (!statSync(cwd).isDirectory()) {
throw new Error(`cwd "${cwd}" exists but is not a directory`);
}
} catch (err) {
if (err.code === 'ENOENT') {
throw new Error(`cwd "${cwd}" does not exist`);
}
}
const childProcess = execa(cmd, args, {
cwd,
env,
stdio: ['pipe', 'pipe', 'pipe'],
});
if (stdin) {
childProcess.stdin.end(stdin, 'utf8');
} else {
childProcess.stdin.end();
}
return new (class Proc {
name = name;
lines$ = Rx.merge(observeLines(childProcess.stdout), observeLines(childProcess.stderr)).pipe(
tap(line => log.write(` ${gray('proc')} [${gray(name)}] ${line}`)),
share()
);
outcome$ = Rx.defer(() => {
// observe first exit event
const exit$ = Rx.fromEvent(childProcess, 'exit').pipe(
take(1),
map(([code]) => {
if (this._stopCalled) {
return null;
}
// JVM exits with 143 on SIGTERM and 130 on SIGINT, dont' treat then as errors
if (code > 0 && !(code === 143 || code === 130)) {
throw createCliError(`[${name}] exited with code ${code}`);
}
return code;
})
);
// observe first error event until there is a close event
const error$ = Rx.fromEvent(childProcess, 'error').pipe(
take(1),
mergeMap(err => Rx.throwError(err))
);
return Rx.race(exit$, error$);
}).pipe(share());
_outcomePromise = Rx.merge(this.lines$.pipe(ignoreElements()), this.outcome$).toPromise();
getOutcomePromise() {
return this._outcomePromise;
}
_stopCalled = false;
async stop(signal) {
if (this._stopCalled) {
return;
}
this._stopCalled = true;
await withTimeout(
async () => {
log.debug(`Sending "${signal}" to proc "${name}"`);
await treeKillAsync(childProcess.pid, signal);
await this.getOutcomePromise();
},
STOP_TIMEOUT,
async () => {
log.warning(
`Proc "${name}" was sent "${signal}" didn't emit the "exit" or "error" events after ${STOP_TIMEOUT} ms, sending SIGKILL`
);
await treeKillAsync(childProcess.pid, 'SIGKILL');
}
);
await withTimeout(
async () => {
try {
await this.getOutcomePromise();
} catch (error) {
// ignore
}
},
STOP_TIMEOUT,
async () => {
throw new Error(
`Proc "${name}" was stopped but never emitted either the "exit" or "error" event after ${STOP_TIMEOUT} ms`
);
}
);
}
})();
}

View file

@ -0,0 +1,180 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import execa from 'execa';
import { statSync } from 'fs';
import * as Rx from 'rxjs';
import { tap, share, take, mergeMap, map, ignoreElements } from 'rxjs/operators';
import chalk from 'chalk';
import treeKill from 'tree-kill';
import { promisify } from 'util';
const treeKillAsync = promisify(treeKill);
import { ToolingLog } from '../tooling_log';
import { observeLines } from './observe_lines';
import { createCliError } from './errors';
const SECOND = 1000;
const STOP_TIMEOUT = 30 * SECOND;
export interface ProcOptions {
cmd: string;
args: string[];
cwd: string;
env?: Record<string, string | undefined>;
stdin?: string;
}
async function withTimeout(
attempt: () => Promise<void>,
ms: number,
onTimeout: () => Promise<void>
) {
const TIMEOUT = Symbol('timeout');
try {
await Promise.race([
attempt(),
new Promise((_, reject) => setTimeout(() => reject(TIMEOUT), ms)),
]);
} catch (error) {
if (error === TIMEOUT) {
await onTimeout();
} else {
throw error;
}
}
}
export type Proc = ReturnType<typeof startProc>;
export function startProc(name: string, options: ProcOptions, log: ToolingLog) {
const { cmd, args, cwd, env, stdin } = options;
log.info('[%s] > %s', name, cmd, args.join(' '));
// spawn fails with ENOENT when either the
// cmd or cwd don't exist, so we check for the cwd
// ahead of time so that the error is less ambiguous
try {
if (!statSync(cwd).isDirectory()) {
throw new Error(`cwd "${cwd}" exists but is not a directory`);
}
} catch (err) {
if (err.code === 'ENOENT') {
throw new Error(`cwd "${cwd}" does not exist`);
}
}
const childProcess = execa(cmd, args, {
cwd,
env,
stdio: ['pipe', 'pipe', 'pipe'],
});
if (stdin) {
childProcess.stdin.end(stdin, 'utf8');
} else {
childProcess.stdin.end();
}
let stopCalled = false;
const outcome$: Rx.Observable<number | null> = Rx.race(
// observe first exit event
Rx.fromEvent(childProcess, 'exit').pipe(
take(1),
map(([code]: [number]) => {
if (stopCalled) {
return null;
}
// JVM exits with 143 on SIGTERM and 130 on SIGINT, dont' treat then as errors
if (code > 0 && !(code === 143 || code === 130)) {
throw createCliError(`[${name}] exited with code ${code}`);
}
return code;
})
),
// observe first error event
Rx.fromEvent(childProcess, 'error').pipe(
take(1),
mergeMap(err => Rx.throwError(err))
)
).pipe(share());
const lines$ = Rx.merge(
observeLines(childProcess.stdout),
observeLines(childProcess.stderr)
).pipe(
tap(line => log.write(` ${chalk.gray('proc')} [${chalk.gray(name)}] ${line}`)),
share()
);
const outcomePromise = Rx.merge(lines$.pipe(ignoreElements()), outcome$).toPromise();
async function stop(signal: NodeJS.Signals) {
if (stopCalled) {
return;
}
stopCalled = true;
await withTimeout(
async () => {
log.debug(`Sending "${signal}" to proc "${name}"`);
await treeKillAsync(childProcess.pid, signal);
await outcomePromise;
},
STOP_TIMEOUT,
async () => {
log.warning(
`Proc "${name}" was sent "${signal}" didn't emit the "exit" or "error" events after ${STOP_TIMEOUT} ms, sending SIGKILL`
);
await treeKillAsync(childProcess.pid, 'SIGKILL');
}
);
await withTimeout(
async () => {
try {
await outcomePromise;
} catch (error) {
// ignore
}
},
STOP_TIMEOUT,
async () => {
throw new Error(
`Proc "${name}" was stopped but never emitted either the "exit" or "error" event after ${STOP_TIMEOUT} ms`
);
}
);
}
return {
name,
lines$,
outcome$,
outcomePromise,
stop,
};
}

View file

@ -19,13 +19,18 @@
import moment from 'moment';
import { filter, first, catchError } from 'rxjs/operators';
import exitHook from 'exit-hook';
import { ToolingLog } from '../tooling_log';
import { createCliError } from './errors';
import { createProc } from './proc';
import { observeSignals } from './observe_signals';
import { Proc, ProcOptions, startProc } from './proc';
const noop = () => {};
interface RunOptions extends ProcOptions {
wait: true | RegExp;
}
/**
* Helper for starting and managing processes. In many ways it resembles the
* API from `grunt_run`, processes are named and can be started, waited for,
@ -34,20 +39,15 @@ const noop = () => {};
* @class ProcRunner
*/
export class ProcRunner {
constructor(options) {
const { log } = options;
private closing = false;
private procs: Proc[] = [];
private signalUnsubscribe: () => void;
this._closing = false;
this._procs = [];
this._log = log;
this._signalSubscription = observeSignals(process).subscribe({
next: async signal => {
await this.teardown(signal);
if (signal !== 'exit') {
// resend the signal
process.kill(process.pid, signal);
}
},
constructor(private log: ToolingLog) {
this.signalUnsubscribe = exitHook(() => {
this.teardown().catch(error => {
log.error(`ProcRunner teardown error: ${error.stack}`);
});
});
}
@ -64,17 +64,17 @@ export class ProcRunner {
* is found
* @return {Promise<undefined>}
*/
async run(name, options) {
async run(name: string, options: RunOptions) {
const {
cmd,
args = [],
cwd = process.cwd(),
stdin = null,
stdin = undefined,
wait = false,
env = process.env,
} = options;
if (this._closing) {
if (this.closing) {
throw new Error('ProcRunner is closing');
}
@ -82,15 +82,21 @@ export class ProcRunner {
throw new TypeError('wait param should either be a RegExp or `true`');
}
if (!!this._getProc(name)) {
if (!!this.getProc(name)) {
throw new Error(`Process with name "${name}" already running`);
}
const proc = this._createProc(name, { cmd, args, cwd, env, stdin });
const proc = this.startProc(name, {
cmd,
args,
cwd,
env,
stdin,
});
try {
// wait for process to log matching line
if (wait instanceof RegExp) {
// wait for process to log matching line
await proc.lines$
.pipe(
filter(line => wait.test(line)),
@ -106,15 +112,15 @@ export class ProcRunner {
.toPromise();
}
// wait for process to complete
if (wait === true) {
await proc.getOutcomePromise();
// wait for process to complete
await proc.outcomePromise;
}
} finally {
// while the procRunner closes promises will resolve/reject because
// processes and stopping, but consumers of run() shouldn't have to
// prepare for that, so just return a never-resolving promise
if (this._closing) {
if (this.closing) {
await new Promise(noop);
}
}
@ -122,16 +128,13 @@ export class ProcRunner {
/**
* Stop a named proc
* @param {String} name
* @param {String} [signal='SIGTERM']
* @return {Promise<undefined>}
*/
async stop(name, signal = 'SIGTERM') {
const proc = this._getProc(name);
async stop(name: string, signal: NodeJS.Signals = 'SIGTERM') {
const proc = this.getProc(name);
if (proc) {
await proc.stop(signal);
} else {
this._log.warning('[%s] already stopped', name);
this.log.warning('[%s] already stopped', name);
}
}
@ -140,7 +143,7 @@ export class ProcRunner {
* @return {Promise<undefined>}
*/
async waitForAllToStop() {
await Promise.all(this._procs.map(proc => proc.getOutcomePromise()));
await Promise.all(this.procs.map(proc => proc.outcomePromise));
}
/**
@ -150,53 +153,56 @@ export class ProcRunner {
* @param {String} [signal=undefined]
* @return {Promise}
*/
async teardown(signal) {
if (this._closing) return;
async teardown(signal: NodeJS.Signals | 'exit' = 'exit') {
if (this.closing) {
return;
}
this._closing = true;
this._signalSubscription.unsubscribe();
this._signalSubscription = null;
this.closing = true;
this.signalUnsubscribe();
if (!signal && this._procs.length > 0) {
this._log.warning(
if (!signal && this.procs.length > 0) {
this.log.warning(
'%d processes left running, stop them with procs.stop(name):',
this._procs.length,
this._procs.map(proc => proc.name)
this.procs.length,
this.procs.map(proc => proc.name)
);
}
const stopWith = signal === 'exit' ? 'SIGKILL' : signal;
await Promise.all(this._procs.map(proc => proc.stop(stopWith)));
await Promise.all(
this.procs.map(async proc => {
await proc.stop(signal === 'exit' ? 'SIGKILL' : signal);
})
);
}
_getProc(name) {
return this._procs.find(proc => proc.name === name);
}
_createProc(name, options) {
const startMs = Date.now();
const proc = createProc(name, {
...options,
log: this._log,
private getProc(name: string) {
return this.procs.find(proc => {
return proc.name === name;
});
}
this._procs.push(proc);
private startProc(name: string, options: ProcOptions) {
const startMs = Date.now();
const proc = startProc(name, options, this.log);
this.procs.push(proc);
const remove = () => {
this._procs.splice(this._procs.indexOf(proc), 1);
this.procs.splice(this.procs.indexOf(proc), 1);
};
// tie into proc outcome$, remove from _procs on compete
proc.outcome$.subscribe({
next: code => {
const duration = moment.duration(Date.now() - startMs);
this._log.info('[%s] exited with %s after %s', name, code, duration.humanize());
this.log.info('[%s] exited with %s after %s', name, code, duration.humanize());
},
complete: () => {
remove();
},
error: error => {
if (this._closing) {
this._log.error(error);
if (this.closing) {
this.log.error(error);
}
remove();
},

View file

@ -22,14 +22,14 @@ import { withProcRunner } from './with_proc_runner';
import { ProcRunner } from './proc_runner';
it('passes proc runner to a function', async () => {
await withProcRunner(new ToolingLog(), proc => {
await withProcRunner(new ToolingLog(), async proc => {
expect(proc).toBeInstanceOf(ProcRunner);
});
});
it('calls procRunner.teardown() if function returns synchronously', async () => {
let teardownSpy;
await withProcRunner(new ToolingLog(), proc => {
await withProcRunner(new ToolingLog(), async proc => {
teardownSpy = jest.spyOn(proc, 'teardown');
});
@ -41,7 +41,7 @@ it('calls procRunner.teardown() if function throw synchronous error, and rejects
let teardownSpy;
await expect(
withProcRunner(new ToolingLog(), proc => {
withProcRunner(new ToolingLog(), async proc => {
teardownSpy = jest.spyOn(proc, 'teardown');
throw error;
})

View file

@ -18,6 +18,7 @@
*/
import { ProcRunner } from './proc_runner';
import { ToolingLog } from '../tooling_log';
/**
* Create a ProcRunner and pass it to an async function. When
@ -28,8 +29,12 @@ import { ProcRunner } from './proc_runner';
* @param {async Function} fn
* @return {Promise<undefined>}
*/
export async function withProcRunner(log, fn) {
const procs = new ProcRunner({ log });
export async function withProcRunner(
log: ToolingLog,
fn: (procs: ProcRunner) => Promise<void>
): Promise<void> {
const procs = new ProcRunner(log);
try {
await fn(procs);
} finally {

View file

@ -17,9 +17,9 @@
* under the License.
*/
export function createAbsolutePathSerializer(rootPath) {
export function createAbsolutePathSerializer(rootPath: string) {
return {
print: value => value.replace(rootPath, '<absolute path>').replace(/\\/g, '/'),
test: value => typeof value === 'string' && value.startsWith(rootPath),
print: (value: string) => value.replace(rootPath, '<absolute path>').replace(/\\/g, '/'),
test: (value: any) => typeof value === 'string' && value.startsWith(rootPath),
};
}

View file

@ -1,20 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
export { createAbsolutePathSerializer } from './absolute_path_serializer';

View file

@ -1,46 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { createReduceStream } from './reduce_stream';
/**
* Creates a Transform stream that consumes all provided
* values and concatenates them using each values `concat`
* method.
*
* Concatenate strings:
* createListStream(['f', 'o', 'o'])
* .pipe(createConcatStream())
* .on('data', console.log)
* // logs "foo"
*
* Concatenate values into an array:
* createListStream([1,2,3])
* .pipe(createConcatStream([]))
* .on('data', console.log)
* // logs "[1,2,3]"
*
*
* @param {any} initial The initial value that subsequent
* items will concat with
* @return {Transform}
*/
export function createConcatStream(initial) {
return createReduceStream((acc, chunk) => acc.concat(chunk), initial);
}

View file

@ -1,21 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
export { createConcatStream } from './concat_stream';
export { createPromiseFromStreams } from './promise_from_streams';

View file

@ -1,83 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/**
* Take an array of streams, pipe the output
* from each one into the next, listening for
* errors from any of the streams, and then resolve
* the promise once the final stream has finished
* writing/reading.
*
* If the last stream is readable, it's final value
* will be provided as the promise value.
*
* Errors emitted from any stream will cause
* the promise to be rejected with that error.
*
* @param {Array<Stream>} streams
* @return {Promise<any>}
*/
export async function createPromiseFromStreams(streams) {
const last = streams[streams.length - 1];
// reject if any of the streams emits an error
const anyStreamFailure = new Promise((resolve, reject) => {
streams.forEach((stream, i) => {
if (i > 0) streams[i - 1].pipe(stream);
stream.on('error', reject);
return stream;
});
});
// resolve when the last stream has finished writing, or
// immediately if the last stream is not writable
const lastFinishedWriting = new Promise(resolve => {
if (typeof last.write !== 'function') {
resolve();
return;
}
last.on('finish', resolve);
});
// resolve with the final value provided by the last stream
// after the last stream has provided it, or immediately if the
// stream is not readable
const lastFinishedReading = new Promise(resolve => {
if (typeof last.read !== 'function') {
resolve();
return;
}
let finalChunk;
last.on('data', chunk => {
finalChunk = chunk;
});
last.on('end', () => {
resolve(finalChunk);
});
});
// wait (and rethrow) the first error, or for the last stream
// to both finish writing and providing values to read
await Promise.race([anyStreamFailure, Promise.all([lastFinishedWriting, lastFinishedReading])]);
// return the final chunk read from the last stream
return await lastFinishedReading;
}

View file

@ -1,80 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Transform } from 'stream';
/**
* Create a transform stream that consumes each chunk it receives
* and passes it to the reducer, which will return the new value
* for the stream. Once all chunks have been received the reduce
* stream provides the result of final call to the reducer to
* subscribers.
*
* @param {Function}
* @param {any} initial Initial value for the stream, if undefined
* then the first chunk provided is used as the
* initial value.
* @return {Transform}
*/
export function createReduceStream(reducer, initial) {
let i = -1;
let value = initial;
// if the reducer throws an error then the value is
// considered invalid and the stream will never provide
// it to subscribers. We will also stop calling the
// reducer for any new data that is provided to us
let failed = false;
if (typeof reducer !== 'function') {
throw new TypeError('reducer must be a function');
}
return new Transform({
readableObjectMode: true,
writableObjectMode: true,
async transform(chunk, enc, callback) {
try {
if (failed) {
return callback();
}
i += 1;
if (i === 0 && initial === undefined) {
value = chunk;
} else {
value = await reducer(value, chunk, enc);
}
callback();
} catch (err) {
failed = true;
callback(err);
}
},
flush(callback) {
if (!failed) {
this.push(value);
}
callback();
},
});
}

View file

@ -1,22 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
export { ToolingLog } from './tooling_log';
export { ToolingLogTextWriter } from './tooling_log_text_writer';
export { pickLevelFromFlags } from './log_levels';

View file

@ -18,5 +18,5 @@
*/
export { ToolingLog } from './tooling_log';
export { ToolingLogTextWriter, WriterConfig } from './tooling_log_text_writer';
export { ToolingLogTextWriter, ToolingLogTextWriterConfig } from './tooling_log_text_writer';
export { pickLevelFromFlags, LogLevel } from './log_levels';

View file

@ -1,32 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
export type LogLevel = 'silent' | 'error' | 'warning' | 'info' | 'debug' | 'verbose';
export interface ParsedLogLevel {
name: LogLevel;
flags: { [key in LogLevel]: boolean };
}
export function pickLevelFromFlags(
flags: { [key: string]: any },
options?: { default?: LogLevel }
): LogLevel;
export function parseLogLevel(level: LogLevel): ParsedLogLevel;

View file

@ -29,7 +29,10 @@ it('parses valid log levels correctly', () => {
});
it('throws error for invalid levels', () => {
// @ts-ignore
expect(() => parseLogLevel('warn')).toThrowErrorMatchingSnapshot('warn');
// @ts-ignore
expect(() => parseLogLevel('foo')).toThrowErrorMatchingSnapshot('foo');
// @ts-ignore
expect(() => parseLogLevel('bar')).toThrowErrorMatchingSnapshot('bar');
});

View file

@ -17,9 +17,13 @@
* under the License.
*/
const LEVELS = ['silent', 'error', 'warning', 'info', 'debug', 'verbose'];
export type LogLevel = 'silent' | 'error' | 'warning' | 'info' | 'debug' | 'verbose';
const LEVELS: LogLevel[] = ['silent', 'error', 'warning', 'info', 'debug', 'verbose'];
export function pickLevelFromFlags(flags, options = {}) {
export function pickLevelFromFlags(
flags: Record<string, string | boolean | string[] | undefined>,
options: { default?: LogLevel } = {}
) {
if (flags.verbose) return 'verbose';
if (flags.debug) return 'debug';
if (flags.quiet) return 'error';
@ -27,7 +31,9 @@ export function pickLevelFromFlags(flags, options = {}) {
return options.default || 'info';
}
export function parseLogLevel(name) {
export type ParsedLogLevel = ReturnType<typeof parseLogLevel>;
export function parseLogLevel(name: LogLevel) {
const i = LEVELS.indexOf(name);
if (i === -1) {
@ -35,10 +41,13 @@ export function parseLogLevel(name) {
throw new Error(msg);
}
const flags = {};
const flags: { [key: string]: boolean } = {};
LEVELS.forEach((level, levelI) => {
flags[level] = levelI <= i;
});
return { name, flags };
return {
name,
flags: flags as { [key in LogLevel]: boolean },
};
}

View file

@ -17,6 +17,10 @@
* under the License.
*/
export function createAbsolutePathSerializer(
rootPath: string
): { print(...args: any[]): string; test(value: any): boolean };
export type MessageTypes = 'verbose' | 'debug' | 'info' | 'success' | 'warning' | 'error' | 'write';
export interface Message {
type: MessageTypes;
indent: number;
args: any[];
}

View file

@ -1,45 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
// eslint-disable max-classes-per-file
import * as Rx from 'rxjs';
import { ToolingLogWriter, WriterConfig } from './tooling_log_text_writer';
export interface LogMessage {
type: 'verbose' | 'debug' | 'info' | 'success' | 'warning' | 'error' | 'write';
indent: number;
args: any[];
}
export class ToolingLog {
constructor(config?: WriterConfig);
public verbose(...args: any[]): void;
public debug(...args: any[]): void;
public info(...args: any[]): void;
public success(...args: any[]): void;
public warning(...args: any[]): void;
public error(errOrMsg: string | Error): void;
public write(...args: any[]): void;
public indent(spaces?: number): void;
public getWriters(): ToolingLogWriter[];
public setWriters(reporters: ToolingLogWriter[]): void;
public getWritten$(): Rx.Observable<LogMessage>;
}

View file

@ -1,101 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import * as Rx from 'rxjs';
import { EventEmitter } from 'events';
import { ToolingLogTextWriter } from './tooling_log_text_writer';
export class ToolingLog extends EventEmitter {
/**
* Create a ToolingLog object
* @param {WriterConfig} writerConfig
*/
constructor(writerConfig) {
super();
this._indent = 0;
this._writers = writerConfig ? [new ToolingLogTextWriter(writerConfig)] : [];
this._written$ = new Rx.Subject();
}
indent(delta = 0) {
this._indent = Math.max(this._indent + delta, 0);
return this._indent;
}
verbose(...args) {
this._write('verbose', args);
}
debug(...args) {
this._write('debug', args);
}
info(...args) {
this._write('info', args);
}
success(...args) {
this._write('success', args);
}
warning(...args) {
this._write('warning', args);
}
error(error) {
this._write('error', [error]);
}
write(...args) {
this._write('write', args);
}
getWriters() {
return this._writers.slice(0);
}
setWriters(writers) {
this._writers = [...writers];
}
getWritten$() {
return this._written$.asObservable();
}
_write(type, args) {
const msg = {
type,
indent: this._indent,
args,
};
let written = false;
for (const writer of this._writers) {
if (writer.write(msg)) {
written = true;
}
}
if (written) {
this._written$.next(msg);
}
}
}

View file

@ -21,6 +21,7 @@ import * as Rx from 'rxjs';
import { toArray, takeUntil } from 'rxjs/operators';
import { ToolingLog } from './tooling_log';
import { Writer } from './writer';
import { ToolingLogTextWriter } from './tooling_log_text_writer';
it('creates zero writers without a config', () => {
@ -31,7 +32,7 @@ it('creates zero writers without a config', () => {
it('creates a single writer with a single object', () => {
const log = new ToolingLog({ level: 'warning', writeTo: process.stdout });
expect(log.getWriters()).toHaveLength(1);
const [writer] = log.getWriters();
const [writer] = log.getWriters() as [ToolingLogTextWriter];
expect(writer.level).toHaveProperty('name', 'warning');
expect(writer.writeTo).toBe(process.stdout);
});
@ -79,7 +80,7 @@ describe('#indent()', () => {
});
});
['verbose', 'debug', 'info', 'success', 'warning', 'error', 'write'].forEach(method => {
(['verbose', 'debug', 'info', 'success', 'warning', 'error', 'write'] as const).forEach(method => {
describe(`#${method}()`, () => {
it(`sends a msg of type "${method}" to each writer with indent and arguments`, () => {
const log = new ToolingLog();
@ -104,7 +105,7 @@ describe('#indent()', () => {
});
describe('#getWritten$()', () => {
async function testWrittenMsgs(writers) {
async function testWrittenMsgs(writers: Writer[]) {
const log = new ToolingLog();
log.setWriters(writers);

View file

@ -0,0 +1,99 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import * as Rx from 'rxjs';
import { ToolingLogTextWriter, ToolingLogTextWriterConfig } from './tooling_log_text_writer';
import { Writer } from './writer';
import { Message, MessageTypes } from './message';
export class ToolingLog {
private identWidth = 0;
private writers: Writer[];
private readonly written$: Rx.Subject<Message>;
constructor(writerConfig?: ToolingLogTextWriterConfig) {
this.writers = writerConfig ? [new ToolingLogTextWriter(writerConfig)] : [];
this.written$ = new Rx.Subject();
}
public indent(delta = 0) {
this.identWidth = Math.max(this.identWidth + delta, 0);
return this.identWidth;
}
public verbose(...args: any[]) {
this.sendToWriters('verbose', args);
}
public debug(...args: any[]) {
this.sendToWriters('debug', args);
}
public info(...args: any[]) {
this.sendToWriters('info', args);
}
public success(...args: any[]) {
this.sendToWriters('success', args);
}
public warning(...args: any[]) {
this.sendToWriters('warning', args);
}
public error(error: Error | string) {
this.sendToWriters('error', [error]);
}
public write(...args: any[]) {
this.sendToWriters('write', args);
}
public getWriters() {
return this.writers.slice(0);
}
public setWriters(writers: Writer[]) {
this.writers = [...writers];
}
public getWritten$() {
return this.written$.asObservable();
}
private sendToWriters(type: MessageTypes, args: any[]) {
const msg = {
type,
indent: this.identWidth,
args,
};
let written = false;
for (const writer of this.writers) {
if (writer.write(msg)) {
written = true;
}
}
if (written) {
this.written$.next(msg);
}
}
}

View file

@ -1,42 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { LogLevel, ParsedLogLevel } from './log_levels';
import { LogMessage } from './tooling_log';
export interface ToolingLogWriter {
write(msg: LogMessage): boolean;
}
export interface WriteTarget {
write(chunk: string): void;
}
export interface WriterConfig {
level: LogLevel;
writeTo: WriteTarget;
}
export class ToolingLogTextWriter implements ToolingLogTextWriter {
public level: ParsedLogLevel;
public writeTo: WriteTarget;
constructor(config: WriterConfig);
public write(msg: LogMessage): boolean;
}

View file

@ -23,6 +23,7 @@ it('throws error if created with invalid level', () => {
expect(
() =>
new ToolingLogTextWriter({
// @ts-ignore
level: 'foo',
})
).toThrowErrorMatchingSnapshot();
@ -32,6 +33,7 @@ it("throws error if writeTo config is not defined or doesn't have a write method
expect(() => {
new ToolingLogTextWriter({
level: 'verbose',
// @ts-ignore
writeTo: null,
});
}).toThrowErrorMatchingSnapshot();
@ -39,13 +41,14 @@ it("throws error if writeTo config is not defined or doesn't have a write method
expect(() => {
new ToolingLogTextWriter({
level: 'verbose',
// @ts-ignore
writeTo: 'foo',
});
}).toThrowErrorMatchingSnapshot();
});
const levels = ['silent', 'verbose', 'debug', 'info', 'warning', 'error'];
const types = ['verbose', 'debug', 'info', 'warning', 'error', 'success'];
const levels = ['silent', 'verbose', 'debug', 'info', 'warning', 'error'] as const;
const types = ['verbose', 'debug', 'info', 'warning', 'error', 'success'] as const;
for (const level of levels) {
for (const type of types) {
it(`level:${level}/type:${type} snapshots`, () => {
@ -58,7 +61,7 @@ for (const level of levels) {
});
const written = writer.write({
type: type,
type,
indent: 0,
args: ['foo'],
});

View file

@ -19,10 +19,13 @@
import { format } from 'util';
import { magentaBright, yellow, red, blue, green, dim } from 'chalk';
import chalk from 'chalk';
import { parseLogLevel } from './log_levels';
import { LogLevel, parseLogLevel, ParsedLogLevel } from './log_levels';
import { Writer } from './writer';
import { Message, MessageTypes } from './message';
const { magentaBright, yellow, red, blue, green, dim } = chalk;
const PREFIX_INDENT = ' '.repeat(6);
const MSG_PREFIXES = {
verbose: ` ${magentaBright('sill')} `,
@ -33,7 +36,16 @@ const MSG_PREFIXES = {
error: `${red('ERROR')} `,
};
function shouldWriteType(level, type) {
const has = <T extends object>(obj: T, key: any): key is keyof T => obj.hasOwnProperty(key);
export interface ToolingLogTextWriterConfig {
level: LogLevel;
writeTo: {
write(s: string): void;
};
}
function shouldWriteType(level: ParsedLogLevel, type: MessageTypes) {
if (type === 'write') {
return true;
}
@ -41,16 +53,25 @@ function shouldWriteType(level, type) {
return Boolean(level.flags[type === 'success' ? 'info' : type]);
}
function stringifyError(error) {
function stringifyError(error: string | Error) {
if (typeof error !== 'string' && !(error instanceof Error)) {
error = new Error(`"${error}" thrown`);
}
if (typeof error === 'string') {
return error;
}
return error.stack || error.message || error;
}
export class ToolingLogTextWriter {
constructor(config) {
export class ToolingLogTextWriter implements Writer {
public readonly level: ParsedLogLevel;
public readonly writeTo: {
write(msg: string): void;
};
constructor(config: ToolingLogTextWriterConfig) {
this.level = parseLogLevel(config.level);
this.writeTo = config.writeTo;
@ -61,13 +82,13 @@ export class ToolingLogTextWriter {
}
}
write({ type, indent, args }) {
write({ type, indent, args }: Message) {
if (!shouldWriteType(this.level, type)) {
return false;
}
const txt = type === 'error' ? stringifyError(args[0]) : format(...args);
const prefix = MSG_PREFIXES[type] || '';
const txt = type === 'error' ? stringifyError(args[0]) : format(args[0], ...args.slice(1));
const prefix = has(MSG_PREFIXES, type) ? MSG_PREFIXES[type] : '';
(prefix + txt).split('\n').forEach((line, i) => {
let lineIndent = '';

View file

@ -17,5 +17,8 @@
* under the License.
*/
export * from './src/tooling_log';
export * from './src/serializers';
import { Message } from './message';
export interface Writer {
write(msg: Message): boolean;
}

View file

@ -1,7 +1,10 @@
{
"extends": "../../tsconfig.json",
"compilerOptions": {
"outDir": "target",
"declaration": true
},
"include": [
"index.d.ts",
"src/**/*.d.ts"
"src/**/*"
],
}

View file

@ -11956,10 +11956,10 @@ exit-hook@^1.0.0:
resolved "https://registry.yarnpkg.com/exit-hook/-/exit-hook-1.1.1.tgz#f05ca233b48c05d54fff07765df8507e95c02ff8"
integrity sha1-8FyiM7SMBdVP/wd2XfhQfpXAL/g=
exit-hook@^2.1.0:
version "2.1.0"
resolved "https://registry.yarnpkg.com/exit-hook/-/exit-hook-2.1.0.tgz#2be08d8d01220050878577bfa017e104a6c3bcf3"
integrity sha512-JCSm9Znc/KW6hoKYHOIqLxM2Z88+AQcabo07rJHZSyXcQIq6HsXkSWRVZRp13RFkGVIDcz1DRIbKR5cnU1uzCA==
exit-hook@^2.2.0:
version "2.2.0"
resolved "https://registry.yarnpkg.com/exit-hook/-/exit-hook-2.2.0.tgz#f5502f92179018e867f2d8ee4428392da7f3894e"
integrity sha512-YFH+2oGdldRH5GqGpnaiKbBxWHMmuXHmKTMtUC58kWSOrnTf95rKITVSFTTtas14DWvWpih429+ffAvFetPwNA==
exit-on-epipe@~1.0.1:
version "1.0.1"