[Health Gateway] Update response aggregation (#145761)

This commit is contained in:
Michael Dokolin 2022-11-25 10:56:36 +01:00 committed by GitHub
parent a287ea449b
commit 2728ee359d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 389 additions and 142 deletions

View file

@ -9,7 +9,8 @@
import type { IConfigService } from '@kbn/config';
import type { Logger, LoggerFactory } from '@kbn/logging';
import { ServerStart } from '../server';
import { createRootRoute } from './routes';
import { KibanaConfig } from './kibana_config';
import { RootRoute } from './routes';
interface KibanaServiceStartDependencies {
server: ServerStart;
@ -25,15 +26,15 @@ interface KibanaServiceDependencies {
*/
export class KibanaService {
private readonly logger: Logger;
private readonly config: IConfigService;
private readonly kibanaConfig: KibanaConfig;
constructor({ logger, config }: KibanaServiceDependencies) {
this.logger = logger.get('kibana-service');
this.config = config;
this.kibanaConfig = new KibanaConfig({ config, logger: this.logger });
}
async start({ server }: KibanaServiceStartDependencies) {
server.addRoute(createRootRoute({ config: this.config, logger: this.logger }));
server.addRoute(new RootRoute(this.kibanaConfig, this.logger));
}
stop() {

View file

@ -6,4 +6,4 @@
* Side Public License, v 1.
*/
export { createRootRoute } from './root';
export { RootRoute } from './root';

View file

@ -0,0 +1,252 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { Server } from '@hapi/hapi';
import { duration } from 'moment';
import fetch, { Response } from 'node-fetch';
import { loggerMock, MockedLogger } from '@kbn/logging-mocks';
import type { KibanaConfig } from '../kibana_config';
import { RootRoute } from './root';
describe('RootRoute', () => {
let kibanaConfig: KibanaConfig;
let logger: MockedLogger;
let server: Server;
beforeAll(async () => {
jest.spyOn(await import('node-fetch'), 'default');
});
beforeEach(async () => {
kibanaConfig = {
hosts: ['http://localhost:5601'],
requestTimeout: duration(60, 's'),
} as unknown as typeof kibanaConfig;
logger = loggerMock.create();
server = new Server();
server.route(new RootRoute(kibanaConfig, logger));
await server.initialize();
});
afterEach(() => {
jest.clearAllMocks();
});
afterAll(() => {
jest.restoreAllMocks();
});
describe('handler', () => {
const ok = { status: 200 };
const noContent = { status: 204 };
const found = { status: 302 };
const badRequest = { status: 400 };
const unauthorized = { status: 401, headers: { 'www-authenticate': '' } };
const forbidden = { status: 403 };
const notFound = { status: 404 };
const serverError = { status: 500 };
const badGateway = { status: 502 };
const unavailable = { status: 503 };
const timeout = { status: 504 };
it.each`
config | status | code
${ok} | ${'healthy'} | ${200}
${noContent} | ${'healthy'} | ${200}
${found} | ${'healthy'} | ${200}
${unauthorized} | ${'healthy'} | ${200}
${forbidden} | ${'unhealthy'} | ${503}
${notFound} | ${'unhealthy'} | ${503}
${badRequest} | ${'unhealthy'} | ${503}
${serverError} | ${'unhealthy'} | ${503}
${badGateway} | ${'unhealthy'} | ${503}
${unavailable} | ${'unhealthy'} | ${503}
${timeout} | ${'unhealthy'} | ${503}
`(
"should return '$status' with $code when Kibana host returns $config.status",
async ({ config, status, code }) => {
(fetch as jest.MockedFunction<typeof fetch>).mockResolvedValueOnce(
new Response('', config)
);
const response = server.inject({
method: 'get',
url: '/',
});
await expect(response).resolves.toEqual(
expect.objectContaining({
statusCode: code,
result: expect.objectContaining({
status,
hosts: [
expect.objectContaining({
status,
code: config.status,
host: 'http://localhost:5601',
}),
],
}),
})
);
}
);
it("should return 'failure' with 502 when `fetch` throws an error", async () => {
(fetch as jest.MockedFunction<typeof fetch>).mockRejectedValueOnce(new Error('Fetch Error'));
const response = server.inject({
method: 'get',
url: '/',
});
await expect(response).resolves.toEqual(
expect.objectContaining({
statusCode: 502,
result: expect.objectContaining({
status: 'failure',
hosts: [
expect.objectContaining({
status: 'failure',
host: 'http://localhost:5601',
}),
],
}),
})
);
});
it("should return 'timeout' with 504 when `fetch` timeouts", async () => {
(fetch as jest.MockedFunction<typeof fetch>).mockImplementationOnce(
(url, { signal } = {}) => {
return new Promise((resolve, reject) => {
signal?.addEventListener('abort', () => {
reject(new DOMException('Fetch Aborted', 'AbortError'));
});
jest.advanceTimersByTime(60000);
});
}
);
jest.useFakeTimers({ doNotFake: ['nextTick'] });
const response = server.inject({
method: 'get',
url: '/',
});
try {
await expect(response).resolves.toEqual(
expect.objectContaining({
statusCode: 504,
result: expect.objectContaining({
status: 'timeout',
hosts: [
expect.objectContaining({
status: 'timeout',
host: 'http://localhost:5601',
}),
],
}),
})
);
} finally {
jest.useRealTimers();
}
});
it("should always return 'healthy' when there are no hosts", async () => {
kibanaConfig.hosts.splice(0);
const response = server.inject({
method: 'get',
url: '/',
});
await expect(response).resolves.toEqual(
expect.objectContaining({
statusCode: 200,
result: expect.objectContaining({
status: 'healthy',
hosts: [],
}),
})
);
expect(fetch).not.toHaveBeenCalled();
});
it("should return 'healthy' only when all the hosts healthy", async () => {
kibanaConfig.hosts.push('http://localhost:5602');
(fetch as jest.MockedFunction<typeof fetch>)
.mockResolvedValueOnce(new Response('', ok))
.mockResolvedValueOnce(new Response('', unauthorized));
const response = server.inject({
method: 'get',
url: '/',
});
await expect(response).resolves.toEqual(
expect.objectContaining({
statusCode: 200,
result: expect.objectContaining({
status: 'healthy',
hosts: expect.arrayContaining([
expect.objectContaining({
status: 'healthy',
code: ok.status,
host: 'http://localhost:5601',
}),
expect.objectContaining({
status: 'healthy',
code: unauthorized.status,
host: 'http://localhost:5602',
}),
]),
}),
})
);
});
it("should return 'unhealthy' when at least one host is not healthy", async () => {
kibanaConfig.hosts.push('http://localhost:5602');
(fetch as jest.MockedFunction<typeof fetch>)
.mockResolvedValueOnce(new Response('', ok))
.mockResolvedValueOnce(new Response('', serverError));
const response = server.inject({
method: 'get',
url: '/',
});
await expect(response).resolves.toEqual(
expect.objectContaining({
statusCode: 503,
result: expect.objectContaining({
status: 'unhealthy',
hosts: expect.arrayContaining([
expect.objectContaining({
status: 'healthy',
code: ok.status,
host: 'http://localhost:5601',
}),
expect.objectContaining({
status: 'unhealthy',
code: serverError.status,
host: 'http://localhost:5602',
}),
]),
}),
})
);
});
});
});

View file

@ -6,164 +6,158 @@
* Side Public License, v 1.
*/
import https from 'https';
import { capitalize, chain, memoize, pick } from 'lodash';
import { Agent, AgentOptions } from 'https';
import { URL } from 'url';
import type { Request, ResponseToolkit } from '@hapi/hapi';
import nodeFetch, { RequestInit, Response } from 'node-fetch';
import type { IConfigService } from '@kbn/config';
import type { Request, ResponseObject, ResponseToolkit, ServerRoute } from '@hapi/hapi';
import nodeFetch, { Response } from 'node-fetch';
import type { Logger } from '@kbn/logging';
import { KibanaConfig } from '../kibana_config';
import type { KibanaConfig } from '../kibana_config';
const HTTPS = 'https:';
type Status = 'healthy' | 'unhealthy' | 'failure' | 'timeout';
const GATEWAY_ROOT_ROUTE = '/';
const KIBANA_ROOT_ROUTE = '/';
interface RootRouteDependencies {
logger: Logger;
config: IConfigService;
interface RootRouteResponse {
status: Status;
hosts?: HostStatus[];
}
type Fetch = (path: string) => Promise<Response>;
export function createRootRoute({ config, logger }: RootRouteDependencies) {
const kibanaConfig = new KibanaConfig({ config, logger });
const fetch = configureFetch(kibanaConfig);
return {
method: 'GET',
path: GATEWAY_ROOT_ROUTE,
handler: async (req: Request, h: ResponseToolkit) => {
const responses = await fetchKibanaRoots({ fetch, kibanaConfig, logger });
const { body, statusCode } = mergeResponses(responses);
logger.debug(`Returning ${statusCode} response with body: ${JSON.stringify(body)}`);
return h.response(body).type('application/json').code(statusCode);
},
};
interface HostStatus {
host: string;
status: Status;
code?: number;
}
async function fetchKibanaRoots({
fetch,
kibanaConfig,
logger,
}: {
fetch: Fetch;
kibanaConfig: KibanaConfig;
logger: Logger;
}) {
const responses = await Promise.allSettled(
kibanaConfig.hosts.map(async (host) => {
logger.debug(`Fetching response from ${host}${KIBANA_ROOT_ROUTE}`);
return fetch(`${host}${KIBANA_ROOT_ROUTE}`);
})
);
responses.forEach((response, index) => {
const host = `${kibanaConfig.hosts[index]}${KIBANA_ROOT_ROUTE}`;
if (response.status !== 'rejected') {
logger.debug(`Got response from ${host}: ${JSON.stringify(response.value.status)}`);
return;
}
if (response.reason instanceof Error) {
logger.error(response.reason);
}
if (response.reason instanceof Error && response.reason.name === 'AbortError') {
logger.error(`Request timeout for ${host}`);
return;
}
logger.error(
`No response from ${host}: ${
response.reason instanceof Error ? response.reason.message : JSON.stringify(response.reason)
}`
);
});
return responses;
}
function mergeResponses(
responses: Array<PromiseFulfilledResult<Response> | PromiseRejectedResult>
) {
const hasUnhealthyResponse = responses.some(isUnhealthyResponse);
return {
body: {}, // The control plane health check ignores the body, so we do the same
statusCode: hasUnhealthyResponse ? 503 : 200,
};
}
function isUnhealthyResponse(response: PromiseFulfilledResult<Response> | PromiseRejectedResult) {
return (
response.status === 'rejected' || !(isSuccess(response.value) || isUnauthorized(response.value))
);
}
function isUnauthorized({ status, headers }: Response): boolean {
return status === 401 && headers.has('www-authenticate');
}
function isSuccess({ status }: Response): boolean {
return (status >= 200 && status <= 299) || status === 302;
}
function generateAgentConfig(sslConfig: KibanaConfig['ssl']) {
const options: https.AgentOptions = {
ca: sslConfig.certificateAuthorities,
cert: sslConfig.certificate,
};
const verificationMode = sslConfig.verificationMode;
switch (verificationMode) {
case 'none':
options.rejectUnauthorized = false;
break;
case 'certificate':
options.rejectUnauthorized = true;
// by default, NodeJS is checking the server identify
options.checkServerIdentity = () => undefined;
break;
case 'full':
options.rejectUnauthorized = true;
break;
default:
throw new Error(`Unknown ssl verificationMode: ${verificationMode}`);
export class RootRoute implements ServerRoute {
private static isHealthy(response: Response) {
return RootRoute.isSuccess(response) || RootRoute.isUnauthorized(response);
}
return options;
}
private static isUnauthorized({ status, headers }: Response): boolean {
return status === 401 && headers.has('www-authenticate');
}
function configureFetch(kibanaConfig: KibanaConfig) {
let agent: https.Agent;
private static isSuccess({ status }: Response): boolean {
return (status >= 200 && status <= 299) || status === 302;
}
return async (url: string) => {
const { protocol } = new URL(url);
if (protocol === HTTPS && !agent) {
agent = new https.Agent(generateAgentConfig(kibanaConfig.ssl));
private static readonly POLL_ROUTE = '/';
private static readonly STATUS_CODE: Record<Status, number> = {
healthy: 200,
unhealthy: 503,
failure: 502,
timeout: 504,
};
readonly method = 'GET';
readonly path = '/';
constructor(private kibanaConfig: KibanaConfig, private logger: Logger) {
this.handler = this.handler.bind(this);
return pick(this, ['method', 'path', 'handler']) as RootRoute;
}
async handler(request: Request, toolkit: ResponseToolkit): Promise<ResponseObject> {
const body = await this.poll();
const code = RootRoute.STATUS_CODE[body.status];
this.logger.debug(`Returning ${code} response with body: ${JSON.stringify(body)}`);
return toolkit.response(body).type('application/json').code(code);
}
private async poll(): Promise<RootRouteResponse> {
const hosts = await Promise.all(this.kibanaConfig.hosts.map(this.pollHost.bind(this)));
const statuses = chain(hosts).map('status').uniq().value();
const status = statuses.length <= 1 ? statuses[0] ?? 'healthy' : 'unhealthy';
return {
status,
hosts,
};
}
private async pollHost(host: string): Promise<HostStatus> {
const url = `${host}${RootRoute.POLL_ROUTE}`;
this.logger.debug(`Requesting ${url}`);
try {
const response = await this.fetch(url);
const status = RootRoute.isHealthy(response) ? 'healthy' : 'unhealthy';
this.logger.debug(`${capitalize(status)} response from ${url} with code ${response.status}`);
return {
host,
status,
code: response.status,
};
} catch (error) {
this.logger.error(error);
if (error.name === 'AbortError') {
this.logger.error(`Request timeout for ${url}`);
return {
host,
status: 'timeout',
};
}
this.logger.error(`Failed response from ${url}: ${error.message}`);
return {
host,
status: 'failure',
};
}
}
private async fetch(url: string) {
const { protocol } = new URL(url);
const controller = new AbortController();
const timeoutId = setTimeout(
() => controller.abort(),
kibanaConfig.requestTimeout.asMilliseconds()
this.kibanaConfig.requestTimeout.asMilliseconds()
);
const fetchOptions: RequestInit = {
...(protocol === HTTPS && { agent }),
signal: controller.signal,
redirect: 'manual',
};
try {
return await nodeFetch(url, fetchOptions);
return await nodeFetch(url, {
agent: protocol === 'https:' ? this.getAgent() : undefined,
signal: controller.signal,
redirect: 'manual',
});
} finally {
clearTimeout(timeoutId);
}
};
}
private getAgent = memoize(() => new Agent(this.getAgentConfig()));
private getAgentConfig() {
const {
certificateAuthorities: ca,
certificate: cert,
verificationMode,
} = this.kibanaConfig.ssl;
const options: AgentOptions = { ca, cert };
switch (verificationMode) {
case 'none':
options.rejectUnauthorized = false;
break;
case 'certificate':
options.rejectUnauthorized = true;
// by default, NodeJS is checking the server identify
options.checkServerIdentity = () => undefined;
break;
case 'full':
options.rejectUnauthorized = true;
break;
default:
throw new Error(`Unknown ssl verificationMode: ${verificationMode}`);
}
return options;
}
}