[kbn-es] convert cluster.js to typescript (#165676)

## Summary

Converting cluster.js to typescript with small logic adjustment. It
should improve readability and simplify maintenance:
keep the following cluster properties solely for stateful
source/snapshot ES instance:
- process
- outcome
- setupPromise
- stdioTarget

Use `serverlessNodes` prop to check if serverless ES was started in
`runServerless/stop/kill`

Remove return statement for `runDocker` & `runDockerContainer` as the
result (e.g. pid) is not used anyways, but assigning to `process` prop
lead to confusion (more in comments)



Adding more tests to increase cluster functionality coverage.
This commit is contained in:
Dzmitry Lemechko 2023-09-12 18:00:25 +02:00 committed by GitHub
parent 51569b4c93
commit b340c27a74
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 1125 additions and 759 deletions

View file

@ -14,3 +14,4 @@ export {
ELASTIC_SERVERLESS_SUPERUSER_PASSWORD,
getDockerFileMountPath,
} from './src/utils';
export type { ArtifactLicense } from './src/artifact';

View file

@ -59,7 +59,11 @@ export const archive = {
throw createCliError('you must provide a path to an ES tar file');
}
const { installPath } = await cluster.installArchive(path, options);
const { installPath } = await cluster.installArchive(path, {
basePath: options.basePath,
installPath: options.installPath,
esArgs: options.esArgs,
});
await cluster.run(installPath, {
...options,
readyTimeout: parseTimeoutToMs(options.readyTimeout),

View file

@ -11,8 +11,14 @@ import getopts from 'getopts';
import { ToolingLog } from '@kbn/tooling-log';
import { getTimeReporter } from '@kbn/ci-stats-reporter';
import { Cluster, type ServerlessOptions } from '../cluster';
import { SERVERLESS_REPO, SERVERLESS_TAG, SERVERLESS_IMG, DEFAULT_PORT } from '../utils';
import { Cluster } from '../cluster';
import {
SERVERLESS_REPO,
SERVERLESS_TAG,
SERVERLESS_IMG,
DEFAULT_PORT,
ServerlessOptions,
} from '../utils';
import { Command } from './types';
export const serverless: Command = {

View file

@ -73,16 +73,30 @@ export const snapshot: Command = {
const cluster = new Cluster({ ssl: options.ssl });
if (options['download-only']) {
await cluster.downloadSnapshot(options);
await cluster.downloadSnapshot({
version: options.version,
license: options.license,
basePath: options.basePath,
log,
useCached: options.useCached,
});
} else {
const installStartTime = Date.now();
const { installPath } = await cluster.installSnapshot(options);
const { installPath } = await cluster.installSnapshot({
version: options.version,
license: options.license,
basePath: options.basePath,
log,
useCached: options.useCached,
password: options.password,
esArgs: options.esArgs,
});
if (options.dataArchive) {
await cluster.extractDataDirectory(installPath, options.dataArchive);
}
if (options.plugins) {
await cluster.installPlugins(installPath, options.plugins, options);
await cluster.installPlugins(installPath, options.plugins, options.esJavaOpts);
}
if (typeof options.secureFiles === 'string' && options.secureFiles) {
const pairs = options.secureFiles

View file

@ -60,13 +60,20 @@ export const source: Command = {
});
const cluster = new Cluster({ ssl: options.ssl });
const { installPath } = await cluster.installSource(options);
const { installPath } = await cluster.installSource({
sourcePath: options.sourcePath,
license: options.license,
password: options.password,
basePath: options.basePath,
installPath: options.installPath,
esArgs: options.esArgs,
});
if (options.dataArchive) {
await cluster.extractDataDirectory(installPath, options.dataArchive);
}
if (options.plugins) {
await cluster.installPlugins(installPath, options.plugins, options);
await cluster.installPlugins(installPath, options.plugins, options.esJavaOpts);
}
if (typeof options.secureFiles === 'string' && options.secureFiles) {
const pairs = options.secureFiles.split(',').map((kv) => kv.split('=').map((v) => v.trim()));

View file

@ -6,42 +6,49 @@
* Side Public License, v 1.
*/
const fs = require('fs');
const fsp = require('fs/promises');
const execa = require('execa');
const chalk = require('chalk');
const path = require('path');
const Rx = require('rxjs');
const { Client } = require('@elastic/elasticsearch');
const { downloadSnapshot, installSnapshot, installSource, installArchive } = require('./install');
const { ES_BIN, ES_PLUGIN_BIN, ES_KEYSTORE_BIN } = require('./paths');
const {
import fs from 'fs';
import fsp from 'fs/promises';
import chalk from 'chalk';
import * as path from 'path';
import execa from 'execa';
import { Readable } from 'stream';
import Rx from 'rxjs';
import { Client } from '@elastic/elasticsearch';
import { promisify } from 'util';
import { CA_CERT_PATH, ES_NOPASSWORD_P12_PATH, extract } from '@kbn/dev-utils';
import { ToolingLog } from '@kbn/tooling-log';
import treeKill from 'tree-kill';
import { downloadSnapshot, installSnapshot, installSource, installArchive } from './install';
import { ES_BIN, ES_PLUGIN_BIN, ES_KEYSTORE_BIN } from './paths';
import {
DockerOptions,
extractConfigFiles,
log: defaultLog,
log as defaultLog,
NativeRealm,
parseEsLog,
parseTimeoutToMs,
runDockerContainer,
runServerlessCluster,
ServerlessOptions,
stopServerlessCluster,
teardownServerlessClusterSync,
} = require('./utils');
const { createCliError } = require('./errors');
const { promisify } = require('util');
const treeKillAsync = promisify(require('tree-kill'));
const { parseSettings, SettingsFilter } = require('./settings');
const { CA_CERT_PATH, ES_NOPASSWORD_P12_PATH, extract } = require('@kbn/dev-utils');
} from './utils';
import { createCliError } from './errors';
const treeKillAsync = promisify<number, string>(treeKill);
import { parseSettings, SettingsFilter } from './settings';
import { EsClusterExecOptions } from './cluster_exec_options';
import {
DownloadSnapshotOptions,
InstallArchiveOptions,
InstallSnapshotOptions,
InstallSourceOptions,
} from './install/types';
const DEFAULT_READY_TIMEOUT = parseTimeoutToMs('1m');
/** @typedef {import('./cluster_exec_options').EsClusterExecOptions} ExecOptions */
/** @typedef {import('./utils').DockerOptions} DockerOptions */
/** @typedef {import('./utils').ServerlessOptions}ServerlessOptions */
const DEFAULT_READY_TIMEOUT = 60 * 1000;
// listen to data on stream until map returns anything but undefined
const first = (stream, map) =>
const first = (stream: Readable, map: (data: Buffer) => string | true | undefined) =>
new Promise((resolve) => {
const onData = (data) => {
const onData = (data: any) => {
const result = map(data);
if (result !== undefined) {
resolve(result);
@ -51,61 +58,66 @@ const first = (stream, map) =>
stream.on('data', onData);
});
exports.Cluster = class Cluster {
interface StopOptions {
gracefully: boolean;
}
export class Cluster {
private log: ToolingLog;
private ssl: boolean;
private stopCalled: boolean;
private process: execa.ExecaChildProcess | null;
private outcome: Promise<void> | null;
private serverlessNodes: string[];
private setupPromise: Promise<unknown> | null;
private stdioTarget: NodeJS.WritableStream | null;
constructor({ log = defaultLog, ssl = false } = {}) {
this._log = log.withType('@kbn/es Cluster');
this._ssl = ssl;
this.log = log.withType('@kbn/es Cluster');
this.ssl = ssl;
this.stopCalled = false;
// Serverless Elasticsearch node names, started via Docker
this.serverlessNodes = [];
// properties used exclusively for the locally started Elasticsearch cluster
this.process = null;
this.outcome = null;
this.setupPromise = null;
this.stdioTarget = null;
}
/**
* Builds and installs ES from source
*
* @param {Object} options
* @property {Array} options.installPath
* @property {Array} options.sourcePath
* @returns {Promise<{installPath}>}
*/
async installSource(options = {}) {
this._log.info(chalk.bold('Installing from source'));
return await this._log.indent(4, async () => {
const { installPath } = await installSource({ log: this._log, ...options });
async installSource(options: InstallSourceOptions) {
this.log.info(chalk.bold('Installing from source'));
return await this.log.indent(4, async () => {
const { installPath } = await installSource({ log: this.log, ...options });
return { installPath };
});
}
/**
* Download ES from a snapshot
*
* @param {Object} options
* @property {Array} options.installPath
* @property {Array} options.sourcePath
* @returns {Promise<{installPath}>}
*/
async downloadSnapshot(options = {}) {
this._log.info(chalk.bold('Downloading snapshot'));
return await this._log.indent(4, async () => {
const { installPath } = await downloadSnapshot({
log: this._log,
async downloadSnapshot(options: DownloadSnapshotOptions) {
this.log.info(chalk.bold('Downloading snapshot'));
return await this.log.indent(4, async () => {
const { downloadPath } = await downloadSnapshot({
log: this.log,
...options,
});
return { installPath };
return { downloadPath };
});
}
/**
* Download and installs ES from a snapshot
*
* @param {Object} options
* @property {Array} options.installPath
* @property {Array} options.sourcePath
* @returns {Promise<{installPath}>}
*/
async installSnapshot(options = {}) {
this._log.info(chalk.bold('Installing from snapshot'));
return await this._log.indent(4, async () => {
async installSnapshot(options: InstallSnapshotOptions) {
this.log.info(chalk.bold('Installing from snapshot'));
return await this.log.indent(4, async () => {
const { installPath } = await installSnapshot({
log: this._log,
log: this.log,
...options,
});
@ -115,18 +127,13 @@ exports.Cluster = class Cluster {
/**
* Installs ES from a local tar
*
* @param {String} path
* @param {Object} options
* @property {Array} options.installPath
* @returns {Promise<{installPath}>}
*/
async installArchive(path, options = {}) {
this._log.info(chalk.bold('Installing from an archive'));
return await this._log.indent(4, async () => {
const { installPath } = await installArchive(path, {
log: this._log,
...options,
async installArchive(archivePath: string, options?: InstallArchiveOptions) {
this.log.info(chalk.bold('Installing from an archive'));
return await this.log.indent(4, async () => {
const { installPath } = await installArchive(archivePath, {
log: this.log,
...(options || {}),
});
return { installPath };
@ -134,21 +141,16 @@ exports.Cluster = class Cluster {
}
/**
* Unpacks a tar or zip file containing the data directory for an
* ES cluster.
*
* @param {String} installPath
* @param {String} archivePath
* @param {String} [extractDirName]
* Unpacks a tar or zip file containing the data directory for an ES cluster.
*/
async extractDataDirectory(installPath, archivePath, extractDirName = 'data') {
this._log.info(chalk.bold(`Extracting data directory`));
await this._log.indent(4, async () => {
async extractDataDirectory(installPath: string, archivePath: string, extractDirName = 'data') {
this.log.info(chalk.bold(`Extracting data directory`));
await this.log.indent(4, async () => {
// stripComponents=1 excludes the root directory as that is how our archives are
// structured. This works in our favor as we can explicitly extract into the data dir
const extractPath = path.resolve(installPath, extractDirName);
this._log.info(`Data archive: ${archivePath}`);
this._log.info(`Extract path: ${extractPath}`);
this.log.info(`Data archive: ${archivePath}`);
this.log.info(`Extract path: ${extractPath}`);
await extract({
archivePath,
@ -159,30 +161,28 @@ exports.Cluster = class Cluster {
}
/**
* Starts ES and returns resolved promise once started
*
* @param {String} installPath
* @param {String} plugins - comma separated list of plugins to install
* @param {Object} options
* @returns {Promise}
* Installs comma separated list of ES plugins to the specified path
*/
async installPlugins(installPath, plugins, options) {
const esJavaOpts = this.javaOptions(options);
async installPlugins(installPath: string, plugins: string, esJavaOpts?: string) {
const javaOpts = this.getJavaOptions(esJavaOpts);
for (const plugin of plugins.split(',')) {
await execa(ES_PLUGIN_BIN, ['install', plugin.trim()], {
cwd: installPath,
env: {
JAVA_HOME: '', // By default, we want to always unset JAVA_HOME so that the bundled JDK will be used
ES_JAVA_OPTS: esJavaOpts.trim(),
ES_JAVA_OPTS: javaOpts,
},
});
}
}
async configureKeystoreWithSecureSettingsFiles(installPath, secureSettingsFiles) {
async configureKeystoreWithSecureSettingsFiles(
installPath: string,
secureSettingsFiles: string[][]
) {
const env = { JAVA_HOME: '' };
for (const [secureSettingName, secureSettingFile] of secureSettingsFiles) {
this._log.info(
this.log.info(
`setting secure setting %s to %s`,
chalk.bold(secureSettingName),
chalk.bold(secureSettingFile)
@ -196,49 +196,45 @@ exports.Cluster = class Cluster {
/**
* Starts ES and returns resolved promise once started
*
* @param {String} installPath
* @param {ExecOptions} options
* @returns {Promise<void>}
*/
async start(installPath, options = {}) {
// _exec indents and we wait for our own end condition, so reset the indent level to it's current state after we're done waiting
await this._log.indent(0, async () => {
this._exec(installPath, options);
async start(installPath: string, options: EsClusterExecOptions) {
// `exec` indents and we wait for our own end condition, so reset the indent level to it's current state after we're done waiting
await this.log.indent(0, async () => {
this.exec(installPath, options);
await Promise.race([
// wait for native realm to be setup and es to be started
Promise.all([
first(this._process.stdout, (data) => {
if (/started/.test(data)) {
first(this.process?.stdout!, (data: Buffer) => {
if (/started/.test(data.toString('utf-8'))) {
return true;
}
}),
this._setupPromise,
this.setupPromise,
]),
// await the outcome of the process in case it exits before starting
this._outcome.then(() => {
this.outcome?.then(() => {
throw createCliError('ES exited without starting');
}),
]);
});
if (options.onEarlyExit) {
this._outcome
.then(
this.outcome
?.then(
() => {
if (!this._stopCalled) {
if (!this.stopCalled && options.onEarlyExit) {
options.onEarlyExit(`ES exitted unexpectedly`);
}
},
(error) => {
if (!this._stopCalled) {
(error: Error) => {
if (!this.stopCalled && options.onEarlyExit) {
options.onEarlyExit(`ES exitted unexpectedly: ${error.stack}`);
}
}
)
.catch((error) => {
.catch((error: Error) => {
throw new Error(`failure handling early exit: ${error.stack}`);
});
}
@ -246,85 +242,75 @@ exports.Cluster = class Cluster {
/**
* Starts Elasticsearch and waits for Elasticsearch to exit
*
* @param {String} installPath
* @param {ExecOptions} options
* @returns {Promise<void>}
*/
async run(installPath, options = {}) {
// _exec indents and we wait for our own end condition, so reset the indent level to it's current state after we're done waiting
await this._log.indent(0, async () => {
this._exec(installPath, options);
async run(installPath: string, options: EsClusterExecOptions) {
// `exec` indents and we wait for our own end condition, so reset the indent level to it's current state after we're done waiting
await this.log.indent(0, async () => {
this.exec(installPath, options);
// log native realm setup errors so they aren't uncaught
this._setupPromise.catch((error) => {
this._log.error(error);
this.setupPromise?.catch((error: Error) => {
this.log.error(error);
this.stop();
});
// await the final outcome of the process
await this._outcome;
await this.outcome;
});
}
/**
* Stops ES process, if it's running
*
* @returns {Promise}
* Stops cluster
*/
async stop() {
if (this._stopCalled) {
private async stopCluster(options: StopOptions) {
if (this.stopCalled) {
return;
}
this._stopCalled = true;
this.stopCalled = true;
if (this._serverlessNodes?.length) {
return await stopServerlessCluster(this._log, this._serverlessNodes);
// Stop ES docker containers
if (this.serverlessNodes.length) {
return await stopServerlessCluster(this.log, this.serverlessNodes);
}
if (!this._process || !this._outcome) {
// Stop local ES process
if (!this.process || !this.outcome) {
throw new Error('ES has not been started');
}
await treeKillAsync(this._process.pid);
const pid = this.process.pid;
await this._outcome;
if (pid) {
await treeKillAsync(pid, options.gracefully ? 'SIGTERM' : 'SIGKILL');
} else {
throw Error(`ES process pid is not defined, can't stop it`);
}
await this.outcome;
}
/**
* Stops ES process, it it's running, without waiting for it to shutdown gracefully
* Stops ES process, if it's running
*/
async stop() {
await this.stopCluster({ gracefully: true });
}
/**
* Stops ES process without waiting for it to shutdown gracefully
*/
async kill() {
if (this._stopCalled) {
return;
}
this._stopCalled;
if (this._serverlessNodes?.length) {
return await stopServerlessCluster(this._log, this._serverlessNodes);
}
if (!this._process || !this._outcome) {
throw new Error('ES has not been started');
}
await treeKillAsync(this._process.pid, 'SIGKILL');
await this._outcome;
await this.stopCluster({ gracefully: false });
}
/**
* Common logic from this.start() and this.run()
*
* Start the elasticsearch process (stored at `this._process`)
* and "pipe" its stdio to `this._log`. Also create `this._outcome`
* Start the Elasticsearch process (stored at `this.process`)
* and "pipe" its stdio to `this.log`. Also create `this.outcome`
* which will be resolved/rejected when the process exits.
*
* @private
* @param {String} installPath
* @param {ExecOptions} opts
*/
_exec(installPath, opts = {}) {
private exec(installPath: string, opts: EsClusterExecOptions) {
const {
skipNativeRealmSetup = false,
reportTime = () => {},
@ -335,24 +321,21 @@ exports.Cluster = class Cluster {
...options
} = opts;
if (this._process || this._outcome) {
if (this.process || this.outcome) {
throw new Error('ES has already been started');
}
/** @type {NodeJS.WritableStream | undefined} */
let stdioTarget;
if (writeLogsToPath) {
stdioTarget = fs.createWriteStream(writeLogsToPath, 'utf8');
this._log.info(
this.stdioTarget = fs.createWriteStream(writeLogsToPath, 'utf8');
this.log.info(
chalk.bold('Starting'),
`and writing logs to ${path.relative(process.cwd(), writeLogsToPath)}`
`and writing logs to ${path.resolve(process.cwd(), writeLogsToPath)}`
);
} else {
this._log.info(chalk.bold('Starting'));
this.log.info(chalk.bold('Starting'));
}
this._log.indent(4);
this.log.indent(4);
const esArgs = new Map([
['action.destructive_requires_name', 'true'],
@ -362,13 +345,18 @@ exports.Cluster = class Cluster {
]);
// options.esArgs overrides the default esArg values
for (const arg of [].concat(options.esArgs || [])) {
const _esArgs = options.esArgs
? Array.isArray(options.esArgs)
? options.esArgs
: [options.esArgs]
: [];
for (const arg of _esArgs) {
const [key, ...value] = arg.split('=');
esArgs.set(key.trim(), value.join('=').trim());
}
// Add to esArgs if ssl is enabled
if (this._ssl) {
if (this.ssl) {
esArgs.set('xpack.security.http.ssl.enabled', 'true');
// Include default keystore settings only if ssl isn't disabled by esArgs and keystore isn't configured.
if (!esArgs.get('xpack.security.http.ssl.keystore.path')) {
@ -383,22 +371,23 @@ exports.Cluster = class Cluster {
extractConfigFiles(
Array.from(esArgs).map((e) => e.join('=')),
installPath,
{ log: this._log }
{ log: this.log }
),
{
filter: SettingsFilter.NonSecureOnly,
}
).reduce(
(acc, [settingName, settingValue]) => acc.concat(['-E', `${settingName}=${settingValue}`]),
(acc: string[], [settingName, settingValue]) =>
acc.concat(['-E', `${settingName}=${settingValue}`]),
[]
);
this._log.info('%s %s', ES_BIN, args.join(' '));
const esJavaOpts = this.javaOptions(options);
this.log.info('%s %s', ES_BIN, args.join(' '));
const esJavaOpts = this.getJavaOptions(options.esJavaOpts);
this._log.info('ES_JAVA_OPTS: %s', esJavaOpts);
this.log.info('ES_JAVA_OPTS: %s', esJavaOpts);
this._process = execa(ES_BIN, args, {
this.process = execa(ES_BIN, args, {
cwd: installPath,
env: {
...(installPath ? { ES_TMPDIR: path.resolve(installPath, 'ES_TMPDIR') } : {}),
@ -409,9 +398,9 @@ exports.Cluster = class Cluster {
stdio: ['ignore', 'pipe', 'pipe'],
});
this._setupPromise = Promise.all([
this.setupPromise = Promise.all([
// parse log output to find http port
first(this._process.stdout, (data) => {
first(this.process.stdout!, (data: Buffer) => {
const match = data.toString('utf8').match(/HttpServer.+publish_address {[0-9.]+:([0-9]+)/);
if (match) {
@ -420,13 +409,13 @@ exports.Cluster = class Cluster {
}),
// load the CA cert from disk if necessary
this._ssl ? fsp.readFile(CA_CERT_PATH) : null,
this.ssl ? fsp.readFile(CA_CERT_PATH) : null,
]).then(async ([port, caCert]) => {
const client = new Client({
node: `${caCert ? 'https:' : 'http:'}//localhost:${port}`,
auth: {
username: 'elastic',
password: options.password,
password: options.password!,
},
tls: caCert
? {
@ -437,13 +426,13 @@ exports.Cluster = class Cluster {
});
if (!skipReadyCheck) {
await this._waitForClusterReady(client, readyTimeout);
await this.waitForClusterReady(client, readyTimeout);
}
// once the cluster is ready setup the native realm
if (!skipNativeRealmSetup) {
const nativeRealm = new NativeRealm({
log: this._log,
log: this.log,
elasticPassword: options.password,
client,
});
@ -451,12 +440,12 @@ exports.Cluster = class Cluster {
await nativeRealm.setPasswords(options);
}
this._log.success('kbn/es setup complete');
this.log.success('kbn/es setup complete');
});
let reportSent = false;
// parse and forward es stdout to the log
this._process.stdout.on('data', (data) => {
this.process.stdout!.on('data', (data) => {
const chunk = data.toString();
const lines = parseEsLog(chunk);
lines.forEach((line) => {
@ -467,40 +456,40 @@ exports.Cluster = class Cluster {
});
}
if (stdioTarget) {
stdioTarget.write(chunk);
if (this.stdioTarget) {
this.stdioTarget.write(chunk);
} else {
this._log.info(line.formattedMessage);
this.log.info(line.formattedMessage);
}
});
});
// forward es stderr to the log
this._process.stderr.on('data', (data) => {
this.process.stderr!.on('data', (data) => {
const chunk = data.toString();
if (stdioTarget) {
stdioTarget.write(chunk);
if (this.stdioTarget) {
this.stdioTarget.write(chunk);
} else {
this._log.error(chalk.red(chunk.trim()));
this.log.error(chalk.red(chunk.trim()));
}
});
// close the stdio target if we have one defined
if (stdioTarget) {
if (this.stdioTarget) {
Rx.combineLatest([
Rx.fromEvent(this._process.stderr, 'end'),
Rx.fromEvent(this._process.stdout, 'end'),
Rx.fromEvent(this.process.stderr!, 'end'),
Rx.fromEvent(this.process.stdout!, 'end'),
])
.pipe(Rx.first())
.subscribe(() => {
stdioTarget.end();
this.stdioTarget?.end();
});
}
// observe the exit code of the process and reflect in _outcome promises
const exitCode = new Promise((resolve) => this._process.once('exit', resolve));
this._outcome = exitCode.then((code) => {
if (this._stopCalled) {
// observe the exit code of the process and reflect in `this.outcome` promises
const exitCode: Promise<number> = new Promise((resolve) => this.process?.once('exit', resolve));
this.outcome = exitCode.then((code) => {
if (this.stopCalled) {
return;
}
@ -520,11 +509,11 @@ exports.Cluster = class Cluster {
});
}
async _waitForClusterReady(client, readyTimeout = DEFAULT_READY_TIMEOUT) {
async waitForClusterReady(client: Client, readyTimeout = DEFAULT_READY_TIMEOUT) {
let attempt = 0;
const start = Date.now();
this._log.info('waiting for ES cluster to report a yellow or green status');
this.log.info('waiting for ES cluster to report a yellow or green status');
while (true) {
attempt += 1;
@ -545,10 +534,10 @@ exports.Cluster = class Cluster {
if (error.message.startsWith('not ready,')) {
if (timeSinceStart > 10_000) {
this._log.warning(error.message);
this.log.warning(error.message);
}
} else {
this._log.warning(
this.log.warning(
`waiting for ES cluster to come online, attempt ${attempt} failed with: ${error.message}`
);
}
@ -559,9 +548,8 @@ exports.Cluster = class Cluster {
}
}
javaOptions(options) {
let esJavaOpts = `${options.esJavaOpts || ''} ${process.env.ES_JAVA_OPTS || ''}`;
private getJavaOptions(opts: string | undefined) {
let esJavaOpts = `${opts || ''} ${process.env.ES_JAVA_OPTS || ''}`;
// ES now automatically sets heap size to 50% of the machine's available memory
// so we need to set it to a smaller size for local dev and CI
// especially because we currently run many instances of ES on the same machine during CI
@ -574,36 +562,38 @@ exports.Cluster = class Cluster {
}
/**
* Run an Elasticsearch Serverless Docker cluster
*
* @param {ServerlessOptions} options
* Runs an Elasticsearch Serverless Docker cluster and returns node names
*/
async runServerless(options = {}) {
if (this._process || this._outcome) {
throw new Error('ES has already been started');
async runServerless(options: ServerlessOptions) {
if (this.process || this.outcome) {
throw new Error('ES stateful cluster has already been started');
}
this._serverlessNodes = await runServerlessCluster(this._log, options);
if (this.serverlessNodes.length > 0) {
throw new Error('ES serverless docker cluster has already been started');
}
this.serverlessNodes = await runServerlessCluster(this.log, options);
if (options.teardown) {
/**
* Ideally would be async and an event like beforeExit or SIGINT,
* but those events are not being triggered in FTR child process.
*/
process.on('exit', () => teardownServerlessClusterSync(this._log, options));
process.on('exit', () => teardownServerlessClusterSync(this.log, options));
}
return this.serverlessNodes;
}
/**
* Run an Elasticsearch Docker container
*
* @param {DockerOptions} options
*/
async runDocker(options = {}) {
if (this._process || this._outcome) {
throw new Error('ES has already been started');
async runDocker(options: DockerOptions) {
if (this.process || this.outcome) {
throw new Error('ES stateful cluster has already been started');
}
this._process = await runDockerContainer(this._log, options);
await runDockerContainer(this.log, options);
}
};
}

View file

@ -19,15 +19,7 @@ import { BASE_PATH, ES_CONFIG, ES_KEYSTORE_BIN } from '../paths';
import { Artifact } from '../artifact';
import { parseSettings, SettingsFilter } from '../settings';
import { log as defaultLog } from '../utils/log';
interface InstallArchiveOptions {
license?: string;
password?: string;
basePath?: string;
installPath?: string;
log?: ToolingLog;
esArgs?: string[];
}
import { InstallArchiveOptions } from './types';
const isHttpUrl = (str: string) => {
try {
@ -40,7 +32,7 @@ const isHttpUrl = (str: string) => {
/**
* Extracts an ES archive and optionally installs plugins
*/
export async function installArchive(archive: string, options: InstallArchiveOptions = {}) {
export async function installArchive(archive: string, options?: InstallArchiveOptions) {
const {
license = 'basic',
password = 'changeme',
@ -48,7 +40,7 @@ export async function installArchive(archive: string, options: InstallArchiveOpt
installPath = path.resolve(basePath, path.basename(archive, '.tar.gz')),
log = defaultLog,
esArgs = [],
} = options;
} = options || {};
let dest = archive;
if (isHttpUrl(archive)) {

View file

@ -7,23 +7,13 @@
*/
import path from 'path';
import chalk from 'chalk';
import { ToolingLog } from '@kbn/tooling-log';
import { BASE_PATH } from '../paths';
import { installArchive } from './install_archive';
import { log as defaultLog } from '../utils/log';
import { Artifact, ArtifactLicense } from '../artifact';
interface DownloadSnapshotOptions {
version: string;
license?: ArtifactLicense;
basePath?: string;
installPath?: string;
log?: ToolingLog;
useCached?: boolean;
}
import { Artifact } from '../artifact';
import { DownloadSnapshotOptions, InstallSnapshotOptions } from './types';
/**
* Download an ES snapshot
@ -49,11 +39,6 @@ export async function downloadSnapshot({
};
}
interface InstallSnapshotOptions extends DownloadSnapshotOptions {
password?: string;
esArgs?: string[];
}
/**
* Installs ES from snapshot
*/

View file

@ -20,16 +20,7 @@ import { log as defaultLog } from '../utils/log';
import { cache } from '../utils/cache';
import { buildSnapshot, archiveForPlatform } from '../utils/build_snapshot';
import { BASE_PATH } from '../paths';
interface InstallSourceOptions {
sourcePath: string;
license?: string;
password?: string;
basePath?: string;
installPath?: string;
log?: ToolingLog;
esArgs?: string[];
}
import { InstallSourceOptions } from './types';
/**
* Installs ES from source

View file

@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { ToolingLog } from '@kbn/tooling-log';
import type { ArtifactLicense } from '../artifact';
export interface InstallSourceOptions {
sourcePath: string;
license?: ArtifactLicense;
password?: string;
basePath?: string;
installPath?: string;
log?: ToolingLog;
esArgs?: string[];
}
export interface DownloadSnapshotOptions {
version: string;
license?: ArtifactLicense;
basePath?: string;
installPath?: string;
log?: ToolingLog;
useCached?: boolean;
}
export interface InstallSnapshotOptions extends DownloadSnapshotOptions {
password?: string;
esArgs?: string[];
}
export interface InstallArchiveOptions {
license?: ArtifactLicense;
password?: string;
basePath?: string;
installPath?: string;
log?: ToolingLog;
esArgs?: string[];
}

View file

@ -1,490 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
const { ES_NOPASSWORD_P12_PATH } = require('@kbn/dev-utils');
const { ToolingLog, ToolingLogCollectingWriter } = require('@kbn/tooling-log');
const { createAnyInstanceSerializer, createStripAnsiSerializer } = require('@kbn/jest-serializers');
const execa = require('execa');
const { Cluster } = require('../cluster');
const { installSource, installSnapshot, installArchive } = require('../install');
const { extractConfigFiles } = require('../utils/extract_config_files');
expect.addSnapshotSerializer(createAnyInstanceSerializer(ToolingLog));
expect.addSnapshotSerializer(createStripAnsiSerializer());
jest.mock('../install', () => ({
installSource: jest.fn(),
installSnapshot: jest.fn(),
installArchive: jest.fn(),
}));
jest.mock('execa', () => jest.fn());
jest.mock('../utils/extract_config_files', () => ({
extractConfigFiles: jest.fn(),
}));
const log = new ToolingLog();
const logWriter = new ToolingLogCollectingWriter();
log.setWriters([logWriter]);
function sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
async function ensureNoResolve(promise) {
await Promise.race([
sleep(100),
promise.then(() => {
throw new Error('promise was not supposed to resolve');
}),
]);
}
async function ensureResolve(promise) {
return await Promise.race([
promise,
sleep(100).then(() => {
throw new Error('promise was supposed to resolve with installSource() resolution');
}),
]);
}
function mockEsBin({ exitCode, start }) {
execa.mockImplementationOnce((cmd, args, options) =>
jest.requireActual('execa')(
process.execPath,
[
'--require=@kbn/babel-register/install',
require.resolve('./__fixtures__/es_bin.js'),
JSON.stringify({
exitCode,
start,
ssl: args.includes('xpack.security.http.ssl.enabled=true'),
}),
],
options
)
);
}
const initialEnv = { ...process.env };
beforeEach(() => {
jest.resetAllMocks();
extractConfigFiles.mockImplementation((config) => config);
log.indent(-log.getIndent());
logWriter.messages.length = 0;
});
afterEach(() => {
process.env = { ...initialEnv };
});
describe('#installSource()', () => {
it('awaits installSource() promise and returns { installPath }', async () => {
let resolveInstallSource;
installSource.mockImplementationOnce(
() =>
new Promise((resolve) => {
resolveInstallSource = () => {
resolve({ installPath: 'foo' });
};
})
);
const cluster = new Cluster({ log });
const promise = cluster.installSource();
await ensureNoResolve(promise);
resolveInstallSource();
await expect(ensureResolve(promise)).resolves.toEqual({
installPath: 'foo',
});
});
it('passes through all options+log to installSource()', async () => {
installSource.mockResolvedValue({});
const cluster = new Cluster({ log });
await cluster.installSource({ foo: 'bar' });
expect(installSource.mock.calls).toMatchInlineSnapshot(`
Array [
Array [
Object {
"foo": "bar",
"log": <ToolingLog>,
},
],
]
`);
expect(logWriter.messages).toMatchInlineSnapshot(`
Array [
" info source[@kbn/es Cluster] Installing from source",
]
`);
});
it('rejects if installSource() rejects', async () => {
installSource.mockRejectedValue(new Error('foo'));
const cluster = new Cluster({ log });
await expect(cluster.installSource()).rejects.toThrowError('foo');
});
});
describe('#installSnapshot()', () => {
it('awaits installSnapshot() promise and returns { installPath }', async () => {
let resolveInstallSnapshot;
installSnapshot.mockImplementationOnce(
() =>
new Promise((resolve) => {
resolveInstallSnapshot = () => {
resolve({ installPath: 'foo' });
};
})
);
const cluster = new Cluster({ log });
const promise = cluster.installSnapshot();
await ensureNoResolve(promise);
resolveInstallSnapshot();
await expect(ensureResolve(promise)).resolves.toEqual({
installPath: 'foo',
});
});
it('passes through all options+log to installSnapshot()', async () => {
installSnapshot.mockResolvedValue({});
const cluster = new Cluster({ log });
await cluster.installSnapshot({ foo: 'bar' });
expect(installSnapshot.mock.calls).toMatchInlineSnapshot(`
Array [
Array [
Object {
"foo": "bar",
"log": <ToolingLog>,
},
],
]
`);
expect(logWriter.messages).toMatchInlineSnapshot(`
Array [
" info source[@kbn/es Cluster] Installing from snapshot",
]
`);
});
it('rejects if installSnapshot() rejects', async () => {
installSnapshot.mockRejectedValue(new Error('foo'));
const cluster = new Cluster({ log });
await expect(cluster.installSnapshot()).rejects.toThrowError('foo');
});
});
describe('#installArchive(path)', () => {
it('awaits installArchive() promise and returns { installPath }', async () => {
let resolveInstallArchive;
installArchive.mockImplementationOnce(
() =>
new Promise((resolve) => {
resolveInstallArchive = () => {
resolve({ installPath: 'foo' });
};
})
);
const cluster = new Cluster({ log });
const promise = cluster.installArchive();
await ensureNoResolve(promise);
resolveInstallArchive();
await expect(ensureResolve(promise)).resolves.toEqual({
installPath: 'foo',
});
});
it('passes through path and all options+log to installArchive()', async () => {
installArchive.mockResolvedValue({});
const cluster = new Cluster({ log });
await cluster.installArchive('path', { foo: 'bar' });
expect(installArchive.mock.calls).toMatchInlineSnapshot(`
Array [
Array [
"path",
Object {
"foo": "bar",
"log": <ToolingLog>,
},
],
]
`);
expect(logWriter.messages).toMatchInlineSnapshot(`
Array [
" info source[@kbn/es Cluster] Installing from an archive",
]
`);
});
it('rejects if installArchive() rejects', async () => {
installArchive.mockRejectedValue(new Error('foo'));
const cluster = new Cluster({ log });
await expect(cluster.installArchive()).rejects.toThrowError('foo');
});
});
describe('#start(installPath)', () => {
it('rejects when bin/elasticsearch exists with 0 before starting', async () => {
mockEsBin({ exitCode: 0, start: false });
await expect(new Cluster({ log }).start()).rejects.toThrowError('ES exited without starting');
});
it('rejects when bin/elasticsearch exists with 143 before starting', async () => {
mockEsBin({ exitCode: 143, start: false });
await expect(new Cluster({ log }).start()).rejects.toThrowError('ES exited without starting');
});
it('rejects when bin/elasticsearch exists with 130 before starting', async () => {
mockEsBin({ exitCode: 130, start: false });
await expect(new Cluster({ log }).start()).rejects.toThrowError('ES exited without starting');
});
it('rejects when bin/elasticsearch exists with 1 before starting', async () => {
mockEsBin({ exitCode: 1, start: false });
await expect(new Cluster({ log }).start()).rejects.toThrowError('ES exited with code 1');
});
it('resolves when bin/elasticsearch logs "started"', async () => {
mockEsBin({ start: true });
await new Cluster({ log }).start();
});
it('rejects if #start() was called previously', async () => {
mockEsBin({ start: true });
const cluster = new Cluster({ log });
await cluster.start();
await expect(cluster.start()).rejects.toThrowError('ES has already been started');
});
it('rejects if #run() was called previously', async () => {
mockEsBin({ start: true });
const cluster = new Cluster({ log });
await cluster.run();
await expect(cluster.start()).rejects.toThrowError('ES has already been started');
});
it('sets up SSL when enabled', async () => {
mockEsBin({ start: true, ssl: true });
const cluster = new Cluster({ log, ssl: true });
await cluster.start();
const config = extractConfigFiles.mock.calls[0][0];
expect(config).toContain('xpack.security.http.ssl.enabled=true');
expect(config).toContain(`xpack.security.http.ssl.keystore.path=${ES_NOPASSWORD_P12_PATH}`);
expect(config).toContain(`xpack.security.http.ssl.keystore.type=PKCS12`);
});
it(`doesn't setup SSL when disabled`, async () => {
mockEsBin({ start: true });
extractConfigFiles.mockReturnValueOnce([]);
const cluster = new Cluster({ log, ssl: false });
await cluster.start();
expect(extractConfigFiles.mock.calls).toMatchInlineSnapshot(`
Array [
Array [
Array [
"action.destructive_requires_name=true",
"cluster.routing.allocation.disk.threshold_enabled=false",
"ingest.geoip.downloader.enabled=false",
"search.check_ccs_compatibility=true",
],
undefined,
Object {
"log": <ToolingLog>,
},
],
]
`);
});
it(`allows overriding search.check_ccs_compatibility`, async () => {
mockEsBin({ start: true });
extractConfigFiles.mockReturnValueOnce([]);
const cluster = new Cluster({
log,
ssl: false,
});
await cluster.start(undefined, {
esArgs: ['search.check_ccs_compatibility=false'],
});
expect(extractConfigFiles.mock.calls).toMatchInlineSnapshot(`
Array [
Array [
Array [
"action.destructive_requires_name=true",
"cluster.routing.allocation.disk.threshold_enabled=false",
"ingest.geoip.downloader.enabled=false",
"search.check_ccs_compatibility=false",
],
undefined,
Object {
"log": <ToolingLog>,
},
],
]
`);
});
});
describe('#run()', () => {
it('resolves when bin/elasticsearch exists with 0', async () => {
mockEsBin({ exitCode: 0 });
await new Cluster({ log }).run();
});
it('resolves when bin/elasticsearch exists with 143', async () => {
mockEsBin({ exitCode: 143 });
await new Cluster({ log }).run();
});
it('resolves when bin/elasticsearch exists with 130', async () => {
mockEsBin({ exitCode: 130 });
await new Cluster({ log }).run();
});
it('rejects when bin/elasticsearch exists with 1', async () => {
mockEsBin({ exitCode: 1 });
await expect(new Cluster({ log }).run()).rejects.toThrowError('ES exited with code 1');
});
it('rejects if #start() was called previously', async () => {
mockEsBin({ exitCode: 0, start: true });
const cluster = new Cluster({ log });
await cluster.start();
await expect(cluster.run()).rejects.toThrowError('ES has already been started');
});
it('rejects if #run() was called previously', async () => {
mockEsBin({ exitCode: 0 });
const cluster = new Cluster({ log });
await cluster.run();
await expect(cluster.run()).rejects.toThrowError('ES has already been started');
});
it('sets up SSL when enabled', async () => {
mockEsBin({ start: true, ssl: true });
const cluster = new Cluster({ log, ssl: true });
await cluster.run();
const config = extractConfigFiles.mock.calls[0][0];
expect(config).toContain('xpack.security.http.ssl.enabled=true');
expect(config).toContain(`xpack.security.http.ssl.keystore.path=${ES_NOPASSWORD_P12_PATH}`);
expect(config).toContain(`xpack.security.http.ssl.keystore.type=PKCS12`);
});
it(`doesn't setup SSL when disabled`, async () => {
mockEsBin({ start: true });
extractConfigFiles.mockReturnValueOnce([]);
const cluster = new Cluster({ log, ssl: false });
await cluster.run();
expect(extractConfigFiles.mock.calls).toMatchInlineSnapshot(`
Array [
Array [
Array [
"action.destructive_requires_name=true",
"cluster.routing.allocation.disk.threshold_enabled=false",
"ingest.geoip.downloader.enabled=false",
"search.check_ccs_compatibility=true",
],
undefined,
Object {
"log": <ToolingLog>,
},
],
]
`);
});
it('sets default Java heap', async () => {
mockEsBin({ start: true });
const cluster = new Cluster({ log });
await cluster.run();
expect(execa.mock.calls[0][2].env.ES_JAVA_OPTS).toMatchInlineSnapshot(`"-Xms1536m -Xmx1536m"`);
});
it('allows Java heap to be overwritten', async () => {
mockEsBin({ start: true });
process.env.ES_JAVA_OPTS = '-Xms5g -Xmx5g';
const cluster = new Cluster({ log });
await cluster.run();
expect(execa.mock.calls[0][2].env.ES_JAVA_OPTS).toMatchInlineSnapshot(`"-Xms5g -Xmx5g"`);
});
});
describe('#stop()', () => {
it('rejects if #run() or #start() was not called', async () => {
const cluster = new Cluster({ log });
await expect(cluster.stop()).rejects.toThrowError('ES has not been started');
});
it('resolves when ES exits with 0', async () => {
mockEsBin({ exitCode: 0, start: true });
const cluster = new Cluster({ log });
await cluster.start();
await cluster.stop();
});
it('resolves when ES exits with 143', async () => {
mockEsBin({ exitCode: 143, start: true });
const cluster = new Cluster({ log });
await cluster.start();
await cluster.stop();
});
it('resolves when ES exits with 130', async () => {
mockEsBin({ exitCode: 130, start: true });
const cluster = new Cluster({ log });
await cluster.start();
await cluster.stop();
});
it('rejects when ES exits with 1', async () => {
mockEsBin({ exitCode: 1, start: true });
const cluster = new Cluster({ log });
await expect(cluster.run()).rejects.toThrowError('ES exited with code 1');
await expect(cluster.stop()).rejects.toThrowError('ES exited with code 1');
});
});

View file

@ -0,0 +1,815 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { ToolingLog, ToolingLogCollectingWriter } from '@kbn/tooling-log';
import * as extractConfig from '../utils/extract_config_files';
import * as dockerUtils from '../utils/docker';
import { createAnyInstanceSerializer, createStripAnsiSerializer } from '@kbn/jest-serializers';
import * as installUtils from '../install';
import { Cluster } from '../cluster';
import { ES_NOPASSWORD_P12_PATH } from '@kbn/dev-utils/src/certs';
import {
DownloadSnapshotOptions,
InstallArchiveOptions,
InstallSnapshotOptions,
InstallSourceOptions,
} from '../install/types';
expect.addSnapshotSerializer(createAnyInstanceSerializer(ToolingLog));
expect.addSnapshotSerializer(createStripAnsiSerializer());
const log = new ToolingLog();
const logWriter = new ToolingLogCollectingWriter();
log.setWriters([logWriter]);
const KIBANA_ROOT = process.cwd();
const installPath = `${KIBANA_ROOT}/.es`;
const esClusterExecOptions = {};
const initialEnv = { ...process.env };
const sleep = (ms: number) => {
return new Promise((resolve) => setTimeout(resolve, ms));
};
const ensureResolve = async (promise: Promise<unknown>, name: string) => {
return await Promise.race([
promise,
sleep(100).then(() => {
throw new Error(`promise was supposed to resolve with ${name} resolution`);
}),
]);
};
const ensureNoResolve = async (promise: Promise<unknown>) => {
await Promise.race([
sleep(100),
promise.then(() => {
throw new Error('promise was not supposed to resolve');
}),
]);
};
jest.mock('execa');
const execaMock = jest.requireMock('execa');
const mockEsBin = (
{
exitCode,
start,
ssl,
}: {
exitCode?: number;
start?: boolean;
ssl?: boolean;
} = { start: false, ssl: false }
) => {
execaMock.mockImplementationOnce((args: string[], options: {}) =>
jest.requireActual('execa')(
process.execPath,
[
'--require=@kbn/babel-register/install',
require.resolve('./__fixtures__/es_bin.js'),
JSON.stringify({
exitCode,
start,
ssl: ssl || args.includes('xpack.security.http.ssl.enabled=true'),
}),
],
options
)
);
};
jest.mock('../install', () => ({
downloadSnapshot: jest.fn(),
installSource: jest.fn(),
installSnapshot: jest.fn(),
installArchive: jest.fn(),
}));
jest.mock('../utils/extract_config_files', () => ({
extractConfigFiles: jest.fn(),
}));
jest.mock('../utils/docker', () => ({
runServerlessCluster: jest.fn(),
runDockerContainer: jest.fn(),
}));
const downloadSnapshotMock = jest.spyOn(installUtils, 'downloadSnapshot');
const installSourceMock = jest.spyOn(installUtils, 'installSource');
const installSnapshotMock = jest.spyOn(installUtils, 'installSnapshot');
const installArchiveMock = jest.spyOn(installUtils, 'installArchive');
const extractConfigFilesMock = jest.spyOn(extractConfig, 'extractConfigFiles');
const runServerlessClusterMock = jest.spyOn(dockerUtils, 'runServerlessCluster');
const runDockerContainerMock = jest.spyOn(dockerUtils, 'runDockerContainer');
beforeEach(() => {
jest.resetAllMocks();
extractConfigFilesMock.mockImplementation((config) =>
Array.isArray(config) ? config : [config]
);
log.indent(-log.getIndent());
logWriter.messages.length = 0;
});
afterEach(() => {
process.env = { ...initialEnv };
});
describe('#downloadSnapshot()', () => {
test('awaits downloadSnapshot() promise and returns { downloadPath }', async () => {
let resolveDownloadSnapshot: Function;
downloadSnapshotMock.mockImplementationOnce(
() =>
new Promise((resolve) => {
resolveDownloadSnapshot = () => {
resolve({ downloadPath: 'foo' });
};
})
);
const cluster = new Cluster({ log });
const promise = cluster.downloadSnapshot({ version: '8.10.0' });
await ensureNoResolve(promise);
resolveDownloadSnapshot!();
await expect(ensureResolve(promise, 'downloadSnapshot()')).resolves.toEqual({
downloadPath: 'foo',
});
});
test('passes through all options+log to downloadSnapshot()', async () => {
downloadSnapshotMock.mockResolvedValue({ downloadPath: 'foo' });
const options: DownloadSnapshotOptions = {
version: '8.10.0',
license: 'trial',
basePath: 'someBasePath',
installPath: 'someInstallPath',
log,
useCached: true,
};
const cluster = new Cluster({ log });
await cluster.downloadSnapshot(options);
expect(downloadSnapshotMock.mock.calls[0][0]).toMatchObject(options);
expect(logWriter.messages).toMatchInlineSnapshot(`
Array [
" info source[@kbn/es Cluster] Downloading snapshot",
]
`);
});
test('rejects if downloadSnapshot() rejects', async () => {
downloadSnapshotMock.mockRejectedValue(new Error('foo'));
const cluster = new Cluster({ log });
await expect(cluster.downloadSnapshot({ version: '8.10.0' })).rejects.toThrowError('foo');
});
});
describe('#installSource()', () => {
test('awaits installSource() promise and returns { installPath }', async () => {
let resolveInstallSource: Function;
installSourceMock.mockImplementationOnce(
() =>
new Promise((resolve) => {
resolveInstallSource = () => {
resolve({ installPath: 'foo' });
};
})
);
const cluster = new Cluster({ log });
const promise = cluster.installSource({ sourcePath: 'bar' });
await ensureNoResolve(promise);
resolveInstallSource!();
await expect(ensureResolve(promise, 'installSource()')).resolves.toEqual({
installPath: 'foo',
});
});
test('passes through all options+log to installSource()', async () => {
installSourceMock.mockResolvedValue({ installPath: 'foo' });
const options: InstallSourceOptions = {
sourcePath: 'bar',
license: 'trial',
password: 'changeme',
basePath: 'someBasePath',
installPath: 'someInstallPath',
esArgs: ['foo=true'],
log,
};
const cluster = new Cluster({ log });
await cluster.installSource(options);
expect(installSourceMock.mock.calls[0][0]).toMatchObject(options);
expect(logWriter.messages).toMatchInlineSnapshot(`
Array [
" info source[@kbn/es Cluster] Installing from source",
]
`);
});
test('rejects if installSource() rejects', async () => {
installSourceMock.mockRejectedValue(new Error('foo'));
const cluster = new Cluster({ log });
await expect(cluster.installSource({ sourcePath: 'bar' })).rejects.toThrowError('foo');
});
});
describe('#installSnapshot()', () => {
test('awaits installSnapshot() promise and returns { installPath }', async () => {
let resolveInstallSnapshot: Function;
installSnapshotMock.mockImplementationOnce(
() =>
new Promise((resolve) => {
resolveInstallSnapshot = () => {
resolve({ installPath: 'foo' });
};
})
);
const cluster = new Cluster({ log });
const promise = cluster.installSnapshot({ version: '8.10.0' });
await ensureNoResolve(promise);
resolveInstallSnapshot!();
await expect(ensureResolve(promise, 'installSnapshot()')).resolves.toEqual({
installPath: 'foo',
});
});
test('passes through all options+log to installSnapshot()', async () => {
installSnapshotMock.mockResolvedValue({ installPath: 'foo' });
const options: InstallSnapshotOptions = {
version: '8.10.0',
license: 'trial',
password: 'changeme',
basePath: 'someBasePath',
installPath: 'someInstallPath',
esArgs: ['foo=true'],
useCached: true,
log,
};
const cluster = new Cluster({ log });
await cluster.installSnapshot(options);
expect(installSnapshotMock.mock.calls[0][0]).toMatchObject(options);
expect(logWriter.messages).toMatchInlineSnapshot(`
Array [
" info source[@kbn/es Cluster] Installing from snapshot",
]
`);
});
test('rejects if installSnapshot() rejects', async () => {
installSnapshotMock.mockRejectedValue(new Error('foo'));
const cluster = new Cluster({ log });
await expect(cluster.installSnapshot({ version: '8.10.0' })).rejects.toThrowError('foo');
});
});
describe('#installArchive()', () => {
test('awaits installArchive() promise and returns { installPath }', async () => {
let resolveInstallArchive: Function;
installArchiveMock.mockImplementationOnce(
() =>
new Promise((resolve) => {
resolveInstallArchive = () => {
resolve({ installPath: 'foo' });
};
})
);
const cluster = new Cluster({ log });
const promise = cluster.installArchive('bar');
await ensureNoResolve(promise);
resolveInstallArchive!();
await expect(ensureResolve(promise, 'installArchive()')).resolves.toEqual({
installPath: 'foo',
});
});
test('passes through all options+log to installArchive()', async () => {
installArchiveMock.mockResolvedValue({ installPath: 'foo' });
const options: InstallArchiveOptions = {
license: 'trial',
password: 'changeme',
basePath: 'someBasePath',
installPath: 'someInstallPath',
esArgs: ['foo=true'],
log,
};
const cluster = new Cluster({ log });
await cluster.installArchive('bar', options);
expect(installArchiveMock.mock.calls[0]).toMatchObject(['bar', options]);
expect(logWriter.messages).toMatchInlineSnapshot(`
Array [
" info source[@kbn/es Cluster] Installing from an archive",
]
`);
});
test('rejects if installArchive() rejects', async () => {
installArchiveMock.mockRejectedValue(new Error('foo'));
const cluster = new Cluster({ log });
await expect(cluster.installArchive('bar')).rejects.toThrowError('foo');
});
});
describe('#start(installPath)', () => {
test('rejects when bin/elasticsearch exists with 0 before starting', async () => {
mockEsBin({ exitCode: 0, start: false });
await expect(
new Cluster({ log }).start(installPath, esClusterExecOptions)
).rejects.toThrowError('ES exited without starting');
});
test('rejects when bin/elasticsearch exists with 143 before starting', async () => {
mockEsBin({ exitCode: 143, start: false });
await expect(
new Cluster({ log }).start(installPath, esClusterExecOptions)
).rejects.toThrowError('ES exited without starting');
});
test('rejects when bin/elasticsearch exists with 130 before starting', async () => {
mockEsBin({ exitCode: 130, start: false });
await expect(
new Cluster({ log }).start(installPath, esClusterExecOptions)
).rejects.toThrowError('ES exited without starting');
});
test('rejects when bin/elasticsearch exists with 1 before starting', async () => {
mockEsBin({ exitCode: 1, start: false });
await expect(
new Cluster({ log }).start(installPath, esClusterExecOptions)
).rejects.toThrowError('ES exited with code 1');
});
test('resolves when bin/elasticsearch logs "started"', async () => {
mockEsBin({ start: true });
await new Cluster({ log }).start(installPath, esClusterExecOptions);
});
test('rejects if #start() was called previously', async () => {
mockEsBin({ start: true });
const cluster = new Cluster({ log });
await cluster.start(installPath, esClusterExecOptions);
await expect(cluster.start(installPath, esClusterExecOptions)).rejects.toThrowError(
'ES has already been started'
);
});
test('rejects if #run() was called previously', async () => {
mockEsBin({ start: true });
const cluster = new Cluster({ log });
await cluster.run(installPath, esClusterExecOptions);
await expect(cluster.start(installPath, esClusterExecOptions)).rejects.toThrowError(
'ES has already been started'
);
});
test('sets up SSL when enabled', async () => {
mockEsBin({ start: true, ssl: true });
const cluster = new Cluster({ log, ssl: true });
await cluster.start(installPath, esClusterExecOptions);
const config = extractConfigFilesMock.mock.calls[0][0];
expect(config).toContain('xpack.security.http.ssl.enabled=true');
expect(config).toContain(`xpack.security.http.ssl.keystore.path=${ES_NOPASSWORD_P12_PATH}`);
expect(config).toContain(`xpack.security.http.ssl.keystore.type=PKCS12`);
});
test(`doesn't setup SSL when disabled`, async () => {
mockEsBin({ start: true });
extractConfigFilesMock.mockReturnValueOnce([]);
const cluster = new Cluster({ log, ssl: false });
await cluster.start(installPath, esClusterExecOptions);
expect(extractConfigFilesMock.mock.calls[0][0]).toMatchObject([
'action.destructive_requires_name=true',
'cluster.routing.allocation.disk.threshold_enabled=false',
'ingest.geoip.downloader.enabled=false',
'search.check_ccs_compatibility=true',
]);
});
test('allows overriding search.check_ccs_compatibility', async () => {
mockEsBin({ start: true });
extractConfigFilesMock.mockReturnValueOnce([]);
const cluster = new Cluster({
log,
ssl: false,
});
await cluster.start('undefined', {
esArgs: ['search.check_ccs_compatibility=false'],
});
expect(extractConfigFilesMock.mock.calls[0][0]).toMatchObject([
'action.destructive_requires_name=true',
'cluster.routing.allocation.disk.threshold_enabled=false',
'ingest.geoip.downloader.enabled=false',
'search.check_ccs_compatibility=false',
]);
});
});
describe('#run()', () => {
test('resolves when bin/elasticsearch exists with 0', async () => {
mockEsBin({ exitCode: 0 });
await new Cluster({ log }).run(installPath, esClusterExecOptions);
});
test('resolves when bin/elasticsearch exists with 143', async () => {
mockEsBin({ exitCode: 143 });
await new Cluster({ log }).run(installPath, esClusterExecOptions);
});
test('resolves when bin/elasticsearch exists with 130', async () => {
mockEsBin({ exitCode: 130 });
await new Cluster({ log }).run(installPath, esClusterExecOptions);
});
test('rejects when bin/elasticsearch exists with 1', async () => {
mockEsBin({ exitCode: 1 });
await expect(new Cluster({ log }).run(installPath, esClusterExecOptions)).rejects.toThrowError(
'ES exited with code 1'
);
});
test('rejects if #start() was called previously', async () => {
mockEsBin({ exitCode: 0, start: true });
const cluster = new Cluster({ log });
await cluster.start(installPath, esClusterExecOptions);
await expect(cluster.run(installPath, esClusterExecOptions)).rejects.toThrowError(
'ES has already been started'
);
});
test('rejects if #run() was called previously', async () => {
mockEsBin({ exitCode: 0 });
const cluster = new Cluster({ log });
await cluster.run(installPath, esClusterExecOptions);
await expect(cluster.run(installPath, esClusterExecOptions)).rejects.toThrowError(
'ES has already been started'
);
});
test('sets up SSL when enabled', async () => {
mockEsBin({ start: true, ssl: true });
const cluster = new Cluster({ log, ssl: true });
await cluster.run(installPath, esClusterExecOptions);
const config = extractConfigFilesMock.mock.calls[0][0];
expect(config).toContain('xpack.security.http.ssl.enabled=true');
expect(config).toContain(`xpack.security.http.ssl.keystore.path=${ES_NOPASSWORD_P12_PATH}`);
expect(config).toContain(`xpack.security.http.ssl.keystore.type=PKCS12`);
});
test(`doesn't setup SSL when disabled`, async () => {
mockEsBin({ start: true });
extractConfigFilesMock.mockReturnValueOnce([]);
const cluster = new Cluster({ log, ssl: false });
await cluster.run(installPath, esClusterExecOptions);
expect(extractConfigFilesMock.mock.calls[0][0]).toMatchObject([
'action.destructive_requires_name=true',
'cluster.routing.allocation.disk.threshold_enabled=false',
'ingest.geoip.downloader.enabled=false',
'search.check_ccs_compatibility=true',
]);
});
test('sets default Java heap', async () => {
mockEsBin({ start: true });
const cluster = new Cluster({ log });
await cluster.run(installPath, esClusterExecOptions);
expect(execaMock.mock.calls[0][2].env.ES_JAVA_OPTS).toMatchInlineSnapshot(
`"-Xms1536m -Xmx1536m"`
);
});
test('allows Java heap to be overwritten', async () => {
mockEsBin({ start: true });
process.env.ES_JAVA_OPTS = '-Xms5g -Xmx5g';
const cluster = new Cluster({ log });
await cluster.run(installPath, esClusterExecOptions);
expect(execaMock.mock.calls[0][2].env.ES_JAVA_OPTS).toMatchInlineSnapshot(`"-Xms5g -Xmx5g"`);
});
});
describe('#installPlugins()', () => {
test('passes through installPath and runs execa for each plugin', async () => {
const cluster = new Cluster({ log });
await cluster.installPlugins('foo', 'esPlugin1,esPlugin2', '');
expect(execaMock.mock.calls.length).toBe(2);
expect(execaMock.mock.calls[0]).toMatchInlineSnapshot(`
Array [
"bin/elasticsearch-plugin",
Array [
"install",
"esPlugin1",
],
Object {
"cwd": "foo",
"env": Object {
"ES_JAVA_OPTS": "-Xms1536m -Xmx1536m",
"JAVA_HOME": "",
},
},
]
`);
expect(execaMock.mock.calls[1]).toMatchInlineSnapshot(`
Array [
"bin/elasticsearch-plugin",
Array [
"install",
"esPlugin2",
],
Object {
"cwd": "foo",
"env": Object {
"ES_JAVA_OPTS": "-Xms1536m -Xmx1536m",
"JAVA_HOME": "",
},
},
]
`);
});
test(`allows 'esJavaOpts' to be overwritten`, async () => {
mockEsBin({ start: true });
const cluster = new Cluster({ log });
await cluster.installPlugins('foo', 'esPlugin1', '-Xms2g -Xmx2g');
expect(execaMock.mock.calls[0][2].env.ES_JAVA_OPTS).toMatchInlineSnapshot(`"-Xms2g -Xmx2g"`);
});
});
describe('#configureKeystoreWithSecureSettingsFiles()', () => {
test('passes through installPath and runs execa for each pair of settings', async () => {
const cluster = new Cluster({ log });
await cluster.configureKeystoreWithSecureSettingsFiles('foo', [
['name1', 'file1'],
['name2', 'file2'],
]);
expect(execaMock.mock.calls.length).toBe(2);
expect(execaMock.mock.calls[0]).toMatchInlineSnapshot(`
Array [
"./bin/elasticsearch-keystore",
Array [
"add-file",
"name1",
"file1",
],
Object {
"cwd": "foo",
"env": Object {
"JAVA_HOME": "",
},
},
]
`);
expect(execaMock.mock.calls[1]).toMatchInlineSnapshot(`
Array [
"./bin/elasticsearch-keystore",
Array [
"add-file",
"name2",
"file2",
],
Object {
"cwd": "foo",
"env": Object {
"JAVA_HOME": "",
},
},
]
`);
});
});
describe('#stop()', () => {
test('rejects if #run() or #start() was not called', async () => {
const cluster = new Cluster({ log });
await expect(cluster.stop()).rejects.toThrowError('ES has not been started');
});
test('resolves when ES exits with 0', async () => {
mockEsBin({ exitCode: 0, start: true });
const cluster = new Cluster({ log });
await cluster.start(installPath, esClusterExecOptions);
await cluster.stop();
});
test('resolves when ES exits with 143', async () => {
mockEsBin({ exitCode: 143, start: true });
const cluster = new Cluster({ log });
await cluster.start(installPath, esClusterExecOptions);
await cluster.stop();
});
test('resolves when ES exits with 130', async () => {
mockEsBin({ exitCode: 130, start: true });
const cluster = new Cluster({ log });
await cluster.start(installPath, esClusterExecOptions);
await cluster.stop();
});
test('rejects when ES exits with 1', async () => {
mockEsBin({ exitCode: 1, start: true });
const cluster = new Cluster({ log });
await expect(cluster.run(installPath, esClusterExecOptions)).rejects.toThrowError(
'ES exited with code 1'
);
await expect(cluster.stop()).rejects.toThrowError('ES exited with code 1');
});
});
describe('#kill()', () => {
test('rejects if #run() or #start() was not called', async () => {
const cluster = new Cluster({ log });
await expect(cluster.kill()).rejects.toThrowError('ES has not been started');
});
test('resolves when ES exits with 0', async () => {
mockEsBin({ exitCode: 0, start: true });
const cluster = new Cluster({ log });
await cluster.start(installPath, esClusterExecOptions);
await cluster.kill();
});
});
describe('#runServerless()', () => {
test(`rejects if #start() was called before`, async () => {
mockEsBin({ start: true });
const cluster = new Cluster({ log });
await cluster.start(installPath, esClusterExecOptions);
await expect(cluster.runServerless({ basePath: installPath })).rejects.toThrowError(
'ES stateful cluster has already been started'
);
});
test(`rejects if #run() was called before`, async () => {
mockEsBin({ start: true });
const cluster = new Cluster({ log });
await cluster.run(installPath, esClusterExecOptions);
await expect(cluster.runServerless({ basePath: installPath })).rejects.toThrowError(
'ES stateful cluster has already been started'
);
});
test('awaits runServerlessCluster() promise and returns node names as string[]', async () => {
const nodeNames = ['es1', 'es2', 'es3'];
let resolveRunServerlessCluster: Function;
runServerlessClusterMock.mockImplementationOnce(
() =>
new Promise((resolve) => {
resolveRunServerlessCluster = () => {
resolve(nodeNames);
};
})
);
const cluster = new Cluster({ log });
const promise = cluster.runServerless({ basePath: installPath });
await ensureNoResolve(promise);
resolveRunServerlessCluster!();
await expect(ensureResolve(promise, 'runServerless()')).resolves.toEqual(nodeNames);
});
test('rejects if #runServerless() was called before', async () => {
const nodeNames = ['es1', 'es2', 'es3'];
runServerlessClusterMock.mockResolvedValueOnce(nodeNames);
const cluster = new Cluster({ log });
await cluster.runServerless({ basePath: installPath });
await expect(cluster.runServerless({ basePath: installPath })).rejects.toThrowError(
'ES serverless docker cluster has already been started'
);
});
test('rejects if #runServerlessCluster() rejects', async () => {
runServerlessClusterMock.mockRejectedValueOnce(new Error('foo'));
const cluster = new Cluster({ log });
await expect(cluster.runServerless({ basePath: installPath })).rejects.toThrowError('foo');
});
test('passes through all options+log to #runServerlessCluster()', async () => {
const nodeNames = ['es1', 'es2', 'es3'];
runServerlessClusterMock.mockResolvedValueOnce(nodeNames);
const cluster = new Cluster({ log });
const serverlessOptions = {
clean: true,
basePath: installPath,
teardown: true,
background: true,
waitForReady: true,
};
await cluster.runServerless(serverlessOptions);
expect(runServerlessClusterMock.mock.calls[0]).toMatchInlineSnapshot(`
Array [
<ToolingLog>,
Object {
"background": true,
"basePath": "${installPath}",
"clean": true,
"teardown": true,
"waitForReady": true,
},
]
`);
});
});
describe('#runDocker()', () => {
const dockerOptions = {};
test(`rejects if #start() was called before`, async () => {
mockEsBin({ start: true });
const cluster = new Cluster({ log });
await cluster.start(installPath, esClusterExecOptions);
await expect(cluster.runDocker(dockerOptions)).rejects.toThrowError(
'ES stateful cluster has already been started'
);
});
test('rejects if #run() was called before', async () => {
mockEsBin({ start: true });
const cluster = new Cluster({ log });
await cluster.run(installPath, esClusterExecOptions);
await expect(cluster.runDocker(dockerOptions)).rejects.toThrowError(
'ES stateful cluster has already been started'
);
});
test('await #runDockerContainer() promise', async () => {
let resolveRunDockerContainer: Function;
runDockerContainerMock.mockImplementationOnce(
() =>
new Promise((resolve) => {
resolveRunDockerContainer = () => {
resolve();
};
})
);
const cluster = new Cluster({ log });
const promise = cluster.runDocker(dockerOptions);
await ensureNoResolve(promise);
resolveRunDockerContainer!();
await expect(ensureResolve(promise, 'runDocker()')).resolves.toBeUndefined();
});
test('rejects if #runDockerContainer() rejects', async () => {
runDockerContainerMock.mockRejectedValueOnce(new Error('foo'));
const cluster = new Cluster({ log });
await expect(cluster.runDocker(dockerOptions)).rejects.toThrowError('foo');
});
test('passes through all options+log to #runDockerContainer()', async () => {
runDockerContainerMock.mockResolvedValueOnce();
const cluster = new Cluster({ log });
await cluster.runDocker({ dockerCmd: 'start -a es01' });
expect(runDockerContainerMock.mock.calls[0]).toMatchInlineSnapshot(`
Array [
<ToolingLog>,
Object {
"dockerCmd": "start -a es01",
},
]
`);
});
});

View file

@ -559,8 +559,7 @@ describe('resolveDockerCmd()', () => {
describe('runDockerContainer()', () => {
test('should resolve', async () => {
execa.mockImplementation(() => Promise.resolve({ stdout: '' }));
await expect(runDockerContainer(log, {})).resolves.toEqual({ stdout: '' });
await expect(runDockerContainer(log, {})).resolves.toBeUndefined();
// setupDocker execa calls then run container
expect(execa.mock.calls).toHaveLength(5);
});

View file

@ -700,7 +700,7 @@ export async function runDockerContainer(log: ToolingLog, options: DockerOptions
const dockerCmd = resolveDockerCmd(options, image);
log.info(chalk.dim(`docker ${dockerCmd.join(' ')}`));
return await execa('docker', dockerCmd, {
await execa('docker', dockerCmd, {
// inherit is required to show Docker output and Java console output for pw, enrollment token, etc
stdio: ['ignore', 'inherit', 'inherit'],
});

View file

@ -18,7 +18,7 @@ import { Cluster } from '@kbn/es';
import { Client, HttpConnection } from '@elastic/elasticsearch';
import type { ToolingLog } from '@kbn/tooling-log';
import { REPO_ROOT } from '@kbn/repo-info';
import type { ArtifactLicense } from '@kbn/es';
import { CI_PARALLEL_PROCESS_PREFIX } from '../ci_parallel_process_prefix';
import { esTestConfig } from './es_test_config';
@ -77,7 +77,7 @@ export interface CreateTestEsClusterOptions {
* you'll likely need to use `basic` or `gold` to prevent the test from failing
* when the license expires.
*/
license?: 'basic' | 'gold' | 'trial'; // | 'oss'
license?: ArtifactLicense;
log: ToolingLog;
writeLogsToPath?: string;
/**
@ -224,7 +224,15 @@ export function createTestEsCluster<
// multiple nodes, they'll all share the same ESinstallation.
const firstNode = this.nodes[0];
if (esFrom === 'source') {
installPath = (await firstNode.installSource(config)).installPath;
installPath = (
await firstNode.installSource({
sourcePath: config.sourcePath,
license: config.license,
password: config.password,
basePath: config.basePath,
esArgs: config.esArgs,
})
).installPath;
} else if (esFrom === 'snapshot') {
installPath = (await firstNode.installSnapshot(config)).installPath;
} else if (esFrom === 'serverless') {

View file

@ -10,6 +10,7 @@ import { resolve } from 'path';
import type { ToolingLog } from '@kbn/tooling-log';
import getPort from 'get-port';
import { REPO_ROOT } from '@kbn/repo-info';
import type { ArtifactLicense } from '@kbn/es';
import type { Config } from '../../functional_test_runner';
import { createTestEsCluster } from '../../es';
@ -33,7 +34,7 @@ function getEsConfig({
esFrom = config.get('esTestCluster.from'),
}: RunElasticsearchOptions) {
const ssl = !!config.get('esTestCluster.ssl');
const license: 'basic' | 'trial' | 'gold' = config.get('esTestCluster.license');
const license: ArtifactLicense = config.get('esTestCluster.license');
const esArgs: string[] = config.get('esTestCluster.serverArgs');
const esJavaOpts: string | undefined = config.get('esTestCluster.esJavaOpts');
const isSecurityEnabled = esArgs.includes('xpack.security.enabled=true');