[kbn/es] add support for --ready-timeout (#126217) (#126398)

(cherry picked from commit a16c20b7b4)

Co-authored-by: Spencer <spencer@elastic.co>
This commit is contained in:
Kibana Machine 2022-02-24 19:14:56 -05:00 committed by GitHub
parent 06110bd973
commit ab4f4329af
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 246 additions and 75 deletions

View file

@ -10,6 +10,7 @@ const dedent = require('dedent');
const getopts = require('getopts');
const { Cluster } = require('../cluster');
const { createCliError } = require('../errors');
const { parseTimeoutToMs } = require('../utils');
exports.description = 'Install and run from an Elasticsearch tar';
@ -27,6 +28,8 @@ exports.help = (defaults = {}) => {
--password.[user] Sets password for native realm user [default: ${password}]
--ssl Sets up SSL on Elasticsearch
-E Additional key=value settings to pass to Elasticsearch
--skip-ready-check Disable the ready check,
--ready-timeout Customize the ready check timeout, in seconds or "Xm" format, defaults to 1m
Example:
@ -41,8 +44,13 @@ exports.run = async (defaults = {}) => {
basePath: 'base-path',
installPath: 'install-path',
esArgs: 'E',
skipReadyCheck: 'skip-ready-check',
readyTimeout: 'ready-timeout',
},
string: ['ready-timeout'],
boolean: ['skip-ready-check'],
default: defaults,
});
@ -54,5 +62,8 @@ exports.run = async (defaults = {}) => {
}
const { installPath } = await cluster.installArchive(path, options);
await cluster.run(installPath, options);
await cluster.run(installPath, {
...options,
readyTimeout: parseTimeoutToMs(options.readyTimeout),
});
};

View file

@ -10,6 +10,7 @@ const dedent = require('dedent');
const getopts = require('getopts');
import { ToolingLog, getTimeReporter } from '@kbn/dev-utils';
const { Cluster } = require('../cluster');
const { parseTimeoutToMs } = require('../utils');
exports.description = 'Downloads and run from a nightly snapshot';
@ -30,6 +31,8 @@ exports.help = (defaults = {}) => {
--download-only Download the snapshot but don't actually start it
--ssl Sets up SSL on Elasticsearch
--use-cached Skips cache verification and use cached ES snapshot.
--skip-ready-check Disable the ready check,
--ready-timeout Customize the ready check timeout, in seconds or "Xm" format, defaults to 1m
Example:
@ -53,11 +56,12 @@ exports.run = async (defaults = {}) => {
dataArchive: 'data-archive',
esArgs: 'E',
useCached: 'use-cached',
skipReadyCheck: 'skip-ready-check',
readyTimeout: 'ready-timeout',
},
string: ['version'],
boolean: ['download-only', 'use-cached'],
string: ['version', 'ready-timeout'],
boolean: ['download-only', 'use-cached', 'skip-ready-check'],
default: defaults,
});
@ -82,6 +86,7 @@ exports.run = async (defaults = {}) => {
reportTime,
startTime: runStartTime,
...options,
readyTimeout: parseTimeoutToMs(options.readyTimeout),
});
}
};

View file

@ -9,6 +9,7 @@
const dedent = require('dedent');
const getopts = require('getopts');
const { Cluster } = require('../cluster');
const { parseTimeoutToMs } = require('../utils');
exports.description = 'Build and run from source';
@ -27,6 +28,8 @@ exports.help = (defaults = {}) => {
--password.[user] Sets password for native realm user [default: ${password}]
--ssl Sets up SSL on Elasticsearch
-E Additional key=value settings to pass to Elasticsearch
--skip-ready-check Disable the ready check,
--ready-timeout Customize the ready check timeout, in seconds or "Xm" format, defaults to 1m
Example:
@ -42,9 +45,14 @@ exports.run = async (defaults = {}) => {
installPath: 'install-path',
sourcePath: 'source-path',
dataArchive: 'data-archive',
skipReadyCheck: 'skip-ready-check',
readyTimeout: 'ready-timeout',
esArgs: 'E',
},
string: ['ready-timeout'],
boolean: ['skip-ready-check'],
default: defaults,
});
@ -55,5 +63,8 @@ exports.run = async (defaults = {}) => {
await cluster.extractDataDirectory(installPath, options.dataArchive);
}
await cluster.run(installPath, options);
await cluster.run(installPath, {
...options,
readyTimeout: parseTimeoutToMs(options.readyTimeout),
});
};

