[kbn-es] use the same method to wait for cluster status (#166332)

## Summary

Removing `cluster.waitForClusterReady` in favour of the unified
`waitUntilClusterReady` method.

We will keep waiting for `yellow` cluster status in the FTR stateful
tests and `green` in serverless tests.
This commit is contained in:
Dzmitry Lemechko 2023-09-18 15:39:52 +02:00 committed by GitHub
parent faf8774098
commit 73d1c354d7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 178 additions and 86 deletions

View file

@ -42,8 +42,7 @@ import {
InstallSnapshotOptions,
InstallSourceOptions,
} from './install/types';
const DEFAULT_READY_TIMEOUT = 60 * 1000;
import { waitUntilClusterReady } from './utils/wait_until_cluster_ready';
// listen to data on stream until map returns anything but undefined
const firstResult = (stream: Readable, map: (data: Buffer) => string | true | undefined) =>
@ -426,7 +425,12 @@ export class Cluster {
});
if (!skipReadyCheck) {
await this.waitForClusterReady(client, readyTimeout);
await waitUntilClusterReady({
client,
expectedStatus: 'yellow',
log: this.log,
readyTimeout,
});
}
// once the cluster is ready setup the native realm
@ -509,45 +513,6 @@ export class Cluster {
});
}
/**
 * Polls the cluster health API until the cluster reports any status other
 * than `red`, retrying with a linearly growing delay between attempts.
 *
 * @param client Elasticsearch client used to call `cluster.health()`.
 * @param readyTimeout Maximum time in milliseconds to keep retrying; defaults
 * to `DEFAULT_READY_TIMEOUT`. The timeout is only evaluated after a failed
 * attempt, so the call may return later than `readyTimeout`.
 * @throws Error when an attempt fails after `readyTimeout` has elapsed.
 */
async waitForClusterReady(client: Client, readyTimeout = DEFAULT_READY_TIMEOUT) {
  let attempt = 0;
  const start = Date.now();

  this.log.info('waiting for ES cluster to report a yellow or green status');

  while (true) {
    attempt += 1;

    try {
      const resp = await client.cluster.health();
      if (resp.status !== 'red') {
        return;
      }

      // routed through the catch block so "red" shares the retry/timeout logic
      throw new Error(`not ready, cluster health is ${resp.status}`);
    } catch (error) {
      // A promise can reject with any value (including `undefined`), so the
      // reason is normalised to a string before it is inspected; reading
      // `error.message` directly would throw and abort the retry loop.
      const message = String(error?.message ?? error);

      const timeSinceStart = Date.now() - start;
      if (timeSinceStart > readyTimeout) {
        const sec = readyTimeout / 1000;
        throw new Error(`ES cluster failed to come online with the ${sec} second timeout`);
      }

      if (message.startsWith('not ready,')) {
        // a "red" status is expected right after start up, only report it
        // once the cluster has been unhealthy for more than 10 seconds
        if (timeSinceStart > 10_000) {
          this.log.warning(message);
        }
      } else {
        this.log.warning(
          `waiting for ES cluster to come online, attempt ${attempt} failed with: ${message}`
        );
      }

      // linear back-off: 1.5s, 3s, 4.5s, ...
      const waitSec = attempt * 1.5;
      await new Promise((resolve) => setTimeout(resolve, waitSec * 1000));
    }
  }
}
private getJavaOptions(opts: string | undefined) {
let esJavaOpts = `${opts || ''} ${process.env.ES_JAVA_OPTS || ''}`;
// ES now automatically sets heap size to 50% of the machine's available memory

View file

@ -12,6 +12,7 @@ import * as extractConfig from '../utils/extract_config_files';
import * as dockerUtils from '../utils/docker';
import { createAnyInstanceSerializer, createStripAnsiSerializer } from '@kbn/jest-serializers';
import * as installUtils from '../install';
import * as waitClusterUtil from '../utils/wait_until_cluster_ready';
import { Cluster } from '../cluster';
import { ES_NOPASSWORD_P12_PATH } from '@kbn/dev-utils/src/certs';
import {
@ -20,8 +21,10 @@ import {
InstallSnapshotOptions,
InstallSourceOptions,
} from '../install/types';
import { Client } from '@elastic/elasticsearch';
expect.addSnapshotSerializer(createAnyInstanceSerializer(ToolingLog));
expect.addSnapshotSerializer(createAnyInstanceSerializer(Client));
expect.addSnapshotSerializer(createStripAnsiSerializer());
const log = new ToolingLog();
@ -101,6 +104,10 @@ jest.mock('../utils/docker', () => ({
runDockerContainer: jest.fn(),
}));
jest.mock('../utils/wait_until_cluster_ready', () => ({
waitUntilClusterReady: jest.fn(),
}));
const downloadSnapshotMock = jest.spyOn(installUtils, 'downloadSnapshot');
const installSourceMock = jest.spyOn(installUtils, 'installSource');
const installSnapshotMock = jest.spyOn(installUtils, 'installSnapshot');
@ -108,6 +115,7 @@ const installArchiveMock = jest.spyOn(installUtils, 'installArchive');
const extractConfigFilesMock = jest.spyOn(extractConfig, 'extractConfigFiles');
const runServerlessClusterMock = jest.spyOn(dockerUtils, 'runServerlessCluster');
const runDockerContainerMock = jest.spyOn(dockerUtils, 'runDockerContainer');
const waitUntilClusterReadyMock = jest.spyOn(waitClusterUtil, 'waitUntilClusterReady');
beforeEach(() => {
jest.resetAllMocks();
@ -366,6 +374,40 @@ describe('#start(installPath)', () => {
expect(fs.existsSync(writeLogsToPath)).toBe(true);
});
// By default #start() must invoke the (mocked) waitUntilClusterReady() exactly
// once. The snapshot pins the arguments: 'yellow' as the expected status and an
// undefined readyTimeout; <Client> and <ToolingLog> are placeholders produced
// by the any-instance snapshot serializers registered for this file.
test('calls waitUntilClusterReady() by default', async () => {
mockEsBin({ start: true });
waitUntilClusterReadyMock.mockResolvedValue();
await new Cluster({ log }).start(installPath, esClusterExecOptions);
expect(waitUntilClusterReadyMock).toHaveBeenCalledTimes(1);
expect(waitUntilClusterReadyMock.mock.calls[0]).toMatchInlineSnapshot(`
Array [
Object {
"client": <Client>,
"expectedStatus": "yellow",
"log": <ToolingLog>,
"readyTimeout": undefined,
},
]
`);
});
// Passing `skipReadyCheck: true` must bypass the readiness check entirely.
test(`doesn't call waitUntilClusterReady() if 'skipReadyCheck' is passed`, async () => {
mockEsBin({ start: true });
waitUntilClusterReadyMock.mockResolvedValue();
await new Cluster({ log }).start(installPath, { skipReadyCheck: true });
expect(waitUntilClusterReadyMock).toHaveBeenCalledTimes(0);
});
// A rejection from waitUntilClusterReady() must surface to the caller of
// #start() with the original error message.
test(`rejects if waitUntilClusterReady() rejects`, async () => {
mockEsBin({ start: true });
waitUntilClusterReadyMock.mockRejectedValue(new Error('foo'));
await expect(
new Cluster({ log }).start(installPath, esClusterExecOptions)
).rejects.toThrowError('foo');
});
test('rejects if #start() was called previously', async () => {
mockEsBin({ start: true });
@ -750,18 +792,8 @@ describe('#runServerless()', () => {
waitForReady: true,
};
await cluster.runServerless(serverlessOptions);
expect(runServerlessClusterMock.mock.calls[0]).toMatchInlineSnapshot(`
Array [
<ToolingLog>,
Object {
"background": true,
"basePath": "${installPath}",
"clean": true,
"teardown": true,
"waitForReady": true,
},
]
`);
expect(runServerlessClusterMock.mock.calls[0][0]).toMatchInlineSnapshot(`<ToolingLog>`);
expect(runServerlessClusterMock.mock.calls[0][1]).toBe(serverlessOptions);
});
});
@ -810,17 +842,12 @@ describe('#runDocker()', () => {
});
test('passes through all options+log to #runDockerContainer()', async () => {
const options = { dockerCmd: 'start -a es01' };
runDockerContainerMock.mockResolvedValueOnce();
const cluster = new Cluster({ log });
await cluster.runDocker({ dockerCmd: 'start -a es01' });
expect(runDockerContainerMock.mock.calls[0]).toMatchInlineSnapshot(`
Array [
<ToolingLog>,
Object {
"dockerCmd": "start -a es01",
},
]
`);
await cluster.runDocker(options);
expect(runDockerContainerMock.mock.calls[0][0]).toMatchInlineSnapshot(`<ToolingLog>`);
expect(runDockerContainerMock.mock.calls[0][1]).toBe(options);
});
});

View file

@ -33,6 +33,7 @@ import {
import { ToolingLog, ToolingLogCollectingWriter } from '@kbn/tooling-log';
import { ES_P12_PATH } from '@kbn/dev-utils';
import { ESS_CONFIG_PATH, ESS_RESOURCES_PATHS, ESS_SECRETS_PATH, ESS_JWKS_PATH } from '../paths';
import * as waitClusterUtil from './wait_until_cluster_ready';
jest.mock('execa');
const execa = jest.requireMock('execa');
@ -42,6 +43,10 @@ jest.mock('@elastic/elasticsearch', () => {
};
});
jest.mock('./wait_until_cluster_ready', () => ({
waitUntilClusterReady: jest.fn(),
}));
const log = new ToolingLog();
const logWriter = new ToolingLogCollectingWriter();
log.setWriters([logWriter]);
@ -51,6 +56,8 @@ const baseEsPath = `${KIBANA_ROOT}/.es`;
const serverlessDir = 'stateless';
const serverlessObjectStorePath = `${baseEsPath}/${serverlessDir}`;
const waitUntilClusterReadyMock = jest.spyOn(waitClusterUtil, 'waitUntilClusterReady');
beforeEach(() => {
jest.resetAllMocks();
log.indent(-log.getIndent());
@ -473,24 +480,18 @@ describe('runServerlessCluster()', () => {
// setupDocker execa calls then run three nodes and attach logger
expect(execa.mock.calls).toHaveLength(8);
});
describe('waitForReady', () => {
test('should wait for serverless nodes to be ready to serve requests', async () => {
mockFs({
[baseEsPath]: {},
});
execa.mockImplementation(() => Promise.resolve({ stdout: '' }));
const health = jest.fn();
jest
.requireMock('@elastic/elasticsearch')
.Client.mockImplementation(() => ({ cluster: { health } }));
health.mockImplementationOnce(() => Promise.reject()); // first call fails
health.mockImplementationOnce(() => Promise.resolve({ status: 'red' })); // second call return wrong status
health.mockImplementationOnce(() => Promise.resolve({ status: 'green' })); // then succeeds
test(`should wait for serverless nodes to return 'green' status`, async () => {
waitUntilClusterReadyMock.mockResolvedValue();
mockFs({
[baseEsPath]: {},
});
execa.mockImplementation(() => Promise.resolve({ stdout: '' }));
await runServerlessCluster(log, { basePath: baseEsPath, waitForReady: true });
expect(health).toHaveBeenCalledTimes(3);
}, 10000);
await runServerlessCluster(log, { basePath: baseEsPath, waitForReady: true });
expect(waitUntilClusterReadyMock).toHaveBeenCalledTimes(1);
expect(waitUntilClusterReadyMock.mock.calls[0][0].expectedStatus).toEqual('green');
expect(waitUntilClusterReadyMock.mock.calls[0][0].readyTimeout).toEqual(undefined);
});
});

View file

@ -349,7 +349,7 @@ export async function maybePullDockerImage(log: ToolingLog, image: string) {
stdio: ['ignore', 'inherit', 'pipe'],
}).catch(({ message }) => {
throw createCliError(
`Error pulling image. This is likely an issue authenticating with ${DOCKER_REGISTRY}.
`Error pulling image. This is likely an issue authenticating with ${DOCKER_REGISTRY}.
Visit ${chalk.bold.cyan('https://docker-auth.elastic.co/github_auth')} to login.
${message}`
@ -622,8 +622,7 @@ export async function runServerlessCluster(log: ToolingLog, options: ServerlessO
}
: {}),
});
await waitUntilClusterReady({ client, log });
log.success('ES is ready');
await waitUntilClusterReady({ client, expectedStatus: 'green', log });
}
if (options.teardown) {

View file

@ -0,0 +1,86 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { Client } from '@elastic/elasticsearch';
import { ToolingLog, ToolingLogCollectingWriter } from '@kbn/tooling-log';
import { waitUntilClusterReady } from './wait_until_cluster_ready';
jest.mock('@elastic/elasticsearch', () => {
return {
Client: jest.fn(),
};
});
const log = new ToolingLog();
const logWriter = new ToolingLogCollectingWriter();
log.setWriters([logWriter]);
const health = jest.fn();
// Every test starts from a clean slate: mocks are reset first (which also drops
// any queued `health` responses), then the mocked Client constructor is wired
// to return an object exposing the shared `health` mock, and finally the log
// indentation and the collected log messages are cleared.
beforeEach(() => {
  jest.resetAllMocks();

  const { Client: ClientMock } = jest.requireMock('@elastic/elasticsearch');
  ClientMock.mockImplementation(() => {
    return { cluster: { health } };
  });

  const currentIndent = log.getIndent();
  log.indent(-currentIndent);
  logWriter.messages.length = 0;
});

// Drop recorded mock calls once a test has finished.
afterEach(() => {
  jest.clearAllMocks();
});
// Unit tests for waitUntilClusterReady(): each test queues a sequence of
// responses on the shared `health` mock and checks how many health calls were
// needed and what was written to the log.
// NOTE(review): these tests run on real timers; the explicit 10000 ms test
// timeouts presumably account for the delay between retries - confirm against
// the implementation.
describe('waitUntilClusterReady', () => {
// With 'green' expected, a failed request and the 'red'/'yellow' statuses are
// all retried; only the failed request shows up as a warning in the log.
test(`waits for node to return 'green' status`, async () => {
health.mockImplementationOnce(() => Promise.reject(new Error('foo')));
health.mockImplementationOnce(() => Promise.resolve({ status: 'red' }));
health.mockImplementationOnce(() => Promise.resolve({ status: 'yellow' }));
health.mockImplementationOnce(() => Promise.resolve({ status: 'green' })); // 4th call returns expected status
const client = new Client({});
await waitUntilClusterReady({ client, log, expectedStatus: 'green' });
expect(health).toHaveBeenCalledTimes(4);
expect(logWriter.messages).toMatchInlineSnapshot(`
Array [
" info waiting for ES cluster to report a green status",
" warn waiting for ES cluster to come online, attempt 1 failed with: foo",
" succ ES cluster is ready",
]
`);
}, 10000);
// With 'yellow' expected, the upper-case 'YELLOW' status already counts as
// ready, so the two remaining queued responses are never consumed.
test(`waits for node to return 'yellow' status`, async () => {
health.mockImplementationOnce(() => Promise.reject(new Error('foo')));
health.mockImplementationOnce(() => Promise.resolve({ status: 'red' }));
health.mockImplementationOnce(() => Promise.resolve({ status: 'YELLOW' })); // 3rd call returns expected status
health.mockImplementationOnce(() => Promise.resolve({ status: 'yellow' }));
health.mockImplementationOnce(() => Promise.resolve({ status: 'green' }));
const client = new Client({});
await waitUntilClusterReady({ client, log, expectedStatus: 'yellow' });
expect(health).toHaveBeenCalledTimes(3);
expect(logWriter.messages).toMatchInlineSnapshot(`
Array [
" info waiting for ES cluster to report a yellow status",
" warn waiting for ES cluster to come online, attempt 1 failed with: foo",
" succ ES cluster is ready",
]
`);
}, 10000);
// Neither queued response is a ready status, so with a 1000 ms readyTimeout
// the call is expected to reject with the timeout error.
test(`rejects when 'readyTimeout' is exceeded`, async () => {
health.mockImplementationOnce(() => Promise.reject(new Error('foo')));
health.mockImplementationOnce(() => Promise.resolve({ status: 'red' }));
const client = new Client({});
await expect(
waitUntilClusterReady({ client, log, expectedStatus: 'yellow', readyTimeout: 1000 })
).rejects.toThrow('ES cluster failed to come online with the 1 second timeout');
});
});

View file

@ -7,38 +7,52 @@
*/
import { Client } from '@elastic/elasticsearch';
import { HealthStatus } from '@elastic/elasticsearch/lib/api/types';
import { ToolingLog } from '@kbn/tooling-log';
const DEFAULT_READY_TIMEOUT = 60 * 1000; // 1 minute
export type ClusterReadyStatus = 'green' | 'yellow';
export interface WaitOptions {
client: Client;
expectedStatus: ClusterReadyStatus;
log: ToolingLog;
readyTimeout?: number;
}
/**
 * Builds the predicate that decides whether a reported cluster health status
 * satisfies the requested readiness level.
 *
 * - `'yellow'`: the cluster is ready once it reports `yellow` or `green`.
 * - `'green'`: the cluster is ready only once it reports `green`.
 *
 * The comparison is case-insensitive because the status may be reported in
 * upper case (e.g. `YELLOW`). Statuses are matched against an explicit
 * allow-list, so an unexpected value is never mistaken for "ready".
 *
 * @param readyStatus Minimum health status the caller is waiting for.
 * @returns A function that returns `true` when the given status is acceptable.
 */
const checkStatus = (readyStatus: ClusterReadyStatus) => {
  const acceptedStatuses: ReadonlySet<string> = new Set(
    readyStatus === 'yellow' ? ['yellow', 'green'] : ['green']
  );

  // `toLowerCase()` rather than `toLocaleLowerCase()`: the status is a
  // protocol identifier and must not depend on the host locale
  return (status: HealthStatus) => acceptedStatuses.has(status.toLowerCase());
};
/**
* General method to wait for the ES cluster status to be green
* General method to wait for the ES cluster status to be yellow or green
*/
export async function waitUntilClusterReady({
client,
expectedStatus,
log,
readyTimeout = DEFAULT_READY_TIMEOUT,
}: WaitOptions) {
let attempt = 0;
const start = Date.now();
log.info('waiting for ES cluster to report a green status');
log.info(`waiting for ES cluster to report a ${expectedStatus} status`);
const isReady = checkStatus(expectedStatus);
while (true) {
attempt += 1;
try {
const resp = await client.cluster.health();
if (resp.status === 'green') {
const status: HealthStatus = resp.status;
if (isReady(status)) {
log.success('ES cluster is ready');
return;
}
throw new Error(`not ready, cluster health is ${resp.status}`);
throw new Error(`not ready, cluster health is ${status}`);
} catch (error) {
const timeSinceStart = Date.now() - start;
if (timeSinceStart > readyTimeout) {