[migrations v2] Integration test for multi-node cluster. (#100957)

This commit is contained in:
Luke Elmers 2021-06-28 10:49:38 -06:00 committed by GitHub
parent 6a2b968767
commit d2690d8ac8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 476 additions and 44 deletions

View file

@ -133,19 +133,20 @@ exports.Cluster = class Cluster {
}
/**
* Unpakcs a tar or zip file containing the data directory for an
* Unpacks a tar or zip file containing the data directory for an
* ES cluster.
*
* @param {String} installPath
* @param {String} archivePath
* @param {String} [extractDirName]
*/
async extractDataDirectory(installPath, archivePath) {
async extractDataDirectory(installPath, archivePath, extractDirName = 'data') {
this._log.info(chalk.bold(`Extracting data directory`));
this._log.indent(4);
// decompress excludes the root directory as that is how our archives are
// structured. This works in our favor as we can explicitly extract into the data dir
const extractPath = path.resolve(installPath, 'data');
const extractPath = path.resolve(installPath, extractDirName);
this._log.info(`Data archive: ${archivePath}`);
this._log.info(`Extract path: ${extractPath}`);
@ -237,9 +238,12 @@ exports.Cluster = class Cluster {
* @param {Object} options
* @property {string|Array} options.esArgs
* @property {string} options.esJavaOpts
* @property {Boolean} options.skipNativeRealmSetup
* @return {undefined}
*/
_exec(installPath, options = {}) {
_exec(installPath, opts = {}) {
const { skipNativeRealmSetup = false, ...options } = opts;
if (this._process || this._outcome) {
throw new Error('ES has already been started');
}
@ -303,6 +307,10 @@ exports.Cluster = class Cluster {
// once the http port is available setup the native realm
this._nativeRealmSetup = httpPort.then(async (port) => {
if (skipNativeRealmSetup) {
return;
}
const caCert = await this._caCertPromise;
const nativeRealm = new NativeRealm({
port,

View file

@ -7,4 +7,5 @@
*/
export { createTestEsCluster } from './test_es_cluster';
export type { CreateTestEsClusterOptions, EsTestCluster, ICluster } from './test_es_cluster';
export { esTestConfig } from './es_test_config';

View file

@ -19,12 +19,51 @@ import { esTestConfig } from './es_test_config';
import { KIBANA_ROOT } from '../';
interface TestClusterFactoryOptions {
port?: number;
password?: string;
license?: 'basic' | 'trial'; // | 'oss'
interface TestEsClusterNodesOptions {
name: string;
/**
* Depending on the test you are running, it may be necessary to
* configure a separate data archive for each node in the cluster.
* In that case, you can configure each of the archive paths here.
*
* Specifying a top-level `dataArchive` is not necessary if you are using
* this option; per-node archives will always be used if provided.
*/
dataArchive?: string;
}
interface Node {
installSource: (opts: Record<string, unknown>) => Promise<{ installPath: string }>;
installSnapshot: (opts: Record<string, unknown>) => Promise<{ installPath: string }>;
extractDataDirectory: (
installPath: string,
archivePath: string,
extractDirName?: string
) => Promise<{ insallPath: string }>;
start: (installPath: string, opts: Record<string, unknown>) => Promise<void>;
stop: () => Promise<void>;
}
export interface ICluster {
ports: number[];
nodes: Node[];
getStartTimeout: () => number;
start: () => Promise<void>;
stop: () => Promise<void>;
cleanup: () => Promise<void>;
getClient: () => KibanaClient;
getHostUrls: () => string[];
}
export type EsTestCluster<
Options extends CreateTestEsClusterOptions = CreateTestEsClusterOptions
> = Options['nodes'] extends TestEsClusterNodesOptions[]
? ICluster
: ICluster & { getUrl: () => string }; // Only allow use of `getUrl` if `nodes` option isn't provided.
export interface CreateTestEsClusterOptions {
basePath?: string;
esFrom?: string;
clusterName?: string;
/**
* Path to data archive snapshot to run Elasticsearch with.
* To prepare the the snapshot:
@ -33,16 +72,78 @@ interface TestClusterFactoryOptions {
* - stop Elasticsearch server
* - go to Elasticsearch folder: cd .es/${ELASTICSEARCH_VERSION}
* - archive data folder: zip -r my_archive.zip data
* */
*/
dataArchive?: string;
/**
* Elasticsearch configuration options. These are key/value pairs formatted as:
* `['key.1=val1', 'key.2=val2']`
*/
esArgs?: string[];
esFrom?: string;
esJavaOpts?: string;
clusterName?: string;
/**
* License to run your cluster under. Keep in mind that a `trial` license
* has an expiration date. If you are using a `dataArchive` with your tests,
* you'll likely need to use `basic` or `gold` to prevent the test from failing
* when the license expires.
*/
license?: 'basic' | 'gold' | 'trial'; // | 'oss'
log: ToolingLog;
/**
* Node-specific configuration if you wish to run a multi-node
* cluster. One node will be added for each item in the array.
*
* If this option is not provided, the config will default
* to a single-node cluster.
*
* @example
* {
* nodes: [
* {
* name: 'node-01',
* dataArchive: Path.join(__dirname, 'path', 'to', 'data_01')
* . },
* {
* name: 'node-02',
* dataArchive: Path.join(__dirname, 'path', 'to', 'data_02')
* . },
* ],
* }
*/
nodes?: TestEsClusterNodesOptions[];
/**
* Password for the `elastic` user. This is set after the cluster has started.
*
* Defaults to `changeme`.
*/
password?: string;
/**
* Port to run Elasticsearch on. If you configure a
* multi-node cluster with the `nodes` option, this
* port will be incremented by one for each added node.
*
* @example
* {
* nodes: [
* {
* name: 'node-01',
* dataArchive: Path.join(__dirname, 'path', 'to', 'data_01')
* . },
* {
* name: 'node-02',
* dataArchive: Path.join(__dirname, 'path', 'to', 'data_02')
* . },
* ],
* port: 6200, // node-01 will use 6200, node-02 will use 6201
* }
*/
port?: number;
ssl?: boolean;
}
export function createTestEsCluster(options: TestClusterFactoryOptions) {
export function createTestEsCluster<
Options extends CreateTestEsClusterOptions = CreateTestEsClusterOptions
>(options: Options): EsTestCluster<Options> {
const {
port = esTestConfig.getPort(),
password = 'changeme',
@ -51,6 +152,7 @@ export function createTestEsCluster(options: TestClusterFactoryOptions) {
basePath = Path.resolve(KIBANA_ROOT, '.es'),
esFrom = esTestConfig.getBuildFrom(),
dataArchive,
nodes = [{ name: 'node-01' }],
esArgs: customEsArgs = [],
esJavaOpts,
clusterName: customClusterName = 'es-test-cluster',
@ -59,14 +161,17 @@ export function createTestEsCluster(options: TestClusterFactoryOptions) {
const clusterName = `${CI_PARALLEL_PROCESS_PREFIX}${customClusterName}`;
const esArgs = [
const defaultEsArgs = [
`cluster.name=${clusterName}`,
`http.port=${port}`,
'discovery.type=single-node',
`transport.port=${esTestConfig.getTransportPort()}`,
...customEsArgs,
// For multi-node clusters, we make all nodes master-eligible by default.
...(nodes.length > 1
? ['discovery.type=zen', `cluster.initial_master_nodes=${nodes.map((n) => n.name).join(',')}`]
: ['discovery.type=single-node']),
];
const esArgs = assignArgs(defaultEsArgs, customEsArgs);
const config = {
version: esTestConfig.getVersion(),
installPath: Path.resolve(basePath, clusterName),
@ -77,9 +182,18 @@ export function createTestEsCluster(options: TestClusterFactoryOptions) {
esArgs,
};
const cluster = new Cluster({ log, ssl });
return new (class TestCluster {
ports: number[] = [];
nodes: Node[] = [];
constructor() {
for (let i = 0; i < nodes.length; i++) {
this.nodes.push(new Cluster({ log, ssl }));
// If this isn't the first node, we increment the port of the last node
this.ports.push(i === 0 ? port : port + i);
}
}
return new (class EsTestCluster {
getStartTimeout() {
const second = 1000;
const minute = second * 60;
@ -88,31 +202,73 @@ export function createTestEsCluster(options: TestClusterFactoryOptions) {
}
async start() {
let installPath;
let installPath: string;
// We only install once using the first node. If the cluster has
// multiple nodes, they'll all share the same ESinstallation.
const firstNode = this.nodes[0];
if (esFrom === 'source') {
installPath = (await cluster.installSource(config)).installPath;
installPath = (await firstNode.installSource(config)).installPath;
} else if (esFrom === 'snapshot') {
installPath = (await cluster.installSnapshot(config)).installPath;
installPath = (await firstNode.installSnapshot(config)).installPath;
} else if (Path.isAbsolute(esFrom)) {
installPath = esFrom;
} else {
throw new Error(`unknown option esFrom "${esFrom}"`);
}
if (dataArchive) {
await cluster.extractDataDirectory(installPath, dataArchive);
// Collect promises so we can run them in parallel
const extractDirectoryPromises = [];
const nodeStartPromises = [];
for (let i = 0; i < this.nodes.length; i++) {
const node = nodes[i];
const nodePort = this.ports[i];
const overriddenArgs = [`node.name=${node.name}`, `http.port=${nodePort}`];
const archive = node.dataArchive || dataArchive;
if (archive) {
extractDirectoryPromises.push(async () => {
const nodeDataDirectory = node.dataArchive ? `data-${node.name}` : 'data';
overriddenArgs.push(`path.data=${Path.resolve(installPath, nodeDataDirectory)}`);
return await this.nodes[i].extractDataDirectory(
installPath,
archive,
nodeDataDirectory
);
});
}
nodeStartPromises.push(async () => {
log.info(`[es] starting node ${node.name} on port ${nodePort}`);
return await this.nodes[i].start(installPath, {
password: config.password,
esArgs: assignArgs(esArgs, overriddenArgs),
esJavaOpts,
// If we have multiple nodes, we shouldn't try setting up the native realm
// right away, or ES will complain as the cluster isn't ready. So we only
// set it up after the last node is started.
skipNativeRealmSetup: this.nodes.length > 1 && i < this.nodes.length - 1,
});
});
}
await cluster.start(installPath, {
password: config.password,
esArgs,
esJavaOpts,
});
await Promise.all(extractDirectoryPromises.map(async (extract) => await extract()));
for (const start of nodeStartPromises) {
await start();
}
}
async stop() {
await cluster.stop();
const nodeStopPromises = [];
for (let i = 0; i < this.nodes.length; i++) {
nodeStopPromises.push(async () => {
log.info(`[es] stopping node ${nodes[i].name}`);
return await this.nodes[i].stop();
});
}
await Promise.all(nodeStopPromises.map(async (stop) => await stop()));
log.info('[es] stopped');
}
@ -127,15 +283,63 @@ export function createTestEsCluster(options: TestClusterFactoryOptions) {
*/
getClient(): KibanaClient {
return new Client({
node: this.getUrl(),
node: this.getHostUrls()[0],
});
}
getUrl() {
if (this.nodes.length > 1) {
throw new Error(
'`getUrl()` can only be used with a single-node cluster. For multi-node clusters, use `getHostUrls()`.'
);
}
const parts = esTestConfig.getUrlParts();
parts.port = port;
return format(parts);
}
})();
/**
* Returns a list of host URLs for the cluster. Intended for use
* when configuring Kibana's `elasticsearch.hosts` in a test.
*
* If the cluster has multiple nodes, each node URL will be included
* in this list.
*/
getHostUrls(): string[] {
return this.ports.map((p) => format({ ...esTestConfig.getUrlParts(), port: p }));
}
})() as EsTestCluster<Options>;
}
/**
* Like `Object.assign`, but for arrays of `key=val` strings. Takes an arbitrary
* number of arrays, and allows values in subsequent args to override previous
* values for the same key.
*
* @example
*
* assignArgs(['foo=a', 'bar=b'], ['foo=c', 'baz=d']); // returns ['foo=c', 'bar=b', 'baz=d']
*/
function assignArgs(...args: string[][]): string[] {
const toArgsObject = (argList: string[]) => {
const obj: Record<string, string> = {};
argList.forEach((arg) => {
const [key, val] = arg.split('=');
obj[key] = val;
});
return obj;
};
return Object.entries(
args.reduce((acc, cur) => {
return {
...acc,
...toArgsObject(cur),
};
}, {})
).map(([key, val]) => `${key}=${val}`);
}

View file

@ -25,7 +25,13 @@ export { runTests, startServers } from './functional_tests/tasks';
// @internal
export { KIBANA_ROOT } from './functional_tests/lib/paths';
export { esTestConfig, createTestEsCluster } from './es';
export {
esTestConfig,
createTestEsCluster,
CreateTestEsClusterOptions,
EsTestCluster,
ICluster,
} from './es';
export { kbnTestConfig, kibanaServerTestUser, kibanaTestUser, adminTestUser } from './kbn';

View file

@ -0,0 +1,209 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import Path from 'path';
import Fs from 'fs';
import Util from 'util';
import glob from 'glob';
import { kibanaServerTestUser } from '@kbn/test';
import { kibanaPackageJson as pkg } from '@kbn/utils';
import * as kbnTestServer from '../../../../test_helpers/kbn_server';
import type { ElasticsearchClient } from '../../../elasticsearch';
import { Root } from '../../../root';
const LOG_FILE_PREFIX = 'migration_test_multiple_es_nodes';
const asyncUnlink = Util.promisify(Fs.unlink);
async function removeLogFile() {
glob(Path.join(__dirname, `${LOG_FILE_PREFIX}_*.log`), (err, files) => {
files.forEach(async (file) => {
// ignore errors if it doesn't exist
await asyncUnlink(file).catch(() => void 0);
});
});
}
function extractSortNumberFromId(id: string): number {
const parsedId = parseInt(id.split(':')[1], 10); // "foo:123" -> 123
if (isNaN(parsedId)) {
throw new Error(`Failed to parse Saved Object ID [${id}]. Result is NaN`);
}
return parsedId;
}
async function fetchDocs(esClient: ElasticsearchClient, index: string, type: string) {
const { body } = await esClient.search<any>({
index,
size: 10000,
body: {
query: {
bool: {
should: [
{
term: { type },
},
],
},
},
},
});
return body.hits.hits
.map((h) => ({
...h._source,
id: h._id,
}))
.sort((a, b) => extractSortNumberFromId(a.id) - extractSortNumberFromId(b.id));
}
interface RootConfig {
logFileName: string;
hosts: string[];
}
function createRoot({ logFileName, hosts }: RootConfig) {
return kbnTestServer.createRoot({
elasticsearch: {
hosts,
username: kibanaServerTestUser.username,
password: kibanaServerTestUser.password,
},
migrations: {
skip: false,
enableV2: true,
batchSize: 100, // fixture contains 5000 docs
},
logging: {
appenders: {
file: {
type: 'file',
fileName: logFileName,
layout: {
type: 'json',
},
},
},
loggers: [
{
name: 'root',
appenders: ['file'],
},
],
},
});
}
describe('migration v2', () => {
let esServer: kbnTestServer.TestElasticsearchUtils;
let root: Root;
const migratedIndex = `.kibana_${pkg.version}_001`;
beforeAll(async () => {
await removeLogFile();
});
afterAll(async () => {
await new Promise((resolve) => setTimeout(resolve, 10000));
});
afterEach(async () => {
if (root) {
await root.shutdown();
}
if (esServer) {
await esServer.stop();
}
});
it('migrates saved objects normally with multiple ES nodes', async () => {
const { startES } = kbnTestServer.createTestServers({
adjustTimeout: (t: number) => jest.setTimeout(t),
settings: {
es: {
license: 'basic',
clusterName: 'es-test-cluster',
nodes: [
{
name: 'node-01',
// original SO (5000 total; 2500 of type `foo` + 2500 of type `bar`):
// [
// { id: 'foo:1', type: 'foo', foo: { status: 'not_migrated_1' } },
// { id: 'bar:1', type: 'bar', bar: { status: 'not_migrated_1' } },
// { id: 'foo:2', type: 'foo', foo: { status: 'not_migrated_2' } },
// { id: 'bar:2', type: 'bar', bar: { status: 'not_migrated_2' } },
// ];
dataArchive: Path.join(__dirname, 'archives', '7.13.0_5k_so_node_01.zip'),
},
{
name: 'node-02',
dataArchive: Path.join(__dirname, 'archives', '7.13.0_5k_so_node_02.zip'),
},
],
},
},
});
esServer = await startES();
root = createRoot({
logFileName: Path.join(__dirname, `${LOG_FILE_PREFIX}.log`),
hosts: esServer.hosts,
});
const setup = await root.setup();
setup.savedObjects.registerType({
name: 'foo',
hidden: false,
mappings: { properties: { status: { type: 'text' } } },
namespaceType: 'agnostic',
migrations: {
'7.14.0': (doc) => {
if (doc.attributes?.status) {
doc.attributes.status = doc.attributes.status.replace('not_migrated', 'migrated');
}
return doc;
},
},
});
setup.savedObjects.registerType({
name: 'bar',
hidden: false,
mappings: { properties: { status: { type: 'text' } } },
namespaceType: 'agnostic',
migrations: {
'7.14.0': (doc) => {
if (doc.attributes?.status) {
doc.attributes.status = doc.attributes.status.replace('not_migrated', 'migrated');
}
return doc;
},
},
});
await root.start();
const esClient = esServer.es.getClient();
const migratedFooDocs = await fetchDocs(esClient, migratedIndex, 'foo');
expect(migratedFooDocs.length).toBe(2500);
migratedFooDocs.forEach((doc, i) => {
expect(doc.id).toBe(`foo:${i}`);
expect(doc.foo.status).toBe(`migrated_${i}`);
expect(doc.migrationVersion.foo).toBe('7.14.0');
});
const migratedBarDocs = await fetchDocs(esClient, migratedIndex, 'bar');
expect(migratedBarDocs.length).toBe(2500);
migratedBarDocs.forEach((doc, i) => {
expect(doc.id).toBe(`bar:${i}`);
expect(doc.bar.status).toBe(`migrated_${i}`);
expect(doc.migrationVersion.bar).toBe('7.14.0');
});
});
});

View file

@ -7,7 +7,13 @@
*/
import { ToolingLog, REPO_ROOT } from '@kbn/dev-utils';
import { createTestEsCluster, esTestConfig, kibanaServerTestUser, kibanaTestUser } from '@kbn/test';
import {
createTestEsCluster,
CreateTestEsClusterOptions,
esTestConfig,
kibanaServerTestUser,
kibanaTestUser,
} from '@kbn/test';
import { defaultsDeep } from 'lodash';
import { resolve } from 'path';
import { BehaviorSubject } from 'rxjs';
@ -153,10 +159,7 @@ export function createTestServers({
}: {
adjustTimeout: (timeout: number) => void;
settings?: {
es?: {
license: 'basic' | 'gold' | 'trial';
[key: string]: any;
};
es?: Partial<CreateTestEsClusterOptions>;
kbn?: {
/**
* An array of directories paths, passed in via absolute path strings
@ -217,7 +220,7 @@ export function createTestServers({
if (['gold', 'trial'].includes(license)) {
// Override provided configs
kbnSettings.elasticsearch = {
hosts: [esTestConfig.getUrl()],
hosts: es.getHostUrls(),
username: kibanaServerTestUser.username,
password: kibanaServerTestUser.password,
};
@ -226,7 +229,7 @@ export function createTestServers({
return {
stop: async () => await es.cleanup(),
es,
hosts: [esTestConfig.getUrl()],
hosts: es.getHostUrls(),
username: kibanaServerTestUser.username,
password: kibanaServerTestUser.password,
};

View file

@ -85,11 +85,12 @@ def withFunctionalTestEnv(List additionalEnvs = [], Closure closure) {
def parallelId = env.TASK_QUEUE_PROCESS_ID ?: env.CI_PARALLEL_PROCESS_NUMBER
def kibanaPort = "61${parallelId}1"
def esPort = "61${parallelId}2"
def esTransportPort = "61${parallelId}3"
def fleetPackageRegistryPort = "61${parallelId}4"
def alertingProxyPort = "61${parallelId}5"
def corsTestServerPort = "61${parallelId}6"
def esPort = "62${parallelId}1"
// Ports 62x2-62x9 kept open for ES nodes
def esTransportPort = "63${parallelId}1-63${parallelId}9"
def fleetPackageRegistryPort = "64${parallelId}1"
def alertingProxyPort = "64${parallelId}2"
def corsTestServerPort = "64${parallelId}3"
def apmActive = githubPr.isPr() ? "false" : "true"
withEnv([