[kbn_pm] use more async in all commands (#141454)

This commit is contained in:
Spencer 2022-09-22 11:52:56 -05:00 committed by GitHub
parent fd9774c974
commit 7bc93503c4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 278 additions and 83 deletions

1
.github/CODEOWNERS vendored
View file

@ -243,6 +243,7 @@ x-pack/examples/files_example @elastic/kibana-app-services
/.bazelversion @elastic/kibana-operations
/WORKSPACE.bazel @elastic/kibana-operations
/.buildkite/ @elastic/kibana-operations
/kbn_pm/ @elastic/kibana-operations
# Quality Assurance
/src/dev/code_coverage @elastic/kibana-qa

View file

@ -6,7 +6,7 @@
* Side Public License, v 1.
*/
import { spawnSync } from '../../lib/spawn.mjs';
import { run } from '../../lib/spawn.mjs';
import * as Bazel from '../../lib/bazel.mjs';
import { haveNodeModulesBeenManuallyDeleted, removeYarnIntegrityFileIfExists } from './yarn.mjs';
import { setupRemoteCache } from './setup_remote_cache.mjs';
@ -116,7 +116,7 @@ export const command = {
if (vscodeConfig) {
await time('update vscode config', async () => {
// Update vscode settings
spawnSync('node', ['scripts/update_vscode_config']);
await run('node', ['scripts/update_vscode_config']);
log.success('vscode config updated');
});

View file

@ -7,19 +7,16 @@
*/
import Path from 'path';
import Fs from 'fs';
import { spawnSync } from 'child_process';
import Fsp from 'fs/promises';
import { run } from '../../lib/spawn.mjs';
import { isFile } from '../../lib/fs.mjs';
import { dedent } from '../../lib/indent.mjs';
import { REPO_ROOT } from '../../lib/paths.mjs';
function isElasticCommitter() {
async function isElasticCommitter() {
try {
const { stdout: email } = spawnSync('git', ['config', 'user.email'], {
encoding: 'utf8',
});
const email = await run('git', ['config', 'user.email']);
return email.trim().endsWith('@elastic.co');
} catch {
return false;
@ -31,23 +28,23 @@ function isElasticCommitter() {
* @param {string} settingsPath
* @returns
*/
function upToDate(settingsPath) {
if (!isFile(settingsPath)) {
async function upToDate(settingsPath) {
if (!(await isFile(settingsPath))) {
return false;
}
const readSettingsFile = Fs.readFileSync(settingsPath, 'utf8');
const readSettingsFile = await Fsp.readFile(settingsPath, 'utf8');
return readSettingsFile.startsWith('# V2 ');
}
/**
* @param {import('@kbn/some-dev-log').SomeDevLog} log
*/
export function setupRemoteCache(log) {
export async function setupRemoteCache(log) {
// The remote cache is only for Elastic employees working locally (CI cache settings are handled elsewhere)
if (
process.env.FORCE_BOOTSTRAP_REMOTE_CACHE !== 'true' &&
(process.env.CI || !isElasticCommitter())
(process.env.CI || !(await isElasticCommitter()))
) {
return;
}
@ -57,7 +54,7 @@ export function setupRemoteCache(log) {
const settingsPath = Path.resolve(REPO_ROOT, '.bazelrc.cache');
// Checks if we should upgrade or install the config file
if (upToDate(settingsPath)) {
if (await upToDate(settingsPath)) {
log.debug(`remote cache config already exists and is up-to-date, skipping`);
return;
}
@ -70,6 +67,6 @@ export function setupRemoteCache(log) {
build --incompatible_remote_results_ignore_disk
`;
Fs.writeFileSync(settingsPath, contents);
await Fsp.writeFile(settingsPath, contents);
log.info(`remote cache settings written to ${settingsPath}`);
}

View file

@ -15,11 +15,11 @@ import { maybeRealpath, isFile, isDirectory } from '../../lib/fs.mjs';
// yarn integrity file checker
export async function removeYarnIntegrityFileIfExists() {
try {
const nodeModulesRealPath = maybeRealpath(Path.resolve(REPO_ROOT, 'node_modules'));
const nodeModulesRealPath = await maybeRealpath(Path.resolve(REPO_ROOT, 'node_modules'));
const yarnIntegrityFilePath = Path.resolve(nodeModulesRealPath, '.yarn-integrity');
// check if the file exists and delete it in that case
if (isFile(yarnIntegrityFilePath)) {
if (await isFile(yarnIntegrityFilePath)) {
await Fsp.unlink(yarnIntegrityFilePath);
}
} catch {
@ -28,26 +28,18 @@ export async function removeYarnIntegrityFileIfExists() {
}
// yarn and bazel integration checkers
function areNodeModulesPresent() {
try {
return isDirectory(Path.resolve(REPO_ROOT, 'node_modules'));
} catch {
return false;
}
async function areNodeModulesPresent() {
return await isDirectory(Path.resolve(REPO_ROOT, 'node_modules'));
}
function haveBazelFoldersBeenCreatedBefore() {
try {
return (
isDirectory(Path.resolve(REPO_ROOT, 'bazel-bin/packages')) ||
isDirectory(Path.resolve(REPO_ROOT, 'bazel-kibana/packages')) ||
isDirectory(Path.resolve(REPO_ROOT, 'bazel-out/host'))
);
} catch {
return false;
}
async function haveBazelFoldersBeenCreatedBefore() {
return (
(await isDirectory(Path.resolve(REPO_ROOT, 'bazel-bin/packages'))) ||
(await isDirectory(Path.resolve(REPO_ROOT, 'bazel-kibana/packages'))) ||
(await isDirectory(Path.resolve(REPO_ROOT, 'bazel-out/host')))
);
}
export function haveNodeModulesBeenManuallyDeleted() {
return !areNodeModulesPresent() && haveBazelFoldersBeenCreatedBefore();
export async function haveNodeModulesBeenManuallyDeleted() {
return !(await areNodeModulesPresent()) && (await haveBazelFoldersBeenCreatedBefore());
}

View file

@ -35,7 +35,7 @@ export const command = {
await cleanPaths(log, await findPluginCleanPaths(log));
// Runs Bazel soft clean
if (Bazel.isInstalled(log)) {
if (await Bazel.isInstalled(log)) {
await Bazel.clean(log, {
quiet: args.getBooleanValue('quiet'),
});

View file

@ -9,7 +9,7 @@
import Path from 'path';
import { REPO_ROOT } from '../lib/paths.mjs';
import { spawnSync, spawnStreaming } from '../lib/spawn.mjs';
import { run, spawnStreaming } from '../lib/spawn.mjs';
/** @type {import('../lib/command').Command} */
export const command = {
@ -59,7 +59,7 @@ export const command = {
const cwd = Path.resolve(REPO_ROOT, normalizedRepoRelativeDir);
if (args.getBooleanValue('quiet')) {
spawnSync('yarn', ['run', scriptName, ...scriptArgs], {
await run('yarn', ['run', scriptName, ...scriptArgs], {
cwd,
description: `${scriptName} in ${pkg.name}`,
});

105
kbn_pm/src/lib/async.mjs Normal file
View file

@ -0,0 +1,105 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
/**
*
* @template T
* @template T2
* @param {(v: T) => Promise<T2>} fn
* @param {T} item
* @returns {Promise<PromiseSettledResult<T2>>}
*/
const settle = async (fn, item) => {
const [result] = await Promise.allSettled([(async () => await fn(item))()]);
return result;
};
/**
* @template T
* @template T2
* @param {Array<T>} source
* @param {number} limit
* @param {(v: T) => Promise<T2>} mapFn
* @returns {Promise<T2[]>}
*/
export function asyncMapWithLimit(source, limit, mapFn) {
return new Promise((resolve, reject) => {
if (limit < 1) {
reject(new Error('invalid limit, must be greater than 0'));
return;
}
let failed = false;
let inProgress = 0;
const queue = [...source.entries()];
/** @type {T2[]} */
const results = new Array(source.length);
/**
* this is run for each item, manages the inProgress state,
* calls the mapFn with that item, writes the map result to
* the result array, and calls runMore() after each item
* completes to either start another item or resolve the
* returned promise.
*
* @param {number} index
* @param {T} item
*/
function run(index, item) {
inProgress += 1;
settle(mapFn, item).then((result) => {
inProgress -= 1;
if (failed) {
return;
}
if (result.status === 'fulfilled') {
results[index] = result.value;
runMore();
return;
}
// when an error occurs we update the state to prevent
// holding onto old results and ignore future results
// from in-progress promises
failed = true;
results.length = 0;
reject(result.reason);
});
}
/**
* If there is work in the queue, schedule it, if there isn't
* any work to be scheduled and there isn't anything in progress
* then we're done. This function is called every time a mapFn
* promise resolves and once after initialization
*/
function runMore() {
if (!queue.length) {
if (inProgress === 0) {
resolve(results);
}
return;
}
while (inProgress < limit) {
const entry = queue.shift();
if (!entry) {
break;
}
run(...entry);
}
}
runMore();
});
}

View file

@ -7,9 +7,9 @@
*/
import Path from 'path';
import Fs from 'fs';
import Fsp from 'fs/promises';
import { spawnSync } from './spawn.mjs';
import { run } from './spawn.mjs';
import * as Color from './colors.mjs';
import { createCliError } from './cli_error.mjs';
import { REPO_ROOT } from './paths.mjs';
@ -125,7 +125,7 @@ export async function expungeCache(log, opts = undefined) {
export async function cleanDiskCache(log) {
const args = ['info', 'repository_cache'];
log.debug(`> bazel ${args.join(' ')}`);
const repositoryCachePath = spawnSync('bazel', args);
const repositoryCachePath = (await run('bazel', args)).trim();
await cleanPaths(log, [
Path.resolve(Path.dirname(repositoryCachePath), 'disk-cache'),
@ -171,8 +171,9 @@ export async function buildPackages(log, opts = undefined) {
* @param {string} versionFilename
* @returns
*/
function readBazelToolsVersionFile(versionFilename) {
const version = Fs.readFileSync(Path.resolve(REPO_ROOT, versionFilename), 'utf8').trim();
async function readBazelToolsVersionFile(versionFilename) {
const path = Path.resolve(REPO_ROOT, versionFilename);
const version = (await Fsp.readFile(path, 'utf8')).trim();
if (!version) {
throw new Error(
@ -186,14 +187,14 @@ function readBazelToolsVersionFile(versionFilename) {
/**
* @param {import('./log.mjs').Log} log
*/
export function tryRemovingBazeliskFromYarnGlobal(log) {
export async function tryRemovingBazeliskFromYarnGlobal(log) {
try {
log.debug('Checking if Bazelisk is installed on the yarn global scope');
const stdout = spawnSync('yarn', ['global', 'list']);
const stdout = await run('yarn', ['global', 'list']);
if (stdout.includes(`@bazel/bazelisk@`)) {
log.debug('Bazelisk was found on yarn global scope, removing it');
spawnSync('yarn', ['global', 'remove', `@bazel/bazelisk`]);
await run('yarn', ['global', 'remove', `@bazel/bazelisk`]);
log.info(`bazelisk was installed on Yarn global packages and is now removed`);
return true;
@ -208,16 +209,20 @@ export function tryRemovingBazeliskFromYarnGlobal(log) {
/**
* @param {import('./log.mjs').Log} log
*/
export function isInstalled(log) {
export async function isInstalled(log) {
try {
log.debug('getting bazel version');
const stdout = spawnSync('bazel', ['--version']).trim();
const bazelVersion = readBazelToolsVersionFile('.bazelversion');
const [stdout, bazelVersion] = await Promise.all([
run('bazel', ['--version']),
readBazelToolsVersionFile('.bazelversion'),
]);
if (stdout === `bazel ${bazelVersion}`) {
const installed = stdout.trim();
if (installed === `bazel ${bazelVersion}`) {
return true;
} else {
log.info(`Bazel is installed (${stdout}), but was expecting ${bazelVersion}`);
log.info(`Bazel is installed (${installed}), but was expecting ${bazelVersion}`);
return false;
}
} catch {
@ -228,22 +233,24 @@ export function isInstalled(log) {
/**
* @param {import('./log.mjs').Log} log
*/
export function ensureInstalled(log) {
if (isInstalled(log)) {
export async function ensureInstalled(log) {
if (await isInstalled(log)) {
return;
}
// Install bazelisk if not installed
log.debug(`reading bazel tools versions from version files`);
const bazeliskVersion = readBazelToolsVersionFile('.bazeliskversion');
const bazelVersion = readBazelToolsVersionFile('.bazelversion');
const [bazeliskVersion, bazelVersion] = await Promise.all([
readBazelToolsVersionFile('.bazeliskversion'),
readBazelToolsVersionFile('.bazelversion'),
]);
log.info(`installing Bazel tools`);
log.debug(
`bazelisk is not installed. Installing @bazel/bazelisk@${bazeliskVersion} and bazel@${bazelVersion}`
);
spawnSync('npm', ['install', '--global', `@bazel/bazelisk@${bazeliskVersion}`], {
await run('npm', ['install', '--global', `@bazel/bazelisk@${bazeliskVersion}`], {
env: {
USE_BAZEL_VERSION: bazelVersion,
},

View file

@ -6,15 +6,15 @@
* Side Public License, v 1.
*/
import Fs from 'fs';
import Fsp from 'fs/promises';
/**
* @param {string} path
* @returns {string}
* @returns {Promise<string>}
*/
export function maybeRealpath(path) {
export async function maybeRealpath(path) {
try {
return Fs.realpathSync.native(path);
return Fsp.realpath(path);
} catch (error) {
if (error.code !== 'ENOENT') {
throw error;
@ -26,11 +26,11 @@ export function maybeRealpath(path) {
/**
* @param {string} path
* @returns {boolean}
* @returns {Promise<boolean>}
*/
export function isDirectory(path) {
export async function isDirectory(path) {
try {
const stat = Fs.statSync(path);
const stat = await Fsp.stat(path);
return stat.isDirectory();
} catch (error) {
return false;
@ -39,11 +39,11 @@ export function isDirectory(path) {
/**
* @param {string} path
* @returns {boolean}
* @returns {Promise<boolean>}
*/
export function isFile(path) {
export async function isFile(path) {
try {
const stat = Fs.statSync(path);
const stat = await Fsp.stat(path);
return stat.isFile();
} catch (error) {
return false;

View file

@ -15,6 +15,115 @@ import { indent } from './indent.mjs';
/** @typedef {{ cwd?: string, env?: Record<string, string> }} SpawnOpts */
/**
* @param {NodeJS.ReadableStream} readable
*/
function getLines(readable) {
return Readline.createInterface({
input: readable,
crlfDelay: Infinity,
});
}
/**
* Wait for the exit of a child process, if the process emits "error" the promise
* will reject, if it emits "exit" the promimse will resolve with the exit code or `null`
* @param {ChildProcess.ChildProcess} proc
* @returns {Promise<number | null>}
*/
function getExit(proc) {
return new Promise((resolve, reject) => {
/**
* @param {Error | null} err
* @param {number | null} code
*/
function teardown(err = null, code = null) {
proc.removeListener('error', onError);
proc.removeListener('exit', onExit);
if (err) {
reject(err);
} else {
resolve(code);
}
}
/**
*
* @param {Error} err
*/
function onError(err) {
teardown(err);
}
/**
* @param {number | null} code
* @param {string | null} signal
*/
function onExit(code, signal) {
teardown(null, typeof signal === 'string' || typeof code !== 'number' ? null : code);
}
proc.on('error', onError);
proc.on('exit', onExit);
});
}
/**
* Print each line of output to the console
* @param {NodeJS.ReadableStream} readable
* @param {string | undefined} prefix
*/
async function printLines(readable, prefix) {
for await (const line of getLines(readable)) {
console.log(prefix ? `${prefix} ${line}` : line);
}
}
/**
* @param {NodeJS.ReadableStream} readable
* @param {string[]} output
*/
async function read(readable, output) {
for await (const line of getLines(readable)) {
output.push(line);
}
}
/**
* Run a child process and return it's stdout
* @param {string} cmd
* @param {string[]} args
* @param {undefined | (SpawnOpts & { description?: string })} opts
*/
export async function run(cmd, args, opts = undefined) {
const proc = ChildProcess.spawn(cmd === 'node' ? process.execPath : cmd, args, {
cwd: opts?.cwd ?? REPO_ROOT,
env: {
...process.env,
...opts?.env,
},
});
/** @type {string[]} */
const output = [];
const [, , exitCode] = await Promise.all([
read(proc.stdout, output),
read(proc.stderr, output),
getExit(proc),
]);
if (typeof exitCode === 'number' && exitCode > 0) {
throw createCliError(
`[${opts?.description ?? cmd}] exitted with ${exitCode}:\n` +
` output:\n${indent(4, output.join('\n'))}`
);
}
return output.join('\n');
}
/**
* Run a child process and return it's stdout
* @param {string} cmd
@ -48,22 +157,6 @@ export function spawnSync(cmd, args, opts = undefined) {
return result.stdout;
}
/**
* Print each line of output to the console
* @param {import('stream').Readable} stream
* @param {string | undefined} prefix
*/
async function printLines(stream, prefix) {
const int = Readline.createInterface({
input: stream,
crlfDelay: Infinity,
});
for await (const line of int) {
console.log(prefix ? `${prefix} ${line}` : line);
}
}
/**
* @param {import('events').EventEmitter} emitter
* @param {string} event