mirror of
https://github.com/elastic/kibana.git
synced 2025-04-23 09:19:04 -04:00
Use a shorter ES health check interval before initial green (#179023)
## Summary Use a shorter interval for Elasticsearch healthchecks before the first green status, to overall reduce the time spent waiting for ES when both Kibana and ES are starting at the same time.
This commit is contained in:
parent
7b79885fe2
commit
c0ad45f527
9 changed files with 180 additions and 97 deletions
|
@ -34,6 +34,7 @@ test('set correct defaults', () => {
|
|||
"compression": false,
|
||||
"customHeaders": Object {},
|
||||
"healthCheckDelay": "PT2.5S",
|
||||
"healthCheckStartupDelay": "PT0.5S",
|
||||
"hosts": Array [
|
||||
"http://localhost:9200",
|
||||
],
|
||||
|
|
|
@ -144,7 +144,10 @@ export const configSchema = schema.object({
|
|||
}
|
||||
),
|
||||
apiVersion: schema.string({ defaultValue: DEFAULT_API_VERSION }),
|
||||
healthCheck: schema.object({ delay: schema.duration({ defaultValue: 2500 }) }),
|
||||
healthCheck: schema.object({
|
||||
delay: schema.duration({ defaultValue: 2500 }),
|
||||
startupDelay: schema.duration({ defaultValue: 500 }),
|
||||
}),
|
||||
ignoreVersionMismatch: offeringBasedSchema({
|
||||
serverless: schema.boolean({ defaultValue: true }),
|
||||
traditional: schema.conditional(
|
||||
|
@ -301,10 +304,13 @@ export class ElasticsearchConfig implements IElasticsearchConfig {
|
|||
*/
|
||||
public readonly skipStartupConnectionCheck: boolean;
|
||||
/**
|
||||
* The interval between health check requests Kibana sends to the Elasticsearch.
|
||||
* The interval between health check requests Kibana sends to the Elasticsearch before the first green signal.
|
||||
*/
|
||||
public readonly healthCheckStartupDelay: Duration;
|
||||
/**
|
||||
* The interval between health check requests Kibana sends to the Elasticsearch after the first green signal.
|
||||
*/
|
||||
public readonly healthCheckDelay: Duration;
|
||||
|
||||
/**
|
||||
* Whether to allow kibana to connect to a non-compatible elasticsearch node.
|
||||
*/
|
||||
|
@ -435,6 +441,7 @@ export class ElasticsearchConfig implements IElasticsearchConfig {
|
|||
this.sniffOnConnectionFault = rawConfig.sniffOnConnectionFault;
|
||||
this.sniffInterval = rawConfig.sniffInterval;
|
||||
this.healthCheckDelay = rawConfig.healthCheck.delay;
|
||||
this.healthCheckStartupDelay = rawConfig.healthCheck.startupDelay;
|
||||
this.username = rawConfig.username;
|
||||
this.password = rawConfig.password;
|
||||
this.serviceAccountToken = rawConfig.serviceAccountToken;
|
||||
|
@ -466,6 +473,10 @@ const readKeyAndCerts = (rawConfig: ElasticsearchConfigType) => {
|
|||
let certificate: string | undefined;
|
||||
let certificateAuthorities: string[] | undefined;
|
||||
|
||||
const readFile = (file: string) => {
|
||||
return readFileSync(file, 'utf8');
|
||||
};
|
||||
|
||||
const addCAs = (ca: string[] | undefined) => {
|
||||
if (ca && ca.length) {
|
||||
certificateAuthorities = [...(certificateAuthorities || []), ...ca];
|
||||
|
@ -522,7 +533,3 @@ const readKeyAndCerts = (rawConfig: ElasticsearchConfigType) => {
|
|||
certificateAuthorities,
|
||||
};
|
||||
};
|
||||
|
||||
const readFile = (file: string) => {
|
||||
return readFileSync(file, 'utf8');
|
||||
};
|
||||
|
|
|
@ -69,6 +69,7 @@ beforeEach(() => {
|
|||
hosts: ['http://1.2.3.4'],
|
||||
healthCheck: {
|
||||
delay: duration(10),
|
||||
startupDelay: duration(10),
|
||||
},
|
||||
ssl: {
|
||||
verificationMode: 'none',
|
||||
|
@ -182,6 +183,7 @@ describe('#preboot', () => {
|
|||
expect(config).toMatchInlineSnapshot(`
|
||||
Object {
|
||||
"healthCheckDelay": "PT0.01S",
|
||||
"healthCheckStartupDelay": "PT0.01S",
|
||||
"hosts": Array [
|
||||
"http://8.8.8.8",
|
||||
],
|
||||
|
@ -400,6 +402,7 @@ describe('#start', () => {
|
|||
expect(config).toMatchInlineSnapshot(`
|
||||
Object {
|
||||
"healthCheckDelay": "PT0.01S",
|
||||
"healthCheckStartupDelay": "PT0.01S",
|
||||
"hosts": Array [
|
||||
"http://8.8.8.8",
|
||||
],
|
||||
|
|
|
@ -7,7 +7,7 @@
|
|||
*/
|
||||
|
||||
import { firstValueFrom, Observable, Subject } from 'rxjs';
|
||||
import { map, shareReplay, takeUntil } from 'rxjs/operators';
|
||||
import { map, takeUntil } from 'rxjs/operators';
|
||||
|
||||
import type { Logger } from '@kbn/logging';
|
||||
import type { CoreContext, CoreService } from '@kbn/core-base-server-internal';
|
||||
|
@ -98,12 +98,13 @@ export class ElasticsearchService
|
|||
this.client = this.createClusterClient('data', config);
|
||||
|
||||
const esNodesCompatibility$ = pollEsNodesVersion({
|
||||
internalClient: this.client.asInternalUser,
|
||||
log: this.log,
|
||||
ignoreVersionMismatch: config.ignoreVersionMismatch,
|
||||
esVersionCheckInterval: config.healthCheckDelay.asMilliseconds(),
|
||||
kibanaVersion: this.kibanaVersion,
|
||||
}).pipe(takeUntil(this.stop$), shareReplay({ refCount: true, bufferSize: 1 }));
|
||||
ignoreVersionMismatch: config.ignoreVersionMismatch,
|
||||
healthCheckInterval: config.healthCheckDelay.asMilliseconds(),
|
||||
healthCheckStartupInterval: config.healthCheckStartupDelay.asMilliseconds(),
|
||||
log: this.log,
|
||||
internalClient: this.client.asInternalUser,
|
||||
}).pipe(takeUntil(this.stop$));
|
||||
|
||||
this.esNodesCompatibility$ = esNodesCompatibility$;
|
||||
|
||||
|
|
|
@ -131,10 +131,6 @@ describe('mapNodesVersionCompatibility', () => {
|
|||
|
||||
describe('pollEsNodesVersion', () => {
|
||||
let internalClient: ReturnType<typeof elasticsearchClientMock.createInternalClient>;
|
||||
const getTestScheduler = () =>
|
||||
new TestScheduler((actual, expected) => {
|
||||
expect(actual).toEqual(expected);
|
||||
});
|
||||
|
||||
beforeEach(() => {
|
||||
internalClient = elasticsearchClientMock.createInternalClient();
|
||||
|
@ -159,7 +155,7 @@ describe('pollEsNodesVersion', () => {
|
|||
|
||||
pollEsNodesVersion({
|
||||
internalClient,
|
||||
esVersionCheckInterval: 1,
|
||||
healthCheckInterval: 1,
|
||||
ignoreVersionMismatch: false,
|
||||
kibanaVersion: KIBANA_VERSION,
|
||||
log: mockLogger,
|
||||
|
@ -186,7 +182,7 @@ describe('pollEsNodesVersion', () => {
|
|||
|
||||
pollEsNodesVersion({
|
||||
internalClient,
|
||||
esVersionCheckInterval: 1,
|
||||
healthCheckInterval: 1,
|
||||
ignoreVersionMismatch: false,
|
||||
kibanaVersion: KIBANA_VERSION,
|
||||
log: mockLogger,
|
||||
|
@ -217,7 +213,7 @@ describe('pollEsNodesVersion', () => {
|
|||
|
||||
pollEsNodesVersion({
|
||||
internalClient,
|
||||
esVersionCheckInterval: 1,
|
||||
healthCheckInterval: 1,
|
||||
ignoreVersionMismatch: false,
|
||||
kibanaVersion: KIBANA_VERSION,
|
||||
log: mockLogger,
|
||||
|
@ -252,7 +248,7 @@ describe('pollEsNodesVersion', () => {
|
|||
|
||||
pollEsNodesVersion({
|
||||
internalClient,
|
||||
esVersionCheckInterval: 1,
|
||||
healthCheckInterval: 1,
|
||||
ignoreVersionMismatch: false,
|
||||
kibanaVersion: KIBANA_VERSION,
|
||||
log: mockLogger,
|
||||
|
@ -276,7 +272,7 @@ describe('pollEsNodesVersion', () => {
|
|||
|
||||
pollEsNodesVersion({
|
||||
internalClient,
|
||||
esVersionCheckInterval: 1,
|
||||
healthCheckInterval: 1,
|
||||
ignoreVersionMismatch: false,
|
||||
kibanaVersion: KIBANA_VERSION,
|
||||
log: mockLogger,
|
||||
|
@ -302,7 +298,7 @@ describe('pollEsNodesVersion', () => {
|
|||
|
||||
pollEsNodesVersion({
|
||||
internalClient,
|
||||
esVersionCheckInterval: 1,
|
||||
healthCheckInterval: 1,
|
||||
ignoreVersionMismatch: false,
|
||||
kibanaVersion: KIBANA_VERSION,
|
||||
log: mockLogger,
|
||||
|
@ -315,77 +311,114 @@ describe('pollEsNodesVersion', () => {
|
|||
});
|
||||
});
|
||||
|
||||
it('starts polling immediately and then every esVersionCheckInterval', () => {
|
||||
expect.assertions(1);
|
||||
|
||||
// @ts-expect-error we need to return an incompatible type to use the testScheduler here
|
||||
internalClient.nodes.info.mockReturnValueOnce([createNodes('5.1.0', '5.2.0', '5.0.0')]);
|
||||
// @ts-expect-error we need to return an incompatible type to use the testScheduler here
|
||||
internalClient.nodes.info.mockReturnValueOnce([createNodes('5.1.1', '5.2.0', '5.0.0')]);
|
||||
|
||||
getTestScheduler().run(({ expectObservable }) => {
|
||||
const expected = 'a 99ms (b|)';
|
||||
|
||||
const esNodesCompatibility$ = pollEsNodesVersion({
|
||||
internalClient,
|
||||
esVersionCheckInterval: 100,
|
||||
ignoreVersionMismatch: false,
|
||||
kibanaVersion: KIBANA_VERSION,
|
||||
log: mockLogger,
|
||||
}).pipe(take(2));
|
||||
|
||||
expectObservable(esNodesCompatibility$).toBe(expected, {
|
||||
a: mapNodesVersionCompatibility(
|
||||
createNodes('5.1.0', '5.2.0', '5.0.0'),
|
||||
KIBANA_VERSION,
|
||||
false
|
||||
),
|
||||
b: mapNodesVersionCompatibility(
|
||||
createNodes('5.1.1', '5.2.0', '5.0.0'),
|
||||
KIBANA_VERSION,
|
||||
false
|
||||
),
|
||||
describe('marble testing', () => {
|
||||
const getTestScheduler = () =>
|
||||
new TestScheduler((actual, expected) => {
|
||||
expect(actual).toEqual(expected);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
it('waits for es version check requests to complete before scheduling the next one', () => {
|
||||
expect.assertions(2);
|
||||
const mockTestSchedulerInfoResponseOnce = (infos: NodesInfo) => {
|
||||
// @ts-expect-error we need to return an incompatible type to use the testScheduler here
|
||||
internalClient.nodes.info.mockReturnValueOnce([infos]);
|
||||
};
|
||||
|
||||
getTestScheduler().run(({ expectObservable }) => {
|
||||
const expected = '100ms a 99ms (b|)';
|
||||
it('starts polling immediately and then every healthCheckInterval', () => {
|
||||
expect.assertions(1);
|
||||
|
||||
internalClient.nodes.info.mockReturnValueOnce(
|
||||
// @ts-expect-error we need to return an incompatible type to use the testScheduler here
|
||||
of(createNodes('5.1.0', '5.2.0', '5.0.0')).pipe(delay(100))
|
||||
);
|
||||
internalClient.nodes.info.mockReturnValueOnce(
|
||||
// @ts-expect-error we need to return an incompatible type to use the testScheduler here
|
||||
of(createNodes('5.1.1', '5.2.0', '5.0.0')).pipe(delay(100))
|
||||
);
|
||||
mockTestSchedulerInfoResponseOnce(createNodes('5.1.0', '5.2.0', '5.0.0'));
|
||||
mockTestSchedulerInfoResponseOnce(createNodes('5.1.1', '5.2.0', '5.0.0'));
|
||||
|
||||
const esNodesCompatibility$ = pollEsNodesVersion({
|
||||
internalClient,
|
||||
esVersionCheckInterval: 10,
|
||||
ignoreVersionMismatch: false,
|
||||
kibanaVersion: KIBANA_VERSION,
|
||||
log: mockLogger,
|
||||
}).pipe(take(2));
|
||||
getTestScheduler().run(({ expectObservable }) => {
|
||||
const expected = 'a 99ms (b|)';
|
||||
|
||||
expectObservable(esNodesCompatibility$).toBe(expected, {
|
||||
a: mapNodesVersionCompatibility(
|
||||
createNodes('5.1.0', '5.2.0', '5.0.0'),
|
||||
KIBANA_VERSION,
|
||||
false
|
||||
),
|
||||
b: mapNodesVersionCompatibility(
|
||||
createNodes('5.1.1', '5.2.0', '5.0.0'),
|
||||
KIBANA_VERSION,
|
||||
false
|
||||
),
|
||||
const esNodesCompatibility$ = pollEsNodesVersion({
|
||||
internalClient,
|
||||
healthCheckInterval: 100,
|
||||
ignoreVersionMismatch: false,
|
||||
kibanaVersion: KIBANA_VERSION,
|
||||
log: mockLogger,
|
||||
}).pipe(take(2));
|
||||
|
||||
expectObservable(esNodesCompatibility$).toBe(expected, {
|
||||
a: mapNodesVersionCompatibility(
|
||||
createNodes('5.1.0', '5.2.0', '5.0.0'),
|
||||
KIBANA_VERSION,
|
||||
false
|
||||
),
|
||||
b: mapNodesVersionCompatibility(
|
||||
createNodes('5.1.1', '5.2.0', '5.0.0'),
|
||||
KIBANA_VERSION,
|
||||
false
|
||||
),
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
expect(internalClient.nodes.info).toHaveBeenCalledTimes(2);
|
||||
it('waits for es version check requests to complete before scheduling the next one', () => {
|
||||
expect.assertions(2);
|
||||
|
||||
getTestScheduler().run(({ expectObservable }) => {
|
||||
const expected = '100ms a 99ms (b|)';
|
||||
|
||||
internalClient.nodes.info.mockReturnValueOnce(
|
||||
// @ts-expect-error we need to return an incompatible type to use the testScheduler here
|
||||
of(createNodes('5.1.0', '5.2.0', '5.0.0')).pipe(delay(100))
|
||||
);
|
||||
internalClient.nodes.info.mockReturnValueOnce(
|
||||
// @ts-expect-error we need to return an incompatible type to use the testScheduler here
|
||||
of(createNodes('5.1.1', '5.2.0', '5.0.0')).pipe(delay(100))
|
||||
);
|
||||
|
||||
const esNodesCompatibility$ = pollEsNodesVersion({
|
||||
internalClient,
|
||||
healthCheckInterval: 10,
|
||||
ignoreVersionMismatch: false,
|
||||
kibanaVersion: KIBANA_VERSION,
|
||||
log: mockLogger,
|
||||
}).pipe(take(2));
|
||||
|
||||
expectObservable(esNodesCompatibility$).toBe(expected, {
|
||||
a: mapNodesVersionCompatibility(
|
||||
createNodes('5.1.0', '5.2.0', '5.0.0'),
|
||||
KIBANA_VERSION,
|
||||
false
|
||||
),
|
||||
b: mapNodesVersionCompatibility(
|
||||
createNodes('5.1.1', '5.2.0', '5.0.0'),
|
||||
KIBANA_VERSION,
|
||||
false
|
||||
),
|
||||
});
|
||||
});
|
||||
|
||||
expect(internalClient.nodes.info).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it('switch from startup interval to normal interval after first green status', () => {
|
||||
expect.assertions(1);
|
||||
|
||||
mockTestSchedulerInfoResponseOnce(createNodes('6.3.0'));
|
||||
mockTestSchedulerInfoResponseOnce(createNodes('5.1.0'));
|
||||
mockTestSchedulerInfoResponseOnce(createNodes('5.2.0'));
|
||||
mockTestSchedulerInfoResponseOnce(createNodes('5.3.0'));
|
||||
|
||||
getTestScheduler().run(({ expectObservable }) => {
|
||||
const esNodesCompatibility$ = pollEsNodesVersion({
|
||||
internalClient,
|
||||
healthCheckInterval: 100,
|
||||
healthCheckStartupInterval: 50,
|
||||
ignoreVersionMismatch: false,
|
||||
kibanaVersion: KIBANA_VERSION,
|
||||
log: mockLogger,
|
||||
}).pipe(take(4));
|
||||
|
||||
expectObservable(esNodesCompatibility$).toBe('a 49ms b 99ms c 99ms (d|)', {
|
||||
a: expect.any(Object),
|
||||
b: expect.any(Object),
|
||||
c: expect.any(Object),
|
||||
d: expect.any(Object),
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
@ -11,8 +11,17 @@
|
|||
* that defined in Kibana's package.json.
|
||||
*/
|
||||
|
||||
import { timer, of, from, Observable } from 'rxjs';
|
||||
import { map, distinctUntilChanged, catchError, exhaustMap } from 'rxjs/operators';
|
||||
import { interval, of, from, Observable, BehaviorSubject } from 'rxjs';
|
||||
import {
|
||||
map,
|
||||
distinctUntilChanged,
|
||||
catchError,
|
||||
exhaustMap,
|
||||
switchMap,
|
||||
tap,
|
||||
startWith,
|
||||
shareReplay,
|
||||
} from 'rxjs/operators';
|
||||
import type { Logger } from '@kbn/logging';
|
||||
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
|
||||
import {
|
||||
|
@ -26,7 +35,8 @@ export interface PollEsNodesVersionOptions {
|
|||
log: Logger;
|
||||
kibanaVersion: string;
|
||||
ignoreVersionMismatch: boolean;
|
||||
esVersionCheckInterval: number;
|
||||
healthCheckInterval: number;
|
||||
healthCheckStartupInterval?: number;
|
||||
}
|
||||
|
||||
/** @public */
|
||||
|
@ -149,10 +159,26 @@ export const pollEsNodesVersion = ({
|
|||
log,
|
||||
kibanaVersion,
|
||||
ignoreVersionMismatch,
|
||||
esVersionCheckInterval: healthCheckInterval,
|
||||
healthCheckInterval,
|
||||
healthCheckStartupInterval,
|
||||
}: PollEsNodesVersionOptions): Observable<NodesVersionCompatibility> => {
|
||||
log.debug('Checking Elasticsearch version');
|
||||
return timer(0, healthCheckInterval).pipe(
|
||||
|
||||
const hasStartupInterval =
|
||||
healthCheckStartupInterval !== undefined && healthCheckStartupInterval !== healthCheckInterval;
|
||||
|
||||
const isStartup$ = new BehaviorSubject(hasStartupInterval);
|
||||
|
||||
const checkInterval$ = isStartup$.pipe(
|
||||
distinctUntilChanged(),
|
||||
map((useStartupInterval) =>
|
||||
useStartupInterval ? healthCheckStartupInterval! : healthCheckInterval
|
||||
)
|
||||
);
|
||||
|
||||
return checkInterval$.pipe(
|
||||
switchMap((checkInterval) => interval(checkInterval)),
|
||||
startWith(0),
|
||||
exhaustMap(() => {
|
||||
return from(
|
||||
internalClient.nodes.info({
|
||||
|
@ -164,9 +190,16 @@ export const pollEsNodesVersion = ({
|
|||
})
|
||||
);
|
||||
}),
|
||||
map((nodesInfoResponse: NodesInfo & { nodesInfoRequestError?: Error }) =>
|
||||
mapNodesVersionCompatibility(nodesInfoResponse, kibanaVersion, ignoreVersionMismatch)
|
||||
),
|
||||
distinctUntilChanged(compareNodes) // Only emit if there are new nodes or versions or if we return an error and that error changes
|
||||
map((nodesInfoResponse: NodesInfo & { nodesInfoRequestError?: Error }) => {
|
||||
return mapNodesVersionCompatibility(nodesInfoResponse, kibanaVersion, ignoreVersionMismatch);
|
||||
}),
|
||||
// Only emit if there are new nodes or versions or if we return an error and that error changes
|
||||
distinctUntilChanged(compareNodes),
|
||||
tap((nodesVersionCompatibility) => {
|
||||
if (nodesVersionCompatibility.isCompatible) {
|
||||
isStartup$.next(false);
|
||||
}
|
||||
}),
|
||||
shareReplay({ refCount: true, bufferSize: 1 })
|
||||
);
|
||||
};
|
||||
|
|
|
@ -14,7 +14,11 @@ import type { ElasticsearchApiToRedactInLogs } from './client';
|
|||
*/
|
||||
export interface IElasticsearchConfig {
|
||||
/**
|
||||
* The interval between health check requests Kibana sends to the Elasticsearch.
|
||||
* The interval between health check requests Kibana sends to the Elasticsearch before the first green signal.
|
||||
*/
|
||||
readonly healthCheckStartupDelay: Duration;
|
||||
/**
|
||||
* The interval between health check requests Kibana sends to the Elasticsearch after the first green signal.
|
||||
*/
|
||||
readonly healthCheckDelay: Duration;
|
||||
|
||||
|
|
|
@ -393,7 +393,7 @@ export class ElasticsearchService {
|
|||
log: this.logger,
|
||||
kibanaVersion: this.kibanaVersion,
|
||||
ignoreVersionMismatch: false,
|
||||
esVersionCheckInterval: -1, // Passing a negative number here will result in immediate completion after the first value is emitted
|
||||
healthCheckInterval: -1, // Passing a negative number here will result in immediate completion after the first value is emitted
|
||||
})
|
||||
);
|
||||
}
|
||||
|
|
|
@ -61,6 +61,7 @@ describe('config schema', () => {
|
|||
"customHeaders": Object {},
|
||||
"healthCheck": Object {
|
||||
"delay": "PT2.5S",
|
||||
"startupDelay": "PT0.5S",
|
||||
},
|
||||
"idleSocketTimeout": "PT1M",
|
||||
"ignoreVersionMismatch": false,
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue