mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 17:59:23 -04:00
Partial backport of https://github.com/elastic/kibana/pull/17763, originally attempted in https://github.com/elastic/kibana/pull/17849 but was abandoned, and I'd like it to be easier to backport https://github.com/elastic/kibana/pull/22110 and future PRs.
This commit is contained in:
parent
cba2c02135
commit
1878913e3f
9 changed files with 41 additions and 54 deletions
|
@ -27,6 +27,7 @@ module.exports = {
|
|||
'packages/kbn-pm/**/*',
|
||||
'packages/kbn-es/**/*',
|
||||
'packages/kbn-datemath/**/*.js',
|
||||
'packages/kbn-dev-utils/**/*',
|
||||
'packages/kbn-i18n/**/*',
|
||||
'packages/kbn-plugin-generator/**/*',
|
||||
'packages/kbn-test/**/*',
|
||||
|
|
|
@ -37,18 +37,21 @@ export function observeLines(readable) {
|
|||
const done$ = observeReadable(readable).pipe(share());
|
||||
|
||||
const scan$ = Rx.fromEvent(readable, 'data').pipe(
|
||||
scan(({ buffer }, chunk) => {
|
||||
buffer += chunk;
|
||||
scan(
|
||||
({ buffer }, chunk) => {
|
||||
buffer += chunk;
|
||||
|
||||
let match;
|
||||
const lines = [];
|
||||
while (match = buffer.match(SEP)) {
|
||||
lines.push(buffer.slice(0, match.index));
|
||||
buffer = buffer.slice(match.index + match[0].length);
|
||||
}
|
||||
let match;
|
||||
const lines = [];
|
||||
while ((match = buffer.match(SEP))) {
|
||||
lines.push(buffer.slice(0, match.index));
|
||||
buffer = buffer.slice(match.index + match[0].length);
|
||||
}
|
||||
|
||||
return { buffer, lines };
|
||||
}, { buffer: '' }),
|
||||
return { buffer, lines };
|
||||
},
|
||||
{ buffer: '' }
|
||||
),
|
||||
|
||||
// stop if done completes or errors
|
||||
takeUntil(done$.pipe(materialize()))
|
||||
|
@ -59,9 +62,7 @@ export function observeLines(readable) {
|
|||
done$,
|
||||
|
||||
// merge in the "lines" from each step
|
||||
scan$.pipe(
|
||||
mergeMap(({ lines }) => lines)
|
||||
),
|
||||
scan$.pipe(mergeMap(({ lines }) => lines)),
|
||||
|
||||
// inject the "unsplit" data at the end
|
||||
scan$.pipe(
|
||||
|
|
|
@ -39,10 +39,7 @@ async function withTimeout(attempt, ms, onTimeout) {
|
|||
try {
|
||||
await Promise.race([
|
||||
attempt(),
|
||||
new Promise((resolve, reject) => setTimeout(
|
||||
() => reject(TIMEOUT),
|
||||
STOP_TIMEOUT
|
||||
))
|
||||
new Promise((resolve, reject) => setTimeout(() => reject(TIMEOUT), STOP_TIMEOUT)),
|
||||
]);
|
||||
} catch (error) {
|
||||
if (error === TIMEOUT) {
|
||||
|
@ -143,7 +140,9 @@ export function createProc(name, { cmd, args, cwd, env, stdin, log }) {
|
|||
},
|
||||
STOP_TIMEOUT,
|
||||
async () => {
|
||||
throw new Error(`Proc "${name}" was stopped but never emitted either the "exit" or "error" event after ${STOP_TIMEOUT} ms`);
|
||||
throw new Error(
|
||||
`Proc "${name}" was stopped but never emitted either the "exit" or "error" event after ${STOP_TIMEOUT} ms`
|
||||
);
|
||||
}
|
||||
);
|
||||
}
|
||||
|
|
|
@ -41,13 +41,13 @@ export class ProcRunner {
|
|||
this._procs = [];
|
||||
this._log = log;
|
||||
this._signalSubscription = observeSignals(process).subscribe({
|
||||
next: async (signal) => {
|
||||
next: async signal => {
|
||||
await this.teardown(signal);
|
||||
if (signal !== 'exit') {
|
||||
// resend the signal
|
||||
process.kill(process.pid, signal);
|
||||
}
|
||||
}
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -71,7 +71,7 @@ export class ProcRunner {
|
|||
cwd = process.cwd(),
|
||||
stdin = null,
|
||||
wait = false,
|
||||
env = process.env
|
||||
env = process.env,
|
||||
} = options;
|
||||
|
||||
if (this._closing) {
|
||||
|
@ -140,9 +140,7 @@ export class ProcRunner {
|
|||
* @return {Promise<undefined>}
|
||||
*/
|
||||
async waitForAllToStop() {
|
||||
await Promise.all(
|
||||
this._procs.map(proc => proc.getOutcomePromise())
|
||||
);
|
||||
await Promise.all(this._procs.map(proc => proc.getOutcomePromise()));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -168,9 +166,7 @@ export class ProcRunner {
|
|||
}
|
||||
|
||||
const stopWith = signal === 'exit' ? 'SIGKILL' : signal;
|
||||
await Promise.all(
|
||||
this._procs.map(proc => proc.stop(stopWith))
|
||||
);
|
||||
await Promise.all(this._procs.map(proc => proc.stop(stopWith)));
|
||||
}
|
||||
|
||||
_getProc(name) {
|
||||
|
@ -191,14 +187,14 @@ export class ProcRunner {
|
|||
|
||||
// tie into proc outcome$, remove from _procs on compete
|
||||
proc.outcome$.subscribe({
|
||||
next: (code) => {
|
||||
next: code => {
|
||||
const duration = moment.duration(Date.now() - startMs);
|
||||
this._log.info('[%s] exited with %s after %s', name, code, duration.humanize());
|
||||
},
|
||||
complete: () => {
|
||||
remove();
|
||||
},
|
||||
error: (error) => {
|
||||
error: error => {
|
||||
if (this._closing) {
|
||||
this._log.error(error);
|
||||
}
|
||||
|
|
|
@ -66,7 +66,7 @@ export async function createPromiseFromStreams(streams) {
|
|||
}
|
||||
|
||||
let finalChunk;
|
||||
last.on('data', (chunk) => {
|
||||
last.on('data', chunk => {
|
||||
finalChunk = chunk;
|
||||
});
|
||||
last.on('end', () => {
|
||||
|
@ -76,10 +76,7 @@ export async function createPromiseFromStreams(streams) {
|
|||
|
||||
// wait (and rethrow) the first error, or for the last stream
|
||||
// to both finish writing and providing values to read
|
||||
await Promise.race([
|
||||
anyStreamFailure,
|
||||
Promise.all([lastFinishedWriting, lastFinishedReading])
|
||||
]);
|
||||
await Promise.race([anyStreamFailure, Promise.all([lastFinishedWriting, lastFinishedReading])]);
|
||||
|
||||
// return the final chunk read from the last stream
|
||||
return await lastFinishedReading;
|
||||
|
|
|
@ -75,6 +75,6 @@ export function createReduceStream(reducer, initial) {
|
|||
}
|
||||
|
||||
callback();
|
||||
}
|
||||
},
|
||||
});
|
||||
}
|
||||
|
|
|
@ -108,8 +108,7 @@ describe('parseLogLevel(logLevel).flags', () => {
|
|||
// by specifying a long length
|
||||
const level = chance.word({ length: 10 });
|
||||
|
||||
expect(() => parseLogLevel(level))
|
||||
.to.throwError(level);
|
||||
expect(() => parseLogLevel(level)).to.throwError(level);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
@ -17,14 +17,7 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
const LEVELS = [
|
||||
'silent',
|
||||
'error',
|
||||
'warning',
|
||||
'info',
|
||||
'debug',
|
||||
'verbose',
|
||||
];
|
||||
const LEVELS = ['silent', 'error', 'warning', 'info', 'debug', 'verbose'];
|
||||
|
||||
export function pickLevelFromFlags(flags) {
|
||||
if (flags.verbose) return 'verbose';
|
||||
|
@ -38,10 +31,7 @@ export function parseLogLevel(name) {
|
|||
const i = LEVELS.indexOf(name);
|
||||
|
||||
if (i === -1) {
|
||||
const msg = (
|
||||
`Invalid log level "${name}" ` +
|
||||
`(expected one of ${LEVELS.join(',')})`
|
||||
);
|
||||
const msg = `Invalid log level "${name}" ` + `(expected one of ${LEVELS.join(',')})`;
|
||||
throw new Error(msg);
|
||||
}
|
||||
|
||||
|
|
|
@ -87,11 +87,15 @@ export function createToolingLog(initialLogLevelName = 'silent') {
|
|||
}
|
||||
|
||||
write(...args) {
|
||||
format(...args).split('\n').forEach((line, i) => {
|
||||
const subLineIndent = i === 0 ? '' : ' ';
|
||||
const indent = !indentString ? '' : indentString.slice(0, -1) + (i === 0 && line[0] === '-' ? '└' : '│');
|
||||
super.write(`${indent}${subLineIndent}${line}\n`);
|
||||
});
|
||||
format(...args)
|
||||
.split('\n')
|
||||
.forEach((line, i) => {
|
||||
const subLineIndent = i === 0 ? '' : ' ';
|
||||
const indent = !indentString
|
||||
? ''
|
||||
: indentString.slice(0, -1) + (i === 0 && line[0] === '-' ? '└' : '│');
|
||||
super.write(`${indent}${subLineIndent}${line}\n`);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue