Create HTTP Agent manager (#137748)

* Create HTTP Agent factory

* Properly extract agent options

* Use independent Agent for preboot

* Create AgentManager to obtain factories

* Make client type mandatory, fix outdated mocks

* Temporarily force new Agent creation

* Revert changes in utils

* Add correct defaults for Agent Options, support proxy agent.

* Forgot to push package.json

* Add hpagent dependency in BUILD.bazel

* Get rid of hpagent (proxy param is not exposed in kibana.yml)

* Remove hpagent from BUILD.bazel

* Use different agents for normal Vs scoped client

* Fix Agent constructor params

* Fix incorrect access to err.message

* Use separate Agent for scoped client

* Create different agents for std vs scoped

* Provide different Agent instances if config differs

* Create a new Agent for each ES Client

* Restructure agent store. Add UTs

* Remove obsolete comment

* Simplify AgentManager store structure (no type needed)

* Fine tune client_config return type

* Misc enhancements following PR comments

* Fix missing param in cli_setup/utils
This commit is contained in:
Gerard Soldevila 2022-09-14 11:00:06 +02:00 committed by GitHub
parent 0384ff8a52
commit a77f8ff052
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 418 additions and 39 deletions

View file

@ -9,6 +9,7 @@
export { ScopedClusterClient } from './src/scoped_cluster_client';
export { ClusterClient } from './src/cluster_client';
export { configureClient } from './src/configure_client';
export { AgentManager } from './src/agent_manager';
export { getRequestDebugMeta, getErrorMessage } from './src/log_query_and_deprecation';
export {
PRODUCT_RESPONSE_HEADER,

View file

@ -0,0 +1,125 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { AgentManager } from './agent_manager';
import { Agent as HttpAgent } from 'http';
import { Agent as HttpsAgent } from 'https';
jest.mock('http');
jest.mock('https');
const HttpAgentMock = HttpAgent as jest.Mock<HttpAgent>;
const HttpsAgentMock = HttpsAgent as jest.Mock<HttpsAgent>;
describe('AgentManager', () => {
afterEach(() => {
HttpAgentMock.mockClear();
HttpsAgentMock.mockClear();
});
describe('#getAgentFactory()', () => {
it('provides factories which are different at each call', () => {
const agentManager = new AgentManager();
const agentFactory1 = agentManager.getAgentFactory();
const agentFactory2 = agentManager.getAgentFactory();
expect(agentFactory1).not.toEqual(agentFactory2);
});
describe('one agent factory', () => {
it('provides instances of the http and https Agent classes', () => {
const mockedHttpAgent = new HttpAgent();
HttpAgentMock.mockImplementationOnce(() => mockedHttpAgent);
const mockedHttpsAgent = new HttpsAgent();
HttpsAgentMock.mockImplementationOnce(() => mockedHttpsAgent);
const agentManager = new AgentManager();
const agentFactory = agentManager.getAgentFactory();
const httpAgent = agentFactory({ url: new URL('http://elastic-node-1:9200') });
const httpsAgent = agentFactory({ url: new URL('https://elastic-node-1:9200') });
expect(httpAgent).toEqual(mockedHttpAgent);
expect(httpsAgent).toEqual(mockedHttpsAgent);
});
it('provides Agents with a valid default configuration', () => {
const agentManager = new AgentManager();
const agentFactory = agentManager.getAgentFactory();
agentFactory({ url: new URL('http://elastic-node-1:9200') });
expect(HttpAgent).toBeCalledTimes(1);
expect(HttpAgent).toBeCalledWith({
keepAlive: true,
keepAliveMsecs: 50000,
maxFreeSockets: 256,
maxSockets: 256,
scheduling: 'lifo',
});
});
it('takes into account the provided configurations', () => {
const agentManager = new AgentManager({ maxFreeSockets: 32, maxSockets: 2048 });
const agentFactory = agentManager.getAgentFactory({
maxSockets: 1024,
scheduling: 'fifo',
});
agentFactory({ url: new URL('http://elastic-node-1:9200') });
expect(HttpAgent).toBeCalledTimes(1);
expect(HttpAgent).toBeCalledWith({
keepAlive: true,
keepAliveMsecs: 50000,
maxFreeSockets: 32,
maxSockets: 1024,
scheduling: 'fifo',
});
});
it('provides Agents that match the URLs protocol', () => {
const agentManager = new AgentManager();
const agentFactory = agentManager.getAgentFactory();
agentFactory({ url: new URL('http://elastic-node-1:9200') });
expect(HttpAgent).toHaveBeenCalledTimes(1);
expect(HttpsAgent).toHaveBeenCalledTimes(0);
agentFactory({ url: new URL('https://elastic-node-3:9200') });
expect(HttpAgent).toHaveBeenCalledTimes(1);
expect(HttpsAgent).toHaveBeenCalledTimes(1);
});
it('provides the same Agent iif URLs use the same protocol', () => {
const agentManager = new AgentManager();
const agentFactory = agentManager.getAgentFactory();
const agent1 = agentFactory({ url: new URL('http://elastic-node-1:9200') });
const agent2 = agentFactory({ url: new URL('http://elastic-node-2:9200') });
const agent3 = agentFactory({ url: new URL('https://elastic-node-3:9200') });
const agent4 = agentFactory({ url: new URL('https://elastic-node-4:9200') });
expect(agent1).toEqual(agent2);
expect(agent1).not.toEqual(agent3);
expect(agent3).toEqual(agent4);
});
it('dereferences an agent instance when the agent is closed', () => {
const agentManager = new AgentManager();
const agentFactory = agentManager.getAgentFactory();
const agent = agentFactory({ url: new URL('http://elastic-node-1:9200') });
// eslint-disable-next-line dot-notation
expect(agentManager['httpStore'].has(agent)).toEqual(true);
agent.destroy();
// eslint-disable-next-line dot-notation
expect(agentManager['httpStore'].has(agent)).toEqual(false);
});
});
describe('two agent factories', () => {
it('never provide the same Agent instance even if they use the same type', () => {
const agentManager = new AgentManager();
const agentFactory1 = agentManager.getAgentFactory();
const agentFactory2 = agentManager.getAgentFactory();
const agent1 = agentFactory1({ url: new URL('http://elastic-node-1:9200') });
const agent2 = agentFactory2({ url: new URL('http://elastic-node-1:9200') });
expect(agent1).not.toEqual(agent2);
});
});
});
});

View file

@ -0,0 +1,89 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { Agent as HttpAgent } from 'http';
import { Agent as HttpsAgent } from 'https';
import { ConnectionOptions, HttpAgentOptions } from '@elastic/elasticsearch';
const HTTPS = 'https:';
const DEFAULT_CONFIG: HttpAgentOptions = {
keepAlive: true,
keepAliveMsecs: 50000,
maxSockets: 256,
maxFreeSockets: 256,
scheduling: 'lifo',
};
export type NetworkAgent = HttpAgent | HttpsAgent;
export type AgentFactory = (connectionOpts: ConnectionOptions) => NetworkAgent;
/**
* Allows obtaining Agent factories, which can then be fed into elasticsearch-js's Client class.
* Ideally, we should obtain one Agent factory for each ES Client class.
* This allows using the same Agent across all the Pools and Connections of the Client (one per ES node).
*
* Agent instances are stored internally to allow collecting metrics (nbr of active/idle connections to ES).
*
* Using the same Agent factory across multiple ES Client instances is strongly discouraged, cause ES Client
* exposes methods that can modify the underlying pools, effectively impacting the connections of other Clients.
* @internal
**/
export class AgentManager {
// Stores Https Agent instances
private httpsStore: Set<HttpsAgent>;
// Stores Http Agent instances
private httpStore: Set<HttpAgent>;
constructor(private agentOptions: HttpAgentOptions = DEFAULT_CONFIG) {
this.httpsStore = new Set();
this.httpStore = new Set();
}
public getAgentFactory(agentOptions?: HttpAgentOptions): AgentFactory {
// a given agent factory always provides the same Agent instances (for the same protocol)
// we keep references to the instances at factory level, to be able to reuse them
let httpAgent: HttpAgent;
let httpsAgent: HttpsAgent;
return (connectionOpts: ConnectionOptions): NetworkAgent => {
if (connectionOpts.url.protocol === HTTPS) {
if (!httpsAgent) {
const config = Object.assign(
{},
DEFAULT_CONFIG,
this.agentOptions,
agentOptions,
connectionOpts.tls
);
httpsAgent = new HttpsAgent(config);
this.httpsStore.add(httpsAgent);
dereferenceOnDestroy(this.httpsStore, httpsAgent);
}
return httpsAgent;
}
if (!httpAgent) {
const config = Object.assign({}, DEFAULT_CONFIG, this.agentOptions, agentOptions);
httpAgent = new HttpAgent(config);
this.httpStore.add(httpAgent);
dereferenceOnDestroy(this.httpStore, httpAgent);
}
return httpAgent;
};
}
}
const dereferenceOnDestroy = (protocolStore: Set<NetworkAgent>, agent: NetworkAgent) => {
const doDestroy = agent.destroy.bind(agent);
agent.destroy = () => {
protocolStore.delete(agent);
doDestroy();
};
};

View file

@ -9,10 +9,12 @@
import { ConnectionOptions as TlsConnectionOptions } from 'tls';
import { URL } from 'url';
import { Duration } from 'moment';
import type { ClientOptions } from '@elastic/elasticsearch/lib/client';
import type { ClientOptions, HttpAgentOptions } from '@elastic/elasticsearch';
import type { ElasticsearchClientConfig } from '@kbn/core-elasticsearch-server';
import { DEFAULT_HEADERS } from './headers';
export type ParsedClientOptions = Omit<ClientOptions, 'agent'> & { agent: HttpAgentOptions };
/**
* Parse the client options from given client config and `scoped` flag.
*
@ -23,8 +25,8 @@ import { DEFAULT_HEADERS } from './headers';
export function parseClientOptions(
config: ElasticsearchClientConfig,
scoped: boolean
): ClientOptions {
const clientOptions: ClientOptions = {
): ParsedClientOptions {
const clientOptions: ParsedClientOptions = {
sniffOnStart: config.sniffOnStart,
sniffOnConnectionFault: config.sniffOnConnectionFault,
headers: {

View file

@ -17,6 +17,7 @@ import { httpServerMock, httpServiceMock } from '@kbn/core-http-server-mocks';
import type { ElasticsearchClientConfig } from '@kbn/core-elasticsearch-server';
import { ClusterClient } from './cluster_client';
import { DEFAULT_HEADERS } from './headers';
import { AgentManager } from './agent_manager';
const createConfig = (
parts: Partial<ElasticsearchClientConfig> = {}
@ -42,6 +43,7 @@ describe('ClusterClient', () => {
let authHeaders: ReturnType<typeof httpServiceMock.createAuthHeaderStorage>;
let internalClient: jest.Mocked<Client>;
let scopedClient: jest.Mocked<Client>;
let agentManager: AgentManager;
const mockTransport = { mockTransport: true };
@ -49,6 +51,7 @@ describe('ClusterClient', () => {
logger = loggingSystemMock.createLogger();
internalClient = createClient();
scopedClient = createClient();
agentManager = new AgentManager();
authHeaders = httpServiceMock.createAuthHeaderStorage();
authHeaders.get.mockImplementation(() => ({
@ -78,16 +81,19 @@ describe('ClusterClient', () => {
authHeaders,
type: 'custom-type',
getExecutionContext: getExecutionContextMock,
agentManager,
});
expect(configureClientMock).toHaveBeenCalledTimes(2);
expect(configureClientMock).toHaveBeenCalledWith(config, {
logger,
agentManager,
type: 'custom-type',
getExecutionContext: getExecutionContextMock,
});
expect(configureClientMock).toHaveBeenCalledWith(config, {
logger,
agentManager,
type: 'custom-type',
getExecutionContext: getExecutionContextMock,
scoped: true,
@ -101,6 +107,7 @@ describe('ClusterClient', () => {
logger,
type: 'custom-type',
authHeaders,
agentManager,
});
expect(clusterClient.asInternalUser).toBe(internalClient);
@ -114,6 +121,7 @@ describe('ClusterClient', () => {
logger,
type: 'custom-type',
authHeaders,
agentManager,
});
const request = httpServerMock.createKibanaRequest();
@ -139,6 +147,7 @@ describe('ClusterClient', () => {
authHeaders,
getExecutionContext,
getUnauthorizedErrorHandler,
agentManager,
});
const request = httpServerMock.createKibanaRequest();
@ -161,6 +170,7 @@ describe('ClusterClient', () => {
authHeaders,
getExecutionContext,
getUnauthorizedErrorHandler,
agentManager,
});
const request = httpServerMock.createKibanaRequest();
@ -192,6 +202,7 @@ describe('ClusterClient', () => {
logger,
type: 'custom-type',
authHeaders,
agentManager,
});
const request = httpServerMock.createKibanaRequest();
@ -210,7 +221,13 @@ describe('ClusterClient', () => {
});
authHeaders.get.mockReturnValue({});
const clusterClient = new ClusterClient({ config, logger, type: 'custom-type', authHeaders });
const clusterClient = new ClusterClient({
config,
logger,
type: 'custom-type',
authHeaders,
agentManager,
});
const request = httpServerMock.createKibanaRequest({
headers: {
foo: 'bar',
@ -237,7 +254,13 @@ describe('ClusterClient', () => {
other: 'yep',
});
const clusterClient = new ClusterClient({ config, logger, type: 'custom-type', authHeaders });
const clusterClient = new ClusterClient({
config,
logger,
type: 'custom-type',
authHeaders,
agentManager,
});
const request = httpServerMock.createKibanaRequest({});
clusterClient.asScoped(request);
@ -264,7 +287,13 @@ describe('ClusterClient', () => {
other: 'yep',
});
const clusterClient = new ClusterClient({ config, logger, type: 'custom-type', authHeaders });
const clusterClient = new ClusterClient({
config,
logger,
type: 'custom-type',
authHeaders,
agentManager,
});
const request = httpServerMock.createKibanaRequest({
headers: {
authorization: 'override',
@ -296,7 +325,13 @@ describe('ClusterClient', () => {
});
authHeaders.get.mockReturnValue({});
const clusterClient = new ClusterClient({ config, logger, type: 'custom-type', authHeaders });
const clusterClient = new ClusterClient({
config,
logger,
type: 'custom-type',
authHeaders,
agentManager,
});
const request = httpServerMock.createKibanaRequest({});
clusterClient.asScoped(request);
@ -318,7 +353,13 @@ describe('ClusterClient', () => {
const config = createConfig();
authHeaders.get.mockReturnValue({});
const clusterClient = new ClusterClient({ config, logger, type: 'custom-type', authHeaders });
const clusterClient = new ClusterClient({
config,
logger,
type: 'custom-type',
authHeaders,
agentManager,
});
const request = httpServerMock.createKibanaRequest({
kibanaRequestState: { requestId: 'my-fake-id', requestUuid: 'ignore-this-id' },
});
@ -348,7 +389,13 @@ describe('ClusterClient', () => {
foo: 'auth',
});
const clusterClient = new ClusterClient({ config, logger, type: 'custom-type', authHeaders });
const clusterClient = new ClusterClient({
config,
logger,
type: 'custom-type',
authHeaders,
agentManager,
});
const request = httpServerMock.createKibanaRequest({});
clusterClient.asScoped(request);
@ -376,7 +423,13 @@ describe('ClusterClient', () => {
});
authHeaders.get.mockReturnValue({});
const clusterClient = new ClusterClient({ config, logger, type: 'custom-type', authHeaders });
const clusterClient = new ClusterClient({
config,
logger,
type: 'custom-type',
authHeaders,
agentManager,
});
const request = httpServerMock.createKibanaRequest({
headers: { foo: 'request' },
});
@ -405,7 +458,13 @@ describe('ClusterClient', () => {
});
authHeaders.get.mockReturnValue({});
const clusterClient = new ClusterClient({ config, logger, type: 'custom-type', authHeaders });
const clusterClient = new ClusterClient({
config,
logger,
type: 'custom-type',
authHeaders,
agentManager,
});
const request = httpServerMock.createKibanaRequest();
clusterClient.asScoped(request);
@ -428,7 +487,13 @@ describe('ClusterClient', () => {
});
authHeaders.get.mockReturnValue({});
const clusterClient = new ClusterClient({ config, logger, type: 'custom-type', authHeaders });
const clusterClient = new ClusterClient({
config,
logger,
type: 'custom-type',
authHeaders,
agentManager,
});
const request = httpServerMock.createKibanaRequest({
headers: { [headerKey]: 'foo' },
});
@ -454,7 +519,13 @@ describe('ClusterClient', () => {
});
authHeaders.get.mockReturnValue({});
const clusterClient = new ClusterClient({ config, logger, type: 'custom-type', authHeaders });
const clusterClient = new ClusterClient({
config,
logger,
type: 'custom-type',
authHeaders,
agentManager,
});
const request = httpServerMock.createKibanaRequest({
headers: { foo: 'request' },
kibanaRequestState: { requestId: 'from request', requestUuid: 'ignore-this-id' },
@ -479,7 +550,13 @@ describe('ClusterClient', () => {
});
authHeaders.get.mockReturnValue({});
const clusterClient = new ClusterClient({ config, logger, type: 'custom-type', authHeaders });
const clusterClient = new ClusterClient({
config,
logger,
type: 'custom-type',
authHeaders,
agentManager,
});
const request = {
headers: {
authorization: 'auth',
@ -505,7 +582,13 @@ describe('ClusterClient', () => {
authorization: 'auth',
});
const clusterClient = new ClusterClient({ config, logger, type: 'custom-type', authHeaders });
const clusterClient = new ClusterClient({
config,
logger,
type: 'custom-type',
authHeaders,
agentManager,
});
const request = {
headers: {
foo: 'bar',
@ -531,6 +614,7 @@ describe('ClusterClient', () => {
logger,
type: 'custom-type',
authHeaders,
agentManager,
});
await clusterClient.close();
@ -547,6 +631,7 @@ describe('ClusterClient', () => {
logger,
type: 'custom-type',
authHeaders,
agentManager,
});
let internalClientClosed = false;
@ -590,6 +675,7 @@ describe('ClusterClient', () => {
logger,
type: 'custom-type',
authHeaders,
agentManager,
});
internalClient.close.mockRejectedValue(new Error('error closing client'));
@ -605,6 +691,7 @@ describe('ClusterClient', () => {
logger,
type: 'custom-type',
authHeaders,
agentManager,
});
await clusterClient.close();

View file

@ -26,6 +26,7 @@ import { ScopedClusterClient } from './scoped_cluster_client';
import { DEFAULT_HEADERS } from './headers';
import { createInternalErrorHandler, InternalUnauthorizedErrorHandler } from './retry_unauthorized';
import { createTransport } from './create_transport';
import { AgentManager } from './agent_manager';
const noop = () => undefined;
@ -47,6 +48,7 @@ export class ClusterClient implements ICustomClusterClient {
authHeaders,
getExecutionContext = noop,
getUnauthorizedErrorHandler = noop,
agentManager,
}: {
config: ElasticsearchClientConfig;
logger: Logger;
@ -54,18 +56,25 @@ export class ClusterClient implements ICustomClusterClient {
authHeaders?: IAuthHeadersStorage;
getExecutionContext?: () => string | undefined;
getUnauthorizedErrorHandler?: () => UnauthorizedErrorHandler | undefined;
agentManager: AgentManager;
}) {
this.config = config;
this.authHeaders = authHeaders;
this.getExecutionContext = getExecutionContext;
this.getUnauthorizedErrorHandler = getUnauthorizedErrorHandler;
this.asInternalUser = configureClient(config, { logger, type, getExecutionContext });
this.asInternalUser = configureClient(config, {
logger,
type,
getExecutionContext,
agentManager,
});
this.rootScopedClient = configureClient(config, {
logger,
type,
getExecutionContext,
scoped: true,
agentManager,
});
}

View file

@ -10,17 +10,23 @@ jest.mock('./log_query_and_deprecation', () => ({
__esModule: true,
instrumentEsQueryAndDeprecationLogger: jest.fn(),
}));
jest.mock('./agent_manager');
import { Agent } from 'http';
import {
parseClientOptionsMock,
createTransportMock,
ClientMock,
} from './configure_client.test.mocks';
import { MockedLogger } from '@kbn/logging-mocks';
import { loggingSystemMock } from '@kbn/core-logging-server-mocks';
import { ClusterConnectionPool } from '@elastic/elasticsearch';
import type { ElasticsearchClientConfig } from '@kbn/core-elasticsearch-server';
import { configureClient } from './configure_client';
import { instrumentEsQueryAndDeprecationLogger } from './log_query_and_deprecation';
import { AgentManager } from './agent_manager';
const AgentManagerMock = AgentManager as jest.Mock<AgentManager>;
const createFakeConfig = (
parts: Partial<ElasticsearchClientConfig> = {}
@ -39,15 +45,31 @@ const createFakeClient = () => {
return client;
};
const createFakeAgentFactory = (logger: MockedLogger) => {
const agentFactory = () => new Agent();
AgentManagerMock.mockImplementationOnce(() => {
const agentManager = new AgentManager();
agentManager.getAgentFactory = () => agentFactory;
return agentManager;
});
const agentManager = new AgentManager();
return { agentManager, agentFactory };
};
describe('configureClient', () => {
let logger: ReturnType<typeof loggingSystemMock.createLogger>;
let logger: MockedLogger;
let config: ElasticsearchClientConfig;
let agentManager: AgentManager;
beforeEach(() => {
logger = loggingSystemMock.createLogger();
config = createFakeConfig();
parseClientOptionsMock.mockReturnValue({});
ClientMock.mockImplementation(() => createFakeClient());
agentManager = new AgentManager();
});
afterEach(() => {
@ -57,14 +79,14 @@ describe('configureClient', () => {
});
it('calls `parseClientOptions` with the correct parameters', () => {
configureClient(config, { logger, type: 'test', scoped: false });
configureClient(config, { logger, type: 'test', scoped: false, agentManager });
expect(parseClientOptionsMock).toHaveBeenCalledTimes(1);
expect(parseClientOptionsMock).toHaveBeenCalledWith(config, false);
parseClientOptionsMock.mockClear();
configureClient(config, { logger, type: 'test', scoped: true });
configureClient(config, { logger, type: 'test', scoped: true, agentManager });
expect(parseClientOptionsMock).toHaveBeenCalledTimes(1);
expect(parseClientOptionsMock).toHaveBeenCalledWith(config, true);
@ -76,23 +98,49 @@ describe('configureClient', () => {
};
parseClientOptionsMock.mockReturnValue(parsedOptions);
const client = configureClient(config, { logger, type: 'test', scoped: false });
const client = configureClient(config, { logger, type: 'test', scoped: false, agentManager });
expect(ClientMock).toHaveBeenCalledTimes(1);
expect(ClientMock).toHaveBeenCalledWith(expect.objectContaining(parsedOptions));
expect(client).toBe(ClientMock.mock.results[0].value);
});
it('constructs a client using the provided `agentManager`', () => {
const { agentManager: customAgentManager, agentFactory } = createFakeAgentFactory(logger);
const client = configureClient(config, {
logger,
type: 'test',
scoped: false,
agentManager: customAgentManager,
});
expect(ClientMock).toHaveBeenCalledTimes(1);
expect(ClientMock).toHaveBeenCalledWith(expect.objectContaining({ agent: agentFactory }));
expect(client).toBe(ClientMock.mock.results[0].value);
});
it('calls `createTransport` with the correct parameters', () => {
const getExecutionContext = jest.fn();
configureClient(config, { logger, type: 'test', scoped: false, getExecutionContext });
configureClient(config, {
logger,
type: 'test',
scoped: false,
getExecutionContext,
agentManager,
});
expect(createTransportMock).toHaveBeenCalledTimes(1);
expect(createTransportMock).toHaveBeenCalledWith({ getExecutionContext });
createTransportMock.mockClear();
configureClient(config, { logger, type: 'test', scoped: true, getExecutionContext });
configureClient(config, {
logger,
type: 'test',
scoped: true,
getExecutionContext,
agentManager,
});
expect(createTransportMock).toHaveBeenCalledTimes(1);
expect(createTransportMock).toHaveBeenCalledWith({ getExecutionContext });
@ -102,7 +150,7 @@ describe('configureClient', () => {
const mockedTransport = { mockTransport: true };
createTransportMock.mockReturnValue(mockedTransport);
const client = configureClient(config, { logger, type: 'test', scoped: false });
const client = configureClient(config, { logger, type: 'test', scoped: false, agentManager });
expect(ClientMock).toHaveBeenCalledTimes(1);
expect(ClientMock).toHaveBeenCalledWith(
@ -117,7 +165,7 @@ describe('configureClient', () => {
const mockedTransport = { mockTransport: true };
createTransportMock.mockReturnValue(mockedTransport);
const client = configureClient(config, { logger, type: 'test', scoped: false });
const client = configureClient(config, { logger, type: 'test', scoped: false, agentManager });
expect(ClientMock).toHaveBeenCalledTimes(1);
expect(ClientMock).toHaveBeenCalledWith(
@ -129,7 +177,7 @@ describe('configureClient', () => {
});
it('calls instrumentEsQueryAndDeprecationLogger', () => {
const client = configureClient(config, { logger, type: 'test', scoped: false });
const client = configureClient(config, { logger, type: 'test', scoped: false, agentManager });
expect(instrumentEsQueryAndDeprecationLogger).toHaveBeenCalledTimes(1);
expect(instrumentEsQueryAndDeprecationLogger).toHaveBeenCalledWith({

View file

@ -12,6 +12,7 @@ import type { ElasticsearchClientConfig } from '@kbn/core-elasticsearch-server';
import { parseClientOptions } from './client_config';
import { instrumentEsQueryAndDeprecationLogger } from './log_query_and_deprecation';
import { createTransport } from './create_transport';
import { AgentManager } from './agent_manager';
const noop = () => undefined;
@ -22,18 +23,20 @@ export const configureClient = (
type,
scoped = false,
getExecutionContext = noop,
agentManager,
}: {
logger: Logger;
type: string;
scoped?: boolean;
getExecutionContext?: () => string | undefined;
agentManager: AgentManager;
}
): Client => {
const clientOptions = parseClientOptions(config, scoped);
const KibanaTransport = createTransport({ getExecutionContext });
const client = new Client({
...clientOptions,
agent: agentManager.getAgentFactory(clientOptions.agent),
Transport: KibanaTransport,
Connection: HttpConnection,
// using ClusterConnectionPool until https://github.com/elastic/elasticsearch-js/issues/1714 is addressed

View file

@ -7,8 +7,10 @@
*/
export const MockClusterClient = jest.fn();
export const MockAgentManager = jest.fn();
jest.mock('@kbn/core-elasticsearch-client-server-internal', () => ({
ClusterClient: MockClusterClient,
AgentManager: MockAgentManager,
}));
export const isScriptingEnabledMock = jest.fn();

View file

@ -135,6 +135,20 @@ describe('#preboot', () => {
);
});
it('creates a ClusterClient using the internal AgentManager', async () => {
const prebootContract = await elasticsearchService.preboot();
const customConfig = { keepAlive: true };
const clusterClient = prebootContract.createClient('custom-type', customConfig);
expect(clusterClient).toBe(mockClusterClientInstance);
expect(MockClusterClient).toHaveBeenCalledTimes(1);
expect(MockClusterClient.mock.calls[0][0]).toEqual(
// eslint-disable-next-line dot-notation
expect.objectContaining({ agentManager: elasticsearchService['agentManager'] })
);
});
it('creates a new client on each call', async () => {
const prebootContract = await elasticsearchService.preboot();

View file

@ -22,7 +22,7 @@ import type {
UnauthorizedErrorHandler,
ElasticsearchClientConfig,
} from '@kbn/core-elasticsearch-server';
import { ClusterClient } from '@kbn/core-elasticsearch-client-server-internal';
import { ClusterClient, AgentManager } from '@kbn/core-elasticsearch-client-server-internal';
import { registerAnalyticsContextProvider } from './register_analytics_context_provider';
import { ElasticsearchConfig, ElasticsearchConfigType } from './elasticsearch_config';
@ -58,6 +58,7 @@ export class ElasticsearchService
private esNodesCompatibility$?: Observable<NodesVersionCompatibility>;
private client?: ClusterClient;
private unauthorizedErrorHandler?: UnauthorizedErrorHandler;
private agentManager: AgentManager;
constructor(private readonly coreContext: CoreContext) {
this.kibanaVersion = coreContext.env.packageInfo.version;
@ -65,6 +66,7 @@ export class ElasticsearchService
this.config$ = coreContext.configService
.atPath<ElasticsearchConfigType>('elasticsearch')
.pipe(map((rawConfig) => new ElasticsearchConfig(rawConfig)));
this.agentManager = new AgentManager();
}
public async preboot(): Promise<InternalElasticsearchServicePreboot> {
@ -172,6 +174,7 @@ export class ElasticsearchService
clientConfig: Partial<ElasticsearchClientConfig> = {}
) {
const config = mergeConfig(baseConfig, clientConfig);
return new ClusterClient({
config,
logger: this.coreContext.logger.get('elasticsearch'),
@ -179,6 +182,7 @@ export class ElasticsearchService
authHeaders: this.authHeaders,
getExecutionContext: () => this.executionContextClient?.getAsHeader(),
getUnauthorizedErrorHandler: () => this.unauthorizedErrorHandler,
agentManager: this.agentManager,
});
}
}

View file

@ -42,18 +42,15 @@ export type MockedElasticSearchServiceSetup = jest.Mocked<
export interface MockedElasticSearchServiceStart {
client: ClusterClientMock;
createClient: jest.MockedFunction<
(name: string, config?: Partial<ElasticsearchClientConfig>) => CustomClusterClientMock
(type: string, config?: Partial<ElasticsearchClientConfig>) => CustomClusterClientMock
>;
}
const createPrebootContractMock = () => {
const prebootContract: MockedElasticSearchServicePreboot = {
config: { hosts: [], credentialsSpecified: false },
createClient: jest.fn(),
createClient: jest.fn((type: string) => elasticsearchClientMock.createCustomClusterClient()),
};
prebootContract.createClient.mockImplementation(() =>
elasticsearchClientMock.createCustomClusterClient()
);
return prebootContract;
};
@ -70,12 +67,8 @@ const createSetupContractMock = () => {
const createStartContractMock = () => {
const startContract: MockedElasticSearchServiceStart = {
client: elasticsearchClientMock.createClusterClient(),
createClient: jest.fn(),
createClient: jest.fn((type: string) => elasticsearchClientMock.createCustomClusterClient()),
};
startContract.createClient.mockImplementation(() =>
elasticsearchClientMock.createCustomClusterClient()
);
return startContract;
};

View file

@ -13,7 +13,7 @@ import { merge } from 'lodash';
import { kibanaPackageJson } from '@kbn/utils';
import { Logger } from '@kbn/core/server';
import { ClusterClient } from '@kbn/core-elasticsearch-client-server-internal';
import { AgentManager, ClusterClient } from '@kbn/core-elasticsearch-client-server-internal';
import { configSchema } from '@kbn/core-elasticsearch-server-internal';
import { ElasticsearchService } from '@kbn/interactive-setup-plugin/server/elasticsearch_service';
import { KibanaConfigWriter } from '@kbn/interactive-setup-plugin/server/kibana_config_writer';
@ -47,6 +47,8 @@ export const elasticsearch = new ElasticsearchService(logger, kibanaPackageJson.
),
logger,
type,
// we use an independent AgentManager for cli_setup, no need to track performance of this one
agentManager: new AgentManager(),
});
},
},

View file

@ -126,7 +126,7 @@ export class SavedObjectsSyncService {
return taskInstance;
} catch (e) {
this.log.error(`Error running task: ${SAVED_OBJECTS_SYNC_TASK_ID}, `, e?.message() ?? e);
this.log.error(`Error running task: ${SAVED_OBJECTS_SYNC_TASK_ID}, `, e?.message ?? e);
return null;
}
}