View file

@ -6,20 +6,29 @@
* Side Public License, v 1.
*/
const fs = require('fs');
const util = require('util');
const fsp = require('fs/promises');
const execa = require('execa');
const chalk = require('chalk');
const path = require('path');
const { Client } = require('@elastic/elasticsearch');
const { downloadSnapshot, installSnapshot, installSource, installArchive } = require('./install');
const { ES_BIN } = require('./paths');
const { log: defaultLog, parseEsLog, extractConfigFiles, NativeRealm } = require('./utils');
const {
log: defaultLog,
parseEsLog,
extractConfigFiles,
NativeRealm,
parseTimeoutToMs,
} = require('./utils');
const { createCliError } = require('./errors');
const { promisify } = require('util');
const treeKillAsync = promisify(require('tree-kill'));
const { parseSettings, SettingsFilter } = require('./settings');
const { CA_CERT_PATH, ES_NOPASSWORD_P12_PATH, extract } = require('@kbn/dev-utils');
const readFile = util.promisify(fs.readFile);
const DEFAULT_READY_TIMEOUT = parseTimeoutToMs('1m');
/** @typedef {import('./cluster_exec_options').EsClusterExecOptions} ExecOptions */
// listen to data on stream until map returns anything but undefined
const first = (stream, map) =>
@ -38,7 +47,6 @@ exports.Cluster = class Cluster {
constructor({ log = defaultLog, ssl = false } = {}) {
this._log = log.withType('@kbn/es Cluster');
this._ssl = ssl;
this._caCertPromise = ssl ? readFile(CA_CERT_PATH) : undefined;
}
/**
@ -157,10 +165,8 @@ exports.Cluster = class Cluster {
* Starts ES and returns resolved promise once started
*
* @param {String} installPath
* @param {Object} options
* @property {Array} options.esArgs
* @property {String} options.password - super user password used to bootstrap
* @returns {Promise}
* @param {ExecOptions} options
* @returns {Promise<void>}
*/
async start(installPath, options = {}) {
this._exec(installPath, options);
@ -173,7 +179,7 @@ exports.Cluster = class Cluster {
return true;
}
}),
this._nativeRealmSetup,
this._setupPromise,
]),
// await the outcome of the process in case it exits before starting
@ -187,15 +193,14 @@ exports.Cluster = class Cluster {
* Starts Elasticsearch and waits for Elasticsearch to exit
*
* @param {String} installPath
* @param {Object} options
* @property {Array} options.esArgs
* @returns {Promise<undefined>}
* @param {ExecOptions} options
* @returns {Promise<void>}
*/
async run(installPath, options = {}) {
this._exec(installPath, options);
// log native realm setup errors so they aren't uncaught
this._nativeRealmSetup.catch((error) => {
this._setupPromise.catch((error) => {
this._log.error(error);
this.stop();
});
@ -233,14 +238,17 @@ exports.Cluster = class Cluster {
*
* @private
* @param {String} installPath
* @param {Object} options
* @property {string|Array} options.esArgs
* @property {string} options.esJavaOpts
* @property {Boolean} options.skipNativeRealmSetup
* @return {undefined}
* @param {ExecOptions} opts
*/
_exec(installPath, opts = {}) {
const { skipNativeRealmSetup = false, reportTime = () => {}, startTime, ...options } = opts;
const {
skipNativeRealmSetup = false,
reportTime = () => {},
startTime,
skipReadyCheck,
readyTimeout,
...options
} = opts;
if (this._process || this._outcome) {
throw new Error('ES has already been started');
@ -300,30 +308,49 @@ exports.Cluster = class Cluster {
stdio: ['ignore', 'pipe', 'pipe'],
});
// parse log output to find http port
const httpPort = first(this._process.stdout, (data) => {
const match = data.toString('utf8').match(/HttpServer.+publish_address {[0-9.]+:([0-9]+)/);
this._setupPromise = Promise.all([
// parse log output to find http port
first(this._process.stdout, (data) => {
const match = data.toString('utf8').match(/HttpServer.+publish_address {[0-9.]+:([0-9]+)/);
if (match) {
return match[1];
}
});
if (match) {
return match[1];
}
}),
// once the http port is available setup the native realm
this._nativeRealmSetup = httpPort.then(async (port) => {
if (skipNativeRealmSetup) {
return;
}
const caCert = await this._caCertPromise;
const nativeRealm = new NativeRealm({
port,
caCert,
log: this._log,
elasticPassword: options.password,
ssl: this._ssl,
// load the CA cert from disk if necessary
this._ssl ? fsp.readFile(CA_CERT_PATH) : null,
]).then(async ([port, caCert]) => {
const client = new Client({
node: `${caCert ? 'https:' : 'http:'}//localhost:${port}`,
auth: {
username: 'elastic',
password: options.password,
},
tls: caCert
? {
ca: caCert,
rejectUnauthorized: true,
}
: undefined,
});
await nativeRealm.setPasswords(options);
if (!skipReadyCheck) {
await this._waitForClusterReady(client, readyTimeout);
}
// once the cluster is ready setup the native realm
if (!skipNativeRealmSetup) {
const nativeRealm = new NativeRealm({
log: this._log,
elasticPassword: options.password,
client,
});
await nativeRealm.setPasswords(options);
}
this._log.success('kbn/es setup complete');
});
let reportSent = false;
@ -366,4 +393,43 @@ exports.Cluster = class Cluster {
}
});
}
async _waitForClusterReady(client, readyTimeout = DEFAULT_READY_TIMEOUT) {
let attempt = 0;
const start = Date.now();
this._log.info('waiting for ES cluster to report a yellow or green status');
while (true) {
attempt += 1;
try {
const resp = await client.cluster.health();
if (resp.status !== 'red') {
return;
}
throw new Error(`not ready, cluster health is ${resp.status}`);
} catch (error) {
const timeSinceStart = Date.now() - start;
if (timeSinceStart > readyTimeout) {
const sec = readyTimeout / 1000;
throw new Error(`ES cluster failed to come online with the ${sec} second timeout`);
}
if (error.message.startsWith('not ready,')) {
if (timeSinceStart > 10_000) {
this._log.warning(error.message);
}
} else {
this._log.warning(
`waiting for ES cluster to come online, attempt ${attempt} failed with: ${error.message}`
);
}
const waitSec = attempt * 1.5;
await new Promise((resolve) => setTimeout(resolve, waitSec * 1000));
}
}
}
};

View file

@ -0,0 +1,18 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
export interface EsClusterExecOptions {
skipNativeRealmSetup?: boolean;
reportTime?: (...args: any[]) => void;
startTime?: number;
esArgs?: string[];
esJavaOpts?: string;
password?: string;
skipReadyCheck?: boolean;
readyTimeout?: number;
}

View file

@ -17,3 +17,4 @@ export { extractConfigFiles } from './extract_config_files';
export { NativeRealm, SYSTEM_INDICES_SUPERUSER } from './native_realm';
export { buildSnapshot } from './build_snapshot';
export { archiveForPlatform } from './build_snapshot';
export * from './parse_timeout_to_ms';

View file

@ -6,7 +6,6 @@
* Side Public License, v 1.
*/
const { Client } = require('@elastic/elasticsearch');
const chalk = require('chalk');
const { log: defaultLog } = require('./log');
@ -15,14 +14,9 @@ export const SYSTEM_INDICES_SUPERUSER =
process.env.TEST_ES_SYSTEM_INDICES_USER || 'system_indices_superuser';
exports.NativeRealm = class NativeRealm {
constructor({ elasticPassword, port, log = defaultLog, ssl = false, caCert }) {
const auth = { username: 'elastic', password: elasticPassword };
this._client = new Client(
ssl
? { node: `https://localhost:${port}`, tls: { ca: caCert, rejectUnauthorized: true }, auth }
: { node: `http://localhost:${port}`, auth }
);
constructor({ elasticPassword, log = defaultLog, client }) {
this._elasticPassword = elasticPassword;
this._client = client;
this._log = log;
}
@ -53,24 +47,14 @@ exports.NativeRealm = class NativeRealm {
});
}
async clusterReady() {
return await this._autoRetry({ maxAttempts: 10 }, async () => {
const { status } = await this._client.cluster.health();
if (status === 'red') {
throw new Error(`not ready, cluster health is ${status}`);
}
});
}
async setPasswords(options) {
await this.clusterReady();
if (!(await this.isSecurityEnabled())) {
this._log.info('security is not enabled, unable to set native realm passwords');
return;
}
const reservedUsers = await this.getReservedUsers();
this._log.info(`Set up ${reservedUsers.length} ES users`);
await Promise.all([
...reservedUsers.map(async (user) => {
await this.setPassword(user, options[`password.${user}`]);
@ -108,7 +92,7 @@ exports.NativeRealm = class NativeRealm {
}
async _autoRetry(opts, fn) {
const { attempt = 1, maxAttempts = 3, sleep = 1000 } = opts;
const { attempt = 1, maxAttempts = 3 } = opts;
try {
return await fn(attempt);
@ -119,7 +103,7 @@ exports.NativeRealm = class NativeRealm {
const sec = 1.5 * attempt;
this._log.warning(`assuming ES isn't initialized completely, trying again in ${sec} seconds`);
await new Promise((resolve) => setTimeout(resolve, sleep));
await new Promise((resolve) => setTimeout(resolve, sec * 1000));
const nextOpts = {
...opts,

View file

@ -7,12 +7,7 @@
*/
const { NativeRealm } = require('./native_realm');
jest.genMockFromModule('@elastic/elasticsearch');
jest.mock('@elastic/elasticsearch');
const { ToolingLog } = require('@kbn/dev-utils');
const { Client } = require('@elastic/elasticsearch');
const mockClient = {
xpack: {
@ -28,13 +23,12 @@ const mockClient = {
putUser: jest.fn(),
},
};
Client.mockImplementation(() => mockClient);
const log = new ToolingLog();
let nativeRealm;
beforeEach(() => {
nativeRealm = new NativeRealm({ elasticPassword: 'changeme', port: '9200', log });
nativeRealm = new NativeRealm({ elasticPassword: 'changeme', client: mockClient, log });
});
afterAll(() => {

View file

@ -0,0 +1,37 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { parseTimeoutToMs } from './parse_timeout_to_ms';
it('handles empty values', () => {
expect(parseTimeoutToMs(undefined)).toMatchInlineSnapshot(`undefined`);
expect(parseTimeoutToMs('')).toMatchInlineSnapshot(`undefined`);
});
it('returns numbers', () => {
expect(parseTimeoutToMs(10)).toMatchInlineSnapshot(`10`);
});
it('parses seconds', () => {
expect(parseTimeoutToMs('10')).toMatchInlineSnapshot(`10000`);
});
it('parses minutes', () => {
expect(parseTimeoutToMs('10m')).toMatchInlineSnapshot(`600000`);
});
it('throws for invalid values', () => {
expect(() => parseTimeoutToMs(true)).toThrowErrorMatchingInlineSnapshot(
`"[true] is not a valid timeout value"`
);
expect(() => parseTimeoutToMs([true])).toThrowErrorMatchingInlineSnapshot(
`"[[ true ]] is not a valid timeout value"`
);
expect(() => parseTimeoutToMs(['true'])).toThrowErrorMatchingInlineSnapshot(
`"[[ 'true' ]] is not a valid timeout value"`
);
expect(() => parseTimeoutToMs(NaN)).toThrowErrorMatchingInlineSnapshot(
`"[NaN] is not a valid timeout value"`
);
});

View file

@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { inspect } from 'util';
function parseInt(n: string) {
const number = Number.parseInt(n, 10);
if (Number.isNaN(number)) {
throw new Error(`invalid number [${n}]`);
}
return number;
}
/**
* Parse a timeout value to milliseconds. Supports undefined, a number, an
* empty string, a string representing a number of minutes eg 1m, or a string
* representing a number of seconds eg 60. All other values throw an error
*/
export function parseTimeoutToMs(seconds: any): number | undefined {
if (seconds === undefined || seconds === '') {
return undefined;
}
if (typeof seconds === 'number' && !Number.isNaN(seconds)) {
return seconds;
}
if (typeof seconds !== 'string') {
throw new Error(`[${inspect(seconds)}] is not a valid timeout value`);
}
if (seconds.endsWith('m')) {
const m = parseInt(seconds.slice(0, -1));
return m * 60 * 1000;
}
return parseInt(seconds) * 1000;
}

View file

@ -247,9 +247,10 @@ export function createTestEsCluster<
esArgs: assignArgs(esArgs, overriddenArgs),
esJavaOpts,
// If we have multiple nodes, we shouldn't try setting up the native realm
// right away, or ES will complain as the cluster isn't ready. So we only
// right away or wait for ES to be green, the cluster isn't ready. So we only
// set it up after the last node is started.
skipNativeRealmSetup: this.nodes.length > 1 && i < this.nodes.length - 1,
skipReadyCheck: this.nodes.length > 1 && i < this.nodes.length - 1,
});
});
}