mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 01:38:56 -04:00
Collect metrics about the active/idle connections to ES nodes (#141434)
* Collect metrics about the connections from esClient to ES nodes * Misc enhancements following PR remarks and comments * Fix UTs * Fix mock typings * Minimize API surface, fix mocks typings * Fix incomplete mocks * Fix renameed agentManager => agentStore in remaining UT * Cover edge cases for getAgentsSocketsStats() * Misc NIT enhancements * Revert incorrect import type statements
This commit is contained in:
parent
0e7070ad71
commit
25b79a9cdb
38 changed files with 617 additions and 109 deletions
|
@ -61,6 +61,19 @@ const mockedResponse: StatusResponse = {
|
|||
'15m': 0.1,
|
||||
},
|
||||
},
|
||||
elasticsearch_client: {
|
||||
protocol: 'https',
|
||||
connectedNodes: 3,
|
||||
nodesWithActiveSockets: 3,
|
||||
nodesWithIdleSockets: 1,
|
||||
totalActiveSockets: 25,
|
||||
totalIdleSockets: 2,
|
||||
totalQueuedRequests: 0,
|
||||
mostActiveNodeSockets: 15,
|
||||
averageActiveSocketsPerNode: 8,
|
||||
mostIdleNodeSockets: 2,
|
||||
averageIdleSocketsPerNode: 0.5,
|
||||
},
|
||||
process: {
|
||||
pid: 1,
|
||||
memory: {
|
||||
|
|
|
@ -9,7 +9,7 @@
|
|||
export { ScopedClusterClient } from './src/scoped_cluster_client';
|
||||
export { ClusterClient } from './src/cluster_client';
|
||||
export { configureClient } from './src/configure_client';
|
||||
export { AgentManager } from './src/agent_manager';
|
||||
export { type AgentStore, AgentManager } from './src/agent_manager';
|
||||
export { getRequestDebugMeta, getErrorMessage } from './src/log_query_and_deprecation';
|
||||
export {
|
||||
PRODUCT_RESPONSE_HEADER,
|
||||
|
|
|
@ -104,10 +104,10 @@ describe('AgentManager', () => {
|
|||
const agentFactory = agentManager.getAgentFactory();
|
||||
const agent = agentFactory({ url: new URL('http://elastic-node-1:9200') });
|
||||
// eslint-disable-next-line dot-notation
|
||||
expect(agentManager['httpStore'].has(agent)).toEqual(true);
|
||||
expect(agentManager['agents'].has(agent)).toEqual(true);
|
||||
agent.destroy();
|
||||
// eslint-disable-next-line dot-notation
|
||||
expect(agentManager['httpStore'].has(agent)).toEqual(false);
|
||||
expect(agentManager['agents'].has(agent)).toEqual(false);
|
||||
});
|
||||
});
|
||||
|
||||
|
@ -122,4 +122,21 @@ describe('AgentManager', () => {
|
|||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('#getAgents()', () => {
|
||||
it('returns the created HTTP and HTTPs Agent instances', () => {
|
||||
const agentManager = new AgentManager();
|
||||
const agentFactory1 = agentManager.getAgentFactory();
|
||||
const agentFactory2 = agentManager.getAgentFactory();
|
||||
const agent1 = agentFactory1({ url: new URL('http://elastic-node-1:9200') });
|
||||
const agent2 = agentFactory2({ url: new URL('http://elastic-node-1:9200') });
|
||||
const agent3 = agentFactory1({ url: new URL('https://elastic-node-1:9200') });
|
||||
const agent4 = agentFactory2({ url: new URL('https://elastic-node-1:9200') });
|
||||
|
||||
const agents = agentManager.getAgents();
|
||||
|
||||
expect(agents.size).toEqual(4);
|
||||
expect([...agents]).toEqual(expect.arrayContaining([agent1, agent2, agent3, agent4]));
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
@ -8,7 +8,7 @@
|
|||
|
||||
import { Agent as HttpAgent } from 'http';
|
||||
import { Agent as HttpsAgent } from 'https';
|
||||
import { ConnectionOptions, HttpAgentOptions } from '@elastic/elasticsearch';
|
||||
import type { ConnectionOptions, HttpAgentOptions } from '@elastic/elasticsearch';
|
||||
|
||||
const HTTPS = 'https:';
|
||||
const DEFAULT_CONFIG: HttpAgentOptions = {
|
||||
|
@ -22,6 +22,14 @@ const DEFAULT_CONFIG: HttpAgentOptions = {
|
|||
export type NetworkAgent = HttpAgent | HttpsAgent;
|
||||
export type AgentFactory = (connectionOpts: ConnectionOptions) => NetworkAgent;
|
||||
|
||||
export interface AgentFactoryProvider {
|
||||
getAgentFactory(agentOptions?: HttpAgentOptions): AgentFactory;
|
||||
}
|
||||
|
||||
export interface AgentStore {
|
||||
getAgents(): Set<NetworkAgent>;
|
||||
}
|
||||
|
||||
/**
|
||||
* Allows obtaining Agent factories, which can then be fed into elasticsearch-js's Client class.
|
||||
* Ideally, we should obtain one Agent factory for each ES Client class.
|
||||
|
@ -33,15 +41,11 @@ export type AgentFactory = (connectionOpts: ConnectionOptions) => NetworkAgent;
|
|||
* exposes methods that can modify the underlying pools, effectively impacting the connections of other Clients.
|
||||
* @internal
|
||||
**/
|
||||
export class AgentManager {
|
||||
// Stores Https Agent instances
|
||||
private httpsStore: Set<HttpsAgent>;
|
||||
// Stores Http Agent instances
|
||||
private httpStore: Set<HttpAgent>;
|
||||
export class AgentManager implements AgentFactoryProvider, AgentStore {
|
||||
private agents: Set<HttpAgent>;
|
||||
|
||||
constructor(private agentOptions: HttpAgentOptions = DEFAULT_CONFIG) {
|
||||
this.httpsStore = new Set();
|
||||
this.httpStore = new Set();
|
||||
this.agents = new Set();
|
||||
}
|
||||
|
||||
public getAgentFactory(agentOptions?: HttpAgentOptions): AgentFactory {
|
||||
|
@ -61,8 +65,8 @@ export class AgentManager {
|
|||
connectionOpts.tls
|
||||
);
|
||||
httpsAgent = new HttpsAgent(config);
|
||||
this.httpsStore.add(httpsAgent);
|
||||
dereferenceOnDestroy(this.httpsStore, httpsAgent);
|
||||
this.agents.add(httpsAgent);
|
||||
dereferenceOnDestroy(this.agents, httpsAgent);
|
||||
}
|
||||
|
||||
return httpsAgent;
|
||||
|
@ -71,19 +75,23 @@ export class AgentManager {
|
|||
if (!httpAgent) {
|
||||
const config = Object.assign({}, DEFAULT_CONFIG, this.agentOptions, agentOptions);
|
||||
httpAgent = new HttpAgent(config);
|
||||
this.httpStore.add(httpAgent);
|
||||
dereferenceOnDestroy(this.httpStore, httpAgent);
|
||||
this.agents.add(httpAgent);
|
||||
dereferenceOnDestroy(this.agents, httpAgent);
|
||||
}
|
||||
|
||||
return httpAgent;
|
||||
};
|
||||
}
|
||||
|
||||
public getAgents(): Set<NetworkAgent> {
|
||||
return this.agents;
|
||||
}
|
||||
}
|
||||
|
||||
const dereferenceOnDestroy = (protocolStore: Set<NetworkAgent>, agent: NetworkAgent) => {
|
||||
const dereferenceOnDestroy = (store: Set<NetworkAgent>, agent: NetworkAgent) => {
|
||||
const doDestroy = agent.destroy.bind(agent);
|
||||
agent.destroy = () => {
|
||||
protocolStore.delete(agent);
|
||||
store.delete(agent);
|
||||
doDestroy();
|
||||
};
|
||||
};
|
||||
|
|
|
@ -46,7 +46,7 @@ describe('ClusterClient', () => {
|
|||
let authHeaders: ReturnType<typeof httpServiceMock.createAuthHeaderStorage>;
|
||||
let internalClient: jest.Mocked<Client>;
|
||||
let scopedClient: jest.Mocked<Client>;
|
||||
let agentManager: AgentManager;
|
||||
let agentFactoryProvider: AgentManager;
|
||||
|
||||
const mockTransport = { mockTransport: true };
|
||||
|
||||
|
@ -54,7 +54,7 @@ describe('ClusterClient', () => {
|
|||
logger = loggingSystemMock.createLogger();
|
||||
internalClient = createClient();
|
||||
scopedClient = createClient();
|
||||
agentManager = new AgentManager();
|
||||
agentFactoryProvider = new AgentManager();
|
||||
|
||||
authHeaders = httpServiceMock.createAuthHeaderStorage();
|
||||
authHeaders.get.mockImplementation(() => ({
|
||||
|
@ -84,21 +84,21 @@ describe('ClusterClient', () => {
|
|||
authHeaders,
|
||||
type: 'custom-type',
|
||||
getExecutionContext: getExecutionContextMock,
|
||||
agentManager,
|
||||
agentFactoryProvider,
|
||||
kibanaVersion,
|
||||
});
|
||||
|
||||
expect(configureClientMock).toHaveBeenCalledTimes(2);
|
||||
expect(configureClientMock).toHaveBeenCalledWith(config, {
|
||||
logger,
|
||||
agentManager,
|
||||
agentFactoryProvider,
|
||||
kibanaVersion,
|
||||
type: 'custom-type',
|
||||
getExecutionContext: getExecutionContextMock,
|
||||
});
|
||||
expect(configureClientMock).toHaveBeenCalledWith(config, {
|
||||
logger,
|
||||
agentManager,
|
||||
agentFactoryProvider,
|
||||
kibanaVersion,
|
||||
type: 'custom-type',
|
||||
getExecutionContext: getExecutionContextMock,
|
||||
|
@ -113,7 +113,7 @@ describe('ClusterClient', () => {
|
|||
logger,
|
||||
type: 'custom-type',
|
||||
authHeaders,
|
||||
agentManager,
|
||||
agentFactoryProvider,
|
||||
kibanaVersion,
|
||||
});
|
||||
|
||||
|
@ -128,7 +128,7 @@ describe('ClusterClient', () => {
|
|||
logger,
|
||||
type: 'custom-type',
|
||||
authHeaders,
|
||||
agentManager,
|
||||
agentFactoryProvider,
|
||||
kibanaVersion,
|
||||
});
|
||||
const request = httpServerMock.createKibanaRequest();
|
||||
|
@ -155,7 +155,7 @@ describe('ClusterClient', () => {
|
|||
authHeaders,
|
||||
getExecutionContext,
|
||||
getUnauthorizedErrorHandler,
|
||||
agentManager,
|
||||
agentFactoryProvider,
|
||||
kibanaVersion,
|
||||
});
|
||||
const request = httpServerMock.createKibanaRequest();
|
||||
|
@ -179,7 +179,7 @@ describe('ClusterClient', () => {
|
|||
authHeaders,
|
||||
getExecutionContext,
|
||||
getUnauthorizedErrorHandler,
|
||||
agentManager,
|
||||
agentFactoryProvider,
|
||||
kibanaVersion,
|
||||
});
|
||||
const request = httpServerMock.createKibanaRequest();
|
||||
|
@ -212,7 +212,7 @@ describe('ClusterClient', () => {
|
|||
logger,
|
||||
type: 'custom-type',
|
||||
authHeaders,
|
||||
agentManager,
|
||||
agentFactoryProvider,
|
||||
kibanaVersion,
|
||||
});
|
||||
const request = httpServerMock.createKibanaRequest();
|
||||
|
@ -237,7 +237,7 @@ describe('ClusterClient', () => {
|
|||
logger,
|
||||
type: 'custom-type',
|
||||
authHeaders,
|
||||
agentManager,
|
||||
agentFactoryProvider,
|
||||
kibanaVersion,
|
||||
});
|
||||
const request = httpServerMock.createKibanaRequest({
|
||||
|
@ -271,7 +271,7 @@ describe('ClusterClient', () => {
|
|||
logger,
|
||||
type: 'custom-type',
|
||||
authHeaders,
|
||||
agentManager,
|
||||
agentFactoryProvider,
|
||||
kibanaVersion,
|
||||
});
|
||||
const request = httpServerMock.createKibanaRequest({});
|
||||
|
@ -305,7 +305,7 @@ describe('ClusterClient', () => {
|
|||
logger,
|
||||
type: 'custom-type',
|
||||
authHeaders,
|
||||
agentManager,
|
||||
agentFactoryProvider,
|
||||
kibanaVersion,
|
||||
});
|
||||
const request = httpServerMock.createKibanaRequest({
|
||||
|
@ -344,7 +344,7 @@ describe('ClusterClient', () => {
|
|||
logger,
|
||||
type: 'custom-type',
|
||||
authHeaders,
|
||||
agentManager,
|
||||
agentFactoryProvider,
|
||||
kibanaVersion,
|
||||
});
|
||||
const request = httpServerMock.createKibanaRequest({});
|
||||
|
@ -373,7 +373,7 @@ describe('ClusterClient', () => {
|
|||
logger,
|
||||
type: 'custom-type',
|
||||
authHeaders,
|
||||
agentManager,
|
||||
agentFactoryProvider,
|
||||
kibanaVersion,
|
||||
});
|
||||
const request = httpServerMock.createKibanaRequest({
|
||||
|
@ -410,7 +410,7 @@ describe('ClusterClient', () => {
|
|||
logger,
|
||||
type: 'custom-type',
|
||||
authHeaders,
|
||||
agentManager,
|
||||
agentFactoryProvider,
|
||||
kibanaVersion,
|
||||
});
|
||||
const request = httpServerMock.createKibanaRequest({});
|
||||
|
@ -445,7 +445,7 @@ describe('ClusterClient', () => {
|
|||
logger,
|
||||
type: 'custom-type',
|
||||
authHeaders,
|
||||
agentManager,
|
||||
agentFactoryProvider,
|
||||
kibanaVersion,
|
||||
});
|
||||
const request = httpServerMock.createKibanaRequest({
|
||||
|
@ -482,7 +482,7 @@ describe('ClusterClient', () => {
|
|||
logger,
|
||||
type: 'custom-type',
|
||||
authHeaders,
|
||||
agentManager,
|
||||
agentFactoryProvider,
|
||||
kibanaVersion,
|
||||
});
|
||||
const request = httpServerMock.createKibanaRequest();
|
||||
|
@ -513,7 +513,7 @@ describe('ClusterClient', () => {
|
|||
logger,
|
||||
type: 'custom-type',
|
||||
authHeaders,
|
||||
agentManager,
|
||||
agentFactoryProvider,
|
||||
kibanaVersion,
|
||||
});
|
||||
const request = httpServerMock.createKibanaRequest({
|
||||
|
@ -547,7 +547,7 @@ describe('ClusterClient', () => {
|
|||
logger,
|
||||
type: 'custom-type',
|
||||
authHeaders,
|
||||
agentManager,
|
||||
agentFactoryProvider,
|
||||
kibanaVersion,
|
||||
});
|
||||
const request = httpServerMock.createKibanaRequest({
|
||||
|
@ -579,7 +579,7 @@ describe('ClusterClient', () => {
|
|||
logger,
|
||||
type: 'custom-type',
|
||||
authHeaders,
|
||||
agentManager,
|
||||
agentFactoryProvider,
|
||||
kibanaVersion,
|
||||
});
|
||||
const request = {
|
||||
|
@ -612,7 +612,7 @@ describe('ClusterClient', () => {
|
|||
logger,
|
||||
type: 'custom-type',
|
||||
authHeaders,
|
||||
agentManager,
|
||||
agentFactoryProvider,
|
||||
kibanaVersion,
|
||||
});
|
||||
const request = {
|
||||
|
@ -640,7 +640,7 @@ describe('ClusterClient', () => {
|
|||
logger,
|
||||
type: 'custom-type',
|
||||
authHeaders,
|
||||
agentManager,
|
||||
agentFactoryProvider,
|
||||
kibanaVersion,
|
||||
});
|
||||
|
||||
|
@ -658,7 +658,7 @@ describe('ClusterClient', () => {
|
|||
logger,
|
||||
type: 'custom-type',
|
||||
authHeaders,
|
||||
agentManager,
|
||||
agentFactoryProvider,
|
||||
kibanaVersion,
|
||||
});
|
||||
|
||||
|
@ -703,7 +703,7 @@ describe('ClusterClient', () => {
|
|||
logger,
|
||||
type: 'custom-type',
|
||||
authHeaders,
|
||||
agentManager,
|
||||
agentFactoryProvider,
|
||||
kibanaVersion,
|
||||
});
|
||||
|
||||
|
@ -720,7 +720,7 @@ describe('ClusterClient', () => {
|
|||
logger,
|
||||
type: 'custom-type',
|
||||
authHeaders,
|
||||
agentManager,
|
||||
agentFactoryProvider,
|
||||
kibanaVersion,
|
||||
});
|
||||
|
||||
|
|
|
@ -24,9 +24,12 @@ import type { ElasticsearchClientConfig } from '@kbn/core-elasticsearch-server';
|
|||
import { configureClient } from './configure_client';
|
||||
import { ScopedClusterClient } from './scoped_cluster_client';
|
||||
import { getDefaultHeaders } from './headers';
|
||||
import { createInternalErrorHandler, InternalUnauthorizedErrorHandler } from './retry_unauthorized';
|
||||
import {
|
||||
createInternalErrorHandler,
|
||||
type InternalUnauthorizedErrorHandler,
|
||||
} from './retry_unauthorized';
|
||||
import { createTransport } from './create_transport';
|
||||
import { AgentManager } from './agent_manager';
|
||||
import type { AgentFactoryProvider } from './agent_manager';
|
||||
|
||||
const noop = () => undefined;
|
||||
|
||||
|
@ -49,7 +52,7 @@ export class ClusterClient implements ICustomClusterClient {
|
|||
authHeaders,
|
||||
getExecutionContext = noop,
|
||||
getUnauthorizedErrorHandler = noop,
|
||||
agentManager,
|
||||
agentFactoryProvider,
|
||||
kibanaVersion,
|
||||
}: {
|
||||
config: ElasticsearchClientConfig;
|
||||
|
@ -58,7 +61,7 @@ export class ClusterClient implements ICustomClusterClient {
|
|||
authHeaders?: IAuthHeadersStorage;
|
||||
getExecutionContext?: () => string | undefined;
|
||||
getUnauthorizedErrorHandler?: () => UnauthorizedErrorHandler | undefined;
|
||||
agentManager: AgentManager;
|
||||
agentFactoryProvider: AgentFactoryProvider;
|
||||
kibanaVersion: string;
|
||||
}) {
|
||||
this.config = config;
|
||||
|
@ -71,7 +74,7 @@ export class ClusterClient implements ICustomClusterClient {
|
|||
logger,
|
||||
type,
|
||||
getExecutionContext,
|
||||
agentManager,
|
||||
agentFactoryProvider,
|
||||
kibanaVersion,
|
||||
});
|
||||
this.rootScopedClient = configureClient(config, {
|
||||
|
@ -79,7 +82,7 @@ export class ClusterClient implements ICustomClusterClient {
|
|||
type,
|
||||
getExecutionContext,
|
||||
scoped: true,
|
||||
agentManager,
|
||||
agentFactoryProvider,
|
||||
kibanaVersion,
|
||||
});
|
||||
}
|
||||
|
|
|
@ -10,7 +10,6 @@ jest.mock('./log_query_and_deprecation', () => ({
|
|||
__esModule: true,
|
||||
instrumentEsQueryAndDeprecationLogger: jest.fn(),
|
||||
}));
|
||||
jest.mock('./agent_manager');
|
||||
|
||||
import { Agent } from 'http';
|
||||
import {
|
||||
|
@ -24,9 +23,8 @@ import { ClusterConnectionPool } from '@elastic/elasticsearch';
|
|||
import type { ElasticsearchClientConfig } from '@kbn/core-elasticsearch-server';
|
||||
import { configureClient } from './configure_client';
|
||||
import { instrumentEsQueryAndDeprecationLogger } from './log_query_and_deprecation';
|
||||
import { AgentManager } from './agent_manager';
|
||||
import { type AgentFactoryProvider, AgentManager } from './agent_manager';
|
||||
|
||||
const AgentManagerMock = AgentManager as jest.Mock<AgentManager>;
|
||||
const kibanaVersion = '1.0.0';
|
||||
|
||||
const createFakeConfig = (
|
||||
|
@ -46,31 +44,17 @@ const createFakeClient = () => {
|
|||
return client;
|
||||
};
|
||||
|
||||
const createFakeAgentFactory = (logger: MockedLogger) => {
|
||||
const agentFactory = () => new Agent();
|
||||
|
||||
AgentManagerMock.mockImplementationOnce(() => {
|
||||
const agentManager = new AgentManager();
|
||||
agentManager.getAgentFactory = () => agentFactory;
|
||||
return agentManager;
|
||||
});
|
||||
|
||||
const agentManager = new AgentManager();
|
||||
|
||||
return { agentManager, agentFactory };
|
||||
};
|
||||
|
||||
describe('configureClient', () => {
|
||||
let logger: MockedLogger;
|
||||
let config: ElasticsearchClientConfig;
|
||||
let agentManager: AgentManager;
|
||||
let agentFactoryProvider: AgentFactoryProvider;
|
||||
|
||||
beforeEach(() => {
|
||||
logger = loggingSystemMock.createLogger();
|
||||
config = createFakeConfig();
|
||||
parseClientOptionsMock.mockReturnValue({});
|
||||
ClientMock.mockImplementation(() => createFakeClient());
|
||||
agentManager = new AgentManager();
|
||||
agentFactoryProvider = new AgentManager();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
|
@ -80,14 +64,26 @@ describe('configureClient', () => {
|
|||
});
|
||||
|
||||
it('calls `parseClientOptions` with the correct parameters', () => {
|
||||
configureClient(config, { logger, type: 'test', scoped: false, agentManager, kibanaVersion });
|
||||
configureClient(config, {
|
||||
logger,
|
||||
type: 'test',
|
||||
scoped: false,
|
||||
agentFactoryProvider,
|
||||
kibanaVersion,
|
||||
});
|
||||
|
||||
expect(parseClientOptionsMock).toHaveBeenCalledTimes(1);
|
||||
expect(parseClientOptionsMock).toHaveBeenCalledWith(config, false, kibanaVersion);
|
||||
|
||||
parseClientOptionsMock.mockClear();
|
||||
|
||||
configureClient(config, { logger, type: 'test', scoped: true, agentManager, kibanaVersion });
|
||||
configureClient(config, {
|
||||
logger,
|
||||
type: 'test',
|
||||
scoped: true,
|
||||
agentFactoryProvider,
|
||||
kibanaVersion,
|
||||
});
|
||||
|
||||
expect(parseClientOptionsMock).toHaveBeenCalledTimes(1);
|
||||
expect(parseClientOptionsMock).toHaveBeenCalledWith(config, true, kibanaVersion);
|
||||
|
@ -103,7 +99,7 @@ describe('configureClient', () => {
|
|||
logger,
|
||||
type: 'test',
|
||||
scoped: false,
|
||||
agentManager,
|
||||
agentFactoryProvider,
|
||||
kibanaVersion,
|
||||
});
|
||||
|
||||
|
@ -112,13 +108,17 @@ describe('configureClient', () => {
|
|||
expect(client).toBe(ClientMock.mock.results[0].value);
|
||||
});
|
||||
|
||||
it('constructs a client using the provided `agentManager`', () => {
|
||||
const { agentManager: customAgentManager, agentFactory } = createFakeAgentFactory(logger);
|
||||
it('constructs a client using the provided `agentFactoryProvider`', () => {
|
||||
const agentFactory = () => new Agent();
|
||||
const customAgentFactoryProvider = {
|
||||
getAgentFactory: () => agentFactory,
|
||||
};
|
||||
|
||||
const client = configureClient(config, {
|
||||
logger,
|
||||
type: 'test',
|
||||
scoped: false,
|
||||
agentManager: customAgentManager,
|
||||
agentFactoryProvider: customAgentFactoryProvider,
|
||||
kibanaVersion,
|
||||
});
|
||||
|
||||
|
@ -134,7 +134,7 @@ describe('configureClient', () => {
|
|||
type: 'test',
|
||||
scoped: false,
|
||||
getExecutionContext,
|
||||
agentManager,
|
||||
agentFactoryProvider,
|
||||
kibanaVersion,
|
||||
});
|
||||
|
||||
|
@ -148,7 +148,7 @@ describe('configureClient', () => {
|
|||
type: 'test',
|
||||
scoped: true,
|
||||
getExecutionContext,
|
||||
agentManager,
|
||||
agentFactoryProvider,
|
||||
kibanaVersion,
|
||||
});
|
||||
|
||||
|
@ -164,7 +164,7 @@ describe('configureClient', () => {
|
|||
logger,
|
||||
type: 'test',
|
||||
scoped: false,
|
||||
agentManager,
|
||||
agentFactoryProvider,
|
||||
kibanaVersion,
|
||||
});
|
||||
|
||||
|
@ -185,7 +185,7 @@ describe('configureClient', () => {
|
|||
logger,
|
||||
type: 'test',
|
||||
scoped: false,
|
||||
agentManager,
|
||||
agentFactoryProvider,
|
||||
kibanaVersion,
|
||||
});
|
||||
|
||||
|
@ -203,7 +203,7 @@ describe('configureClient', () => {
|
|||
logger,
|
||||
type: 'test',
|
||||
scoped: false,
|
||||
agentManager,
|
||||
agentFactoryProvider,
|
||||
kibanaVersion,
|
||||
});
|
||||
|
||||
|
|
|
@ -12,7 +12,7 @@ import type { ElasticsearchClientConfig } from '@kbn/core-elasticsearch-server';
|
|||
import { parseClientOptions } from './client_config';
|
||||
import { instrumentEsQueryAndDeprecationLogger } from './log_query_and_deprecation';
|
||||
import { createTransport } from './create_transport';
|
||||
import { AgentManager } from './agent_manager';
|
||||
import type { AgentFactoryProvider } from './agent_manager';
|
||||
|
||||
const noop = () => undefined;
|
||||
|
||||
|
@ -23,14 +23,14 @@ export const configureClient = (
|
|||
type,
|
||||
scoped = false,
|
||||
getExecutionContext = noop,
|
||||
agentManager,
|
||||
agentFactoryProvider,
|
||||
kibanaVersion,
|
||||
}: {
|
||||
logger: Logger;
|
||||
type: string;
|
||||
scoped?: boolean;
|
||||
getExecutionContext?: () => string | undefined;
|
||||
agentManager: AgentManager;
|
||||
agentFactoryProvider: AgentFactoryProvider;
|
||||
kibanaVersion: string;
|
||||
}
|
||||
): Client => {
|
||||
|
@ -38,7 +38,7 @@ export const configureClient = (
|
|||
const KibanaTransport = createTransport({ getExecutionContext });
|
||||
const client = new Client({
|
||||
...clientOptions,
|
||||
agent: agentManager.getAgentFactory(clientOptions.agent),
|
||||
agent: agentFactoryProvider.getAgentFactory(clientOptions.agent),
|
||||
Transport: KibanaTransport,
|
||||
Connection: HttpConnection,
|
||||
// using ClusterConnectionPool until https://github.com/elastic/elasticsearch-js/issues/1714 is addressed
|
||||
|
|
|
@ -15,3 +15,4 @@ export type {
|
|||
DeeplyMockedApi,
|
||||
ElasticsearchClientMock,
|
||||
} from './src/mocks';
|
||||
export { createAgentStoreMock } from './src/agent_manager.mocks';
|
||||
|
|
|
@ -0,0 +1,13 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||
* Side Public License, v 1.
|
||||
*/
|
||||
|
||||
import type { AgentStore, NetworkAgent } from '@kbn/core-elasticsearch-client-server-internal';
|
||||
|
||||
export const createAgentStoreMock = (agents: Set<NetworkAgent> = new Set()): AgentStore => ({
|
||||
getAgents: jest.fn(() => agents),
|
||||
});
|
|
@ -6,8 +6,14 @@
|
|||
* Side Public License, v 1.
|
||||
*/
|
||||
|
||||
import type { AgentManager } from '@kbn/core-elasticsearch-client-server-internal';
|
||||
|
||||
export const MockClusterClient = jest.fn();
|
||||
export const MockAgentManager = jest.fn();
|
||||
export const MockAgentManager: jest.MockedClass<typeof AgentManager> = jest.fn().mockReturnValue({
|
||||
getAgents: jest.fn(),
|
||||
getAgentFactory: jest.fn(),
|
||||
});
|
||||
|
||||
jest.mock('@kbn/core-elasticsearch-client-server-internal', () => ({
|
||||
ClusterClient: MockClusterClient,
|
||||
AgentManager: MockAgentManager,
|
||||
|
|
|
@ -135,7 +135,7 @@ describe('#preboot', () => {
|
|||
);
|
||||
});
|
||||
|
||||
it('creates a ClusterClient using the internal AgentManager', async () => {
|
||||
it('creates a ClusterClient using the internal AgentManager as AgentFactoryProvider ', async () => {
|
||||
const prebootContract = await elasticsearchService.preboot();
|
||||
const customConfig = { keepAlive: true };
|
||||
const clusterClient = prebootContract.createClient('custom-type', customConfig);
|
||||
|
@ -145,7 +145,7 @@ describe('#preboot', () => {
|
|||
expect(MockClusterClient).toHaveBeenCalledTimes(1);
|
||||
expect(MockClusterClient.mock.calls[0][0]).toEqual(
|
||||
// eslint-disable-next-line dot-notation
|
||||
expect.objectContaining({ agentManager: elasticsearchService['agentManager'] })
|
||||
expect.objectContaining({ agentFactoryProvider: elasticsearchService['agentManager'] })
|
||||
);
|
||||
});
|
||||
|
||||
|
@ -201,6 +201,11 @@ describe('#setup', () => {
|
|||
);
|
||||
});
|
||||
|
||||
it('returns an AgentStore as part of the contract', async () => {
|
||||
const setupContract = await elasticsearchService.setup(setupDeps);
|
||||
expect(typeof setupContract.agentStore.getAgents).toEqual('function');
|
||||
});
|
||||
|
||||
it('esNodeVersionCompatibility$ only starts polling when subscribed to', async () => {
|
||||
const mockedClient = mockClusterClientInstance.asInternalUser;
|
||||
mockedClient.nodes.info.mockImplementation(() =>
|
||||
|
|
|
@ -120,6 +120,7 @@ export class ElasticsearchService
|
|||
}
|
||||
this.unauthorizedErrorHandler = handler;
|
||||
},
|
||||
agentStore: this.agentManager,
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -182,7 +183,7 @@ export class ElasticsearchService
|
|||
authHeaders: this.authHeaders,
|
||||
getExecutionContext: () => this.executionContextClient?.getAsHeader(),
|
||||
getUnauthorizedErrorHandler: () => this.unauthorizedErrorHandler,
|
||||
agentManager: this.agentManager,
|
||||
agentFactoryProvider: this.agentManager,
|
||||
kibanaVersion: this.kibanaVersion,
|
||||
});
|
||||
}
|
||||
|
|
|
@ -12,6 +12,7 @@ import type {
|
|||
ElasticsearchServiceStart,
|
||||
ElasticsearchServiceSetup,
|
||||
} from '@kbn/core-elasticsearch-server';
|
||||
import type { AgentStore } from '@kbn/core-elasticsearch-client-server-internal';
|
||||
import type { ServiceStatus } from '@kbn/core-status-common';
|
||||
import type { NodesVersionCompatibility, NodeInfo } from './version_check/ensure_es_version';
|
||||
import type { ClusterInfo } from './get_cluster_info';
|
||||
|
@ -21,6 +22,7 @@ export type InternalElasticsearchServicePreboot = ElasticsearchServicePreboot;
|
|||
|
||||
/** @internal */
|
||||
export interface InternalElasticsearchServiceSetup extends ElasticsearchServiceSetup {
|
||||
agentStore: AgentStore;
|
||||
clusterInfo$: Observable<ClusterInfo>;
|
||||
esNodesCompatibility$: Observable<NodesVersionCompatibility>;
|
||||
status$: Observable<ServiceStatus<ElasticsearchStatusMeta>>;
|
||||
|
|
|
@ -13,6 +13,7 @@ import {
|
|||
elasticsearchClientMock,
|
||||
type ClusterClientMock,
|
||||
type CustomClusterClientMock,
|
||||
createAgentStoreMock,
|
||||
} from '@kbn/core-elasticsearch-client-server-mocks';
|
||||
import type {
|
||||
ElasticsearchClientConfig,
|
||||
|
@ -94,6 +95,7 @@ const createInternalSetupContractMock = () => {
|
|||
level: ServiceStatusLevels.available,
|
||||
summary: 'Elasticsearch is available',
|
||||
}),
|
||||
agentStore: createAgentStoreMock(),
|
||||
};
|
||||
return internalSetupContract;
|
||||
};
|
||||
|
|
|
@ -7,8 +7,8 @@
|
|||
*/
|
||||
|
||||
import type { ElasticsearchClient } from './client';
|
||||
import { ScopeableRequest } from './scopeable_request';
|
||||
import { IScopedClusterClient } from './scoped_cluster_client';
|
||||
import type { ScopeableRequest } from './scopeable_request';
|
||||
import type { IScopedClusterClient } from './scoped_cluster_client';
|
||||
|
||||
/**
|
||||
* Represents an Elasticsearch cluster API client created by the platform.
|
||||
|
|
|
@ -39,6 +39,8 @@ RUNTIME_DEPS = [
|
|||
"//packages/kbn-logging",
|
||||
"@npm//moment",
|
||||
"@npm//getos",
|
||||
### test dependencies
|
||||
"//packages/core/elasticsearch/core-elasticsearch-client-server-mocks",
|
||||
]
|
||||
|
||||
TYPES_DEPS = [
|
||||
|
@ -50,6 +52,7 @@ TYPES_DEPS = [
|
|||
"@npm//@types/hapi__hapi",
|
||||
"//packages/kbn-logging:npm_module_types",
|
||||
"//packages/core/metrics/core-metrics-server:npm_module_types",
|
||||
"//packages/core/elasticsearch/core-elasticsearch-client-server-internal:npm_module_types",
|
||||
]
|
||||
|
||||
jsts_transpiler(
|
||||
|
|
|
@ -11,3 +11,4 @@ export type { OpsMetricsCollectorOptions } from './src/os';
|
|||
export { ProcessMetricsCollector } from './src/process';
|
||||
export { ServerMetricsCollector } from './src/server';
|
||||
export { EventLoopDelaysMonitor } from './src/event_loop_delays_monitor';
|
||||
export { ElasticsearchClientsMetricsCollector } from './src/elasticsearch_client';
|
||||
|
|
|
@ -0,0 +1,33 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||
* Side Public License, v 1.
|
||||
*/
|
||||
|
||||
import { Agent as HttpAgent } from 'http';
|
||||
import { Agent as HttpsAgent } from 'https';
|
||||
import { sampleEsClientMetrics } from '@kbn/core-metrics-server-mocks';
|
||||
import { createAgentStoreMock } from '@kbn/core-elasticsearch-client-server-mocks';
|
||||
import { getAgentsSocketsStatsMock } from './get_agents_sockets_stats.test.mocks';
|
||||
import { ElasticsearchClientsMetricsCollector } from './elasticsearch_client';
|
||||
import { getAgentsSocketsStats } from './get_agents_sockets_stats';
|
||||
|
||||
jest.mock('@kbn/core-elasticsearch-client-server-internal');
|
||||
|
||||
describe('ElasticsearchClientsMetricsCollector', () => {
|
||||
test('#collect calls getAgentsSocketsStats with the Agents managed by the provided AgentManager', async () => {
|
||||
const agents = new Set<HttpAgent>([new HttpAgent(), new HttpsAgent()]);
|
||||
const agentStore = createAgentStoreMock(agents);
|
||||
getAgentsSocketsStatsMock.mockReturnValueOnce(sampleEsClientMetrics);
|
||||
|
||||
const esClientsMetricsCollector = new ElasticsearchClientsMetricsCollector(agentStore);
|
||||
const metrics = await esClientsMetricsCollector.collect();
|
||||
|
||||
expect(agentStore.getAgents).toHaveBeenCalledTimes(1);
|
||||
expect(getAgentsSocketsStats).toHaveBeenCalledTimes(1);
|
||||
expect(getAgentsSocketsStats).toHaveBeenNthCalledWith(1, agents);
|
||||
expect(metrics).toEqual(sampleEsClientMetrics);
|
||||
});
|
||||
});
|
|
@ -0,0 +1,26 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||
* Side Public License, v 1.
|
||||
*/
|
||||
|
||||
import type { ElasticsearchClientsMetrics, MetricsCollector } from '@kbn/core-metrics-server';
|
||||
import type { AgentStore } from '@kbn/core-elasticsearch-client-server-internal';
|
||||
import { getAgentsSocketsStats } from './get_agents_sockets_stats';
|
||||
|
||||
export class ElasticsearchClientsMetricsCollector
|
||||
implements MetricsCollector<ElasticsearchClientsMetrics>
|
||||
{
|
||||
constructor(private readonly agentStore: AgentStore) {}
|
||||
|
||||
public async collect(): Promise<ElasticsearchClientsMetrics> {
|
||||
return await getAgentsSocketsStats(this.agentStore.getAgents());
|
||||
}
|
||||
|
||||
public reset() {
|
||||
// we do not have a state in this Collector, aka metrics are not accumulated over time.
|
||||
// Thus, we don't need to perform any cleanup to reset the collected metrics
|
||||
}
|
||||
}
|
|
@ -0,0 +1,29 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||
* Side Public License, v 1.
|
||||
*/
|
||||
|
||||
import { Agent as HttpAgent } from 'http';
|
||||
import { Agent as HttpsAgent } from 'https';
|
||||
|
||||
import { getAgentsSocketsStats } from './get_agents_sockets_stats';
|
||||
|
||||
export const getHttpAgentMock = (overrides: Partial<HttpAgent>) => {
|
||||
return Object.assign(new HttpAgent(), overrides);
|
||||
};
|
||||
|
||||
export const getHttpsAgentMock = (overrides: Partial<HttpsAgent>) => {
|
||||
return Object.assign(new HttpsAgent(), overrides);
|
||||
};
|
||||
|
||||
export const getAgentsSocketsStatsMock: jest.MockedFunction<typeof getAgentsSocketsStats> =
|
||||
jest.fn();
|
||||
|
||||
jest.doMock('./get_agents_sockets_stats', () => {
|
||||
return {
|
||||
getAgentsSocketsStats: getAgentsSocketsStatsMock,
|
||||
};
|
||||
});
|
|
@ -0,0 +1,147 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||
* Side Public License, v 1.
|
||||
*/
|
||||
|
||||
import { Socket } from 'net';
|
||||
import { Agent, IncomingMessage } from 'http';
|
||||
import { getAgentsSocketsStats } from './get_agents_sockets_stats';
|
||||
import { getHttpAgentMock, getHttpsAgentMock } from './get_agents_sockets_stats.test.mocks';
|
||||
|
||||
jest.mock('net');
|
||||
|
||||
const mockSocket = new Socket();
|
||||
const mockIncomingMessage = new IncomingMessage(mockSocket);
|
||||
|
||||
describe('getAgentsSocketsStats()', () => {
|
||||
it('extracts aggregated stats from the specified agents', () => {
|
||||
const agent1 = getHttpAgentMock({
|
||||
sockets: {
|
||||
node1: [mockSocket, mockSocket, mockSocket],
|
||||
node2: [mockSocket],
|
||||
},
|
||||
freeSockets: {
|
||||
node1: [mockSocket],
|
||||
node3: [mockSocket, mockSocket, mockSocket, mockSocket],
|
||||
},
|
||||
requests: {
|
||||
node1: [mockIncomingMessage, mockIncomingMessage],
|
||||
},
|
||||
});
|
||||
|
||||
const agent2 = getHttpAgentMock({
|
||||
sockets: {
|
||||
node1: [mockSocket, mockSocket, mockSocket],
|
||||
node4: [mockSocket],
|
||||
},
|
||||
freeSockets: {
|
||||
node3: [mockSocket, mockSocket, mockSocket, mockSocket],
|
||||
},
|
||||
requests: {
|
||||
node4: [mockIncomingMessage, mockIncomingMessage, mockIncomingMessage, mockIncomingMessage],
|
||||
},
|
||||
});
|
||||
|
||||
const stats = getAgentsSocketsStats(new Set<Agent>([agent1, agent2]));
|
||||
expect(stats).toEqual({
|
||||
averageActiveSocketsPerNode: 2.6666666666666665,
|
||||
averageIdleSocketsPerNode: 4.5,
|
||||
connectedNodes: 4,
|
||||
mostActiveNodeSockets: 6,
|
||||
mostIdleNodeSockets: 8,
|
||||
nodesWithActiveSockets: 3,
|
||||
nodesWithIdleSockets: 2,
|
||||
protocol: 'http',
|
||||
totalActiveSockets: 8,
|
||||
totalIdleSockets: 9,
|
||||
totalQueuedRequests: 6,
|
||||
});
|
||||
});
|
||||
|
||||
it('takes into account Agent types to determine the `protocol`', () => {
|
||||
const httpAgent = getHttpAgentMock({
|
||||
sockets: { node1: [mockSocket] },
|
||||
freeSockets: {},
|
||||
requests: {},
|
||||
});
|
||||
|
||||
const httpsAgent = getHttpsAgentMock({
|
||||
sockets: { node1: [mockSocket] },
|
||||
freeSockets: {},
|
||||
requests: {},
|
||||
});
|
||||
|
||||
const noAgents = new Set<Agent>();
|
||||
const httpAgents = new Set<Agent>([httpAgent, httpAgent]);
|
||||
const httpsAgents = new Set<Agent>([httpsAgent, httpsAgent]);
|
||||
const mixedAgents = new Set<Agent>([httpAgent, httpsAgent]);
|
||||
|
||||
expect(getAgentsSocketsStats(noAgents).protocol).toEqual('none');
|
||||
expect(getAgentsSocketsStats(httpAgents).protocol).toEqual('http');
|
||||
expect(getAgentsSocketsStats(httpsAgents).protocol).toEqual('https');
|
||||
expect(getAgentsSocketsStats(mixedAgents).protocol).toEqual('mixed');
|
||||
});
|
||||
|
||||
it('does not take into account those Agents that have not had any connection to any node', () => {
|
||||
const pristineAgentProps = {
|
||||
sockets: {},
|
||||
freeSockets: {},
|
||||
requests: {},
|
||||
};
|
||||
const agent1 = getHttpAgentMock(pristineAgentProps);
|
||||
const agent2 = getHttpAgentMock(pristineAgentProps);
|
||||
const agent3 = getHttpAgentMock(pristineAgentProps);
|
||||
|
||||
const stats = getAgentsSocketsStats(new Set<Agent>([agent1, agent2, agent3]));
|
||||
|
||||
expect(stats).toEqual({
|
||||
averageActiveSocketsPerNode: 0,
|
||||
averageIdleSocketsPerNode: 0,
|
||||
connectedNodes: 0,
|
||||
mostActiveNodeSockets: 0,
|
||||
mostIdleNodeSockets: 0,
|
||||
nodesWithActiveSockets: 0,
|
||||
nodesWithIdleSockets: 0,
|
||||
protocol: 'none',
|
||||
totalActiveSockets: 0,
|
||||
totalIdleSockets: 0,
|
||||
totalQueuedRequests: 0,
|
||||
});
|
||||
});
|
||||
|
||||
it('takes into account those Agents that have hold mappings to one or more nodes, but that do not currently have any pending requests, active connections or idle connections', () => {
|
||||
const emptyAgentProps = {
|
||||
sockets: {
|
||||
node1: [],
|
||||
},
|
||||
freeSockets: {
|
||||
node2: [],
|
||||
},
|
||||
requests: {
|
||||
node3: [],
|
||||
},
|
||||
};
|
||||
|
||||
const agent1 = getHttpAgentMock(emptyAgentProps);
|
||||
const agent2 = getHttpAgentMock(emptyAgentProps);
|
||||
|
||||
const stats = getAgentsSocketsStats(new Set<Agent>([agent1, agent2]));
|
||||
|
||||
expect(stats).toEqual({
|
||||
averageActiveSocketsPerNode: 0,
|
||||
averageIdleSocketsPerNode: 0,
|
||||
connectedNodes: 3,
|
||||
mostActiveNodeSockets: 0,
|
||||
mostIdleNodeSockets: 0,
|
||||
nodesWithActiveSockets: 0,
|
||||
nodesWithIdleSockets: 0,
|
||||
protocol: 'http',
|
||||
totalActiveSockets: 0,
|
||||
totalIdleSockets: 0,
|
||||
totalQueuedRequests: 0,
|
||||
});
|
||||
});
|
||||
});
|
|
@ -0,0 +1,81 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||
* Side Public License, v 1.
|
||||
*/
|
||||
|
||||
import { NetworkAgent } from '@kbn/core-elasticsearch-client-server-internal';
|
||||
import { Agent as HttpsAgent } from 'https';
|
||||
import { mean } from 'lodash';
|
||||
import type {
|
||||
ElasticsearchClientProtocol,
|
||||
ElasticsearchClientsMetrics,
|
||||
} from '@kbn/core-metrics-server';
|
||||
|
||||
export const getAgentsSocketsStats = (agents: Set<NetworkAgent>): ElasticsearchClientsMetrics => {
|
||||
const nodes = new Set<string>();
|
||||
let totalActiveSockets = 0;
|
||||
let totalIdleSockets = 0;
|
||||
let totalQueuedRequests = 0;
|
||||
let http: boolean = false;
|
||||
let https: boolean = false;
|
||||
|
||||
const nodesWithActiveSockets: Record<string, number> = {};
|
||||
const nodesWithIdleSockets: Record<string, number> = {};
|
||||
|
||||
agents.forEach((agent) => {
|
||||
const agentRequests = Object.entries(agent.requests) ?? [];
|
||||
const agentSockets = Object.entries(agent.sockets) ?? [];
|
||||
const agentFreeSockets = Object.entries(agent.freeSockets) ?? [];
|
||||
|
||||
if (agentRequests.length || agentSockets.length || agentFreeSockets.length) {
|
||||
if (agent instanceof HttpsAgent) https = true;
|
||||
else http = true;
|
||||
|
||||
agentRequests.forEach(([node, queue]) => {
|
||||
nodes.add(node);
|
||||
totalQueuedRequests += queue?.length ?? 0;
|
||||
});
|
||||
|
||||
agentSockets.forEach(([node, sockets]) => {
|
||||
nodes.add(node);
|
||||
const activeSockets = sockets?.length ?? 0;
|
||||
totalActiveSockets += activeSockets;
|
||||
nodesWithActiveSockets[node] = (nodesWithActiveSockets[node] ?? 0) + activeSockets;
|
||||
});
|
||||
|
||||
agentFreeSockets.forEach(([node, freeSockets]) => {
|
||||
nodes.add(node);
|
||||
const idleSockets = freeSockets?.length ?? 0;
|
||||
totalIdleSockets += idleSockets;
|
||||
nodesWithIdleSockets[node] = (nodesWithIdleSockets[node] ?? 0) + idleSockets;
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
const activeSocketCounters = Object.values(nodesWithActiveSockets);
|
||||
const idleSocketCounters = Object.values(nodesWithIdleSockets);
|
||||
const protocol: ElasticsearchClientProtocol = http
|
||||
? https
|
||||
? 'mixed'
|
||||
: 'http'
|
||||
: https
|
||||
? 'https'
|
||||
: 'none';
|
||||
|
||||
return {
|
||||
protocol,
|
||||
connectedNodes: nodes.size,
|
||||
nodesWithActiveSockets: activeSocketCounters.filter(Boolean).length,
|
||||
nodesWithIdleSockets: idleSocketCounters.filter(Boolean).length,
|
||||
totalActiveSockets,
|
||||
totalIdleSockets,
|
||||
totalQueuedRequests,
|
||||
mostActiveNodeSockets: activeSocketCounters.length ? Math.max(...activeSocketCounters) : 0,
|
||||
averageActiveSocketsPerNode: activeSocketCounters.length ? mean(activeSocketCounters) : 0,
|
||||
mostIdleNodeSockets: idleSocketCounters.length ? Math.max(...idleSocketCounters) : 0,
|
||||
averageIdleSocketsPerNode: idleSocketCounters.length ? mean(idleSocketCounters) : 0,
|
||||
};
|
||||
};
|
|
@ -37,11 +37,15 @@ NPM_MODULE_EXTRA_FILES = [
|
|||
RUNTIME_DEPS = [
|
||||
"@npm//rxjs",
|
||||
"@npm//moment",
|
||||
"//packages/kbn-logging-mocks",
|
||||
"//packages/kbn-config-schema",
|
||||
"//packages/core/http/core-http-server-mocks",
|
||||
"//packages/core/metrics/core-metrics-collectors-server-internal",
|
||||
"//packages/core/elasticsearch/core-elasticsearch-server-internal",
|
||||
### test dependencies
|
||||
"//packages/kbn-logging-mocks",
|
||||
"//packages/core/http/core-http-server-mocks",
|
||||
"//packages/core/metrics/core-metrics-server-mocks",
|
||||
"//packages/core/metrics/core-metrics-collectors-server-mocks",
|
||||
"//packages/core/elasticsearch/core-elasticsearch-server-mocks",
|
||||
|
||||
]
|
||||
|
||||
|
@ -57,6 +61,7 @@ TYPES_DEPS = [
|
|||
"//packages/core/http/core-http-server-internal:npm_module_types",
|
||||
"//packages/core/metrics/core-metrics-server:npm_module_types",
|
||||
"//packages/core/metrics/core-metrics-collectors-server-internal:npm_module_types",
|
||||
"//packages/core/elasticsearch/core-elasticsearch-server-internal:npm_module_types",
|
||||
|
||||
]
|
||||
|
||||
|
|
|
@ -8,6 +8,7 @@
|
|||
|
||||
import type { OpsMetrics } from '@kbn/core-metrics-server';
|
||||
import { getEcsOpsMetricsLog } from './get_ops_metrics_log';
|
||||
import { sampleEsClientMetrics } from '@kbn/core-metrics-server-mocks';
|
||||
import { collectorMock } from '@kbn/core-metrics-collectors-server-mocks';
|
||||
|
||||
function createBaseOpsMetrics(): OpsMetrics {
|
||||
|
@ -24,6 +25,7 @@ function createBaseOpsMetrics(): OpsMetrics {
|
|||
memory: { total_in_bytes: 1, free_in_bytes: 1, used_in_bytes: 1 },
|
||||
uptime_in_millis: 1,
|
||||
},
|
||||
elasticsearch_client: sampleEsClientMetrics,
|
||||
response_times: { avg_in_millis: 1, max_in_millis: 1 },
|
||||
requests: { disconnects: 1, total: 1, statusCodes: { '200': 1 } },
|
||||
concurrent_connections: 1,
|
||||
|
|
|
@ -8,13 +8,15 @@
|
|||
|
||||
import moment from 'moment';
|
||||
|
||||
import { take } from 'rxjs/operators';
|
||||
import { configServiceMock } from '@kbn/config-mocks';
|
||||
import { mockCoreContext } from '@kbn/core-base-server-mocks';
|
||||
import { loggingSystemMock } from '@kbn/core-logging-server-mocks';
|
||||
import { httpServiceMock } from '@kbn/core-http-server-mocks';
|
||||
import { elasticsearchServiceMock } from '@kbn/core-elasticsearch-server-mocks';
|
||||
import { mockOpsCollector } from './metrics_service.test.mocks';
|
||||
import { MetricsService } from './metrics_service';
|
||||
import { take } from 'rxjs/operators';
|
||||
import { OpsMetricsCollector } from './ops_metrics_collector';
|
||||
|
||||
const testInterval = 100;
|
||||
|
||||
|
@ -24,6 +26,7 @@ const logger = loggingSystemMock.create();
|
|||
|
||||
describe('MetricsService', () => {
|
||||
const httpMock = httpServiceMock.createInternalSetupContract();
|
||||
const esServiceMock = elasticsearchServiceMock.createInternalSetup();
|
||||
let metricsService: MetricsService;
|
||||
|
||||
beforeEach(() => {
|
||||
|
@ -43,9 +46,16 @@ describe('MetricsService', () => {
|
|||
|
||||
describe('#start', () => {
|
||||
it('invokes setInterval with the configured interval', async () => {
|
||||
await metricsService.setup({ http: httpMock });
|
||||
await metricsService.setup({ http: httpMock, elasticsearchService: esServiceMock });
|
||||
await metricsService.start();
|
||||
|
||||
expect(OpsMetricsCollector).toHaveBeenCalledTimes(1);
|
||||
expect(OpsMetricsCollector).toHaveBeenCalledWith(
|
||||
httpMock.server,
|
||||
esServiceMock.agentStore,
|
||||
expect.objectContaining({ logger: logger.get('metrics') })
|
||||
);
|
||||
|
||||
expect(setInterval).toHaveBeenCalledTimes(1);
|
||||
expect(setInterval).toHaveBeenCalledWith(expect.any(Function), testInterval);
|
||||
});
|
||||
|
@ -53,7 +63,7 @@ describe('MetricsService', () => {
|
|||
it('collects the metrics at every interval', async () => {
|
||||
mockOpsCollector.collect.mockResolvedValue(dummyMetrics);
|
||||
|
||||
await metricsService.setup({ http: httpMock });
|
||||
await metricsService.setup({ http: httpMock, elasticsearchService: esServiceMock });
|
||||
await metricsService.start();
|
||||
|
||||
expect(mockOpsCollector.collect).toHaveBeenCalledTimes(1);
|
||||
|
@ -68,7 +78,7 @@ describe('MetricsService', () => {
|
|||
it('resets the collector after each collection', async () => {
|
||||
mockOpsCollector.collect.mockResolvedValue(dummyMetrics);
|
||||
|
||||
await metricsService.setup({ http: httpMock });
|
||||
await metricsService.setup({ http: httpMock, elasticsearchService: esServiceMock });
|
||||
const { getOpsMetrics$ } = await metricsService.start();
|
||||
|
||||
// `advanceTimersByTime` only ensure the interval handler is executed
|
||||
|
@ -108,7 +118,7 @@ describe('MetricsService', () => {
|
|||
.mockResolvedValueOnce(firstMetrics)
|
||||
.mockResolvedValueOnce(secondMetrics);
|
||||
|
||||
await metricsService.setup({ http: httpMock });
|
||||
await metricsService.setup({ http: httpMock, elasticsearchService: esServiceMock });
|
||||
const { getOpsMetrics$ } = await metricsService.start();
|
||||
|
||||
const nextEmission = async () => {
|
||||
|
@ -157,7 +167,7 @@ describe('MetricsService', () => {
|
|||
mockOpsCollector.collect
|
||||
.mockResolvedValueOnce(firstMetrics)
|
||||
.mockResolvedValueOnce(secondMetrics);
|
||||
await metricsService.setup({ http: httpMock });
|
||||
await metricsService.setup({ http: httpMock, elasticsearchService: esServiceMock });
|
||||
const { getOpsMetrics$ } = await metricsService.start();
|
||||
|
||||
const nextEmission = async () => {
|
||||
|
@ -176,7 +186,7 @@ describe('MetricsService', () => {
|
|||
it('omits metrics from log message if they are missing or malformed', async () => {
|
||||
const opsLogger = logger.get('metrics', 'ops');
|
||||
mockOpsCollector.collect.mockResolvedValueOnce({ secondMetrics: 'metrics' });
|
||||
await metricsService.setup({ http: httpMock });
|
||||
await metricsService.setup({ http: httpMock, elasticsearchService: esServiceMock });
|
||||
await metricsService.start();
|
||||
expect(loggingSystemMock.collect(opsLogger).debug[0]).toMatchInlineSnapshot(`
|
||||
Array [
|
||||
|
@ -219,7 +229,7 @@ describe('MetricsService', () => {
|
|||
|
||||
describe('#stop', () => {
|
||||
it('stops the metrics interval', async () => {
|
||||
await metricsService.setup({ http: httpMock });
|
||||
await metricsService.setup({ http: httpMock, elasticsearchService: esServiceMock });
|
||||
const { getOpsMetrics$ } = await metricsService.start();
|
||||
|
||||
expect(mockOpsCollector.collect).toHaveBeenCalledTimes(1);
|
||||
|
@ -235,7 +245,7 @@ describe('MetricsService', () => {
|
|||
});
|
||||
|
||||
it('completes the metrics observable', async () => {
|
||||
await metricsService.setup({ http: httpMock });
|
||||
await metricsService.setup({ http: httpMock, elasticsearchService: esServiceMock });
|
||||
const { getOpsMetrics$ } = await metricsService.start();
|
||||
|
||||
let completed = false;
|
||||
|
|
|
@ -10,6 +10,7 @@ import { firstValueFrom, ReplaySubject } from 'rxjs';
|
|||
import type { CoreContext, CoreService } from '@kbn/core-base-server-internal';
|
||||
import type { Logger } from '@kbn/logging';
|
||||
import type { InternalHttpServiceSetup } from '@kbn/core-http-server-internal';
|
||||
import type { InternalElasticsearchServiceSetup } from '@kbn/core-elasticsearch-server-internal';
|
||||
import type {
|
||||
OpsMetrics,
|
||||
MetricsServiceSetup,
|
||||
|
@ -21,6 +22,7 @@ import { getEcsOpsMetricsLog } from './logging';
|
|||
|
||||
export interface MetricsServiceSetupDeps {
|
||||
http: InternalHttpServiceSetup;
|
||||
elasticsearchService: InternalElasticsearchServiceSetup;
|
||||
}
|
||||
|
||||
/** @internal */
|
||||
|
@ -45,12 +47,15 @@ export class MetricsService
|
|||
this.opsMetricsLogger = coreContext.logger.get('metrics', 'ops');
|
||||
}
|
||||
|
||||
public async setup({ http }: MetricsServiceSetupDeps): Promise<InternalMetricsServiceSetup> {
|
||||
public async setup({
|
||||
http,
|
||||
elasticsearchService,
|
||||
}: MetricsServiceSetupDeps): Promise<InternalMetricsServiceSetup> {
|
||||
const config = await firstValueFrom(
|
||||
this.coreContext.configService.atPath<OpsConfigType>(OPS_CONFIG_PATH)
|
||||
);
|
||||
|
||||
this.metricsCollector = new OpsMetricsCollector(http.server, {
|
||||
this.metricsCollector = new OpsMetricsCollector(http.server, elasticsearchService.agentStore, {
|
||||
logger: this.logger,
|
||||
...config.cGroupOverrides,
|
||||
});
|
||||
|
|
|
@ -11,11 +11,13 @@ import { collectorMock } from '@kbn/core-metrics-collectors-server-mocks';
|
|||
export const mockOsCollector = collectorMock.create();
|
||||
export const mockProcessCollector = collectorMock.create();
|
||||
export const mockServerCollector = collectorMock.create();
|
||||
export const mockEsClientCollector = collectorMock.create();
|
||||
|
||||
jest.doMock('@kbn/core-metrics-collectors-server-internal', () => {
|
||||
return {
|
||||
OsMetricsCollector: jest.fn().mockImplementation(() => mockOsCollector),
|
||||
ProcessMetricsCollector: jest.fn().mockImplementation(() => mockProcessCollector),
|
||||
ServerMetricsCollector: jest.fn().mockImplementation(() => mockServerCollector),
|
||||
ElasticsearchClientsMetricsCollector: jest.fn().mockImplementation(() => mockEsClientCollector),
|
||||
};
|
||||
});
|
||||
|
|
|
@ -8,7 +8,10 @@
|
|||
|
||||
import { loggerMock } from '@kbn/logging-mocks';
|
||||
import { httpServiceMock } from '@kbn/core-http-server-mocks';
|
||||
import { sampleEsClientMetrics } from '@kbn/core-metrics-server-mocks';
|
||||
import { AgentManager } from '@kbn/core-elasticsearch-client-server-internal';
|
||||
import {
|
||||
mockEsClientCollector,
|
||||
mockOsCollector,
|
||||
mockProcessCollector,
|
||||
mockServerCollector,
|
||||
|
@ -20,7 +23,8 @@ describe('OpsMetricsCollector', () => {
|
|||
|
||||
beforeEach(() => {
|
||||
const hapiServer = httpServiceMock.createInternalSetupContract().server;
|
||||
collector = new OpsMetricsCollector(hapiServer, { logger: loggerMock.create() });
|
||||
const agentManager = new AgentManager();
|
||||
collector = new OpsMetricsCollector(hapiServer, agentManager, { logger: loggerMock.create() });
|
||||
|
||||
mockOsCollector.collect.mockResolvedValue('osMetrics');
|
||||
});
|
||||
|
@ -33,12 +37,14 @@ describe('OpsMetricsCollector', () => {
|
|||
requests: 'serverRequestsMetrics',
|
||||
response_times: 'serverTimingMetrics',
|
||||
});
|
||||
mockEsClientCollector.collect.mockResolvedValue(sampleEsClientMetrics);
|
||||
|
||||
const metrics = await collector.collect();
|
||||
|
||||
expect(mockOsCollector.collect).toHaveBeenCalledTimes(1);
|
||||
expect(mockProcessCollector.collect).toHaveBeenCalledTimes(1);
|
||||
expect(mockServerCollector.collect).toHaveBeenCalledTimes(1);
|
||||
expect(mockEsClientCollector.collect).toHaveBeenCalledTimes(1);
|
||||
|
||||
expect(metrics).toEqual({
|
||||
collected_at: expect.any(Date),
|
||||
|
@ -47,6 +53,7 @@ describe('OpsMetricsCollector', () => {
|
|||
os: 'osMetrics',
|
||||
requests: 'serverRequestsMetrics',
|
||||
response_times: 'serverTimingMetrics',
|
||||
elasticsearch_client: sampleEsClientMetrics,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
@ -8,28 +8,33 @@
|
|||
|
||||
import { Server as HapiServer } from '@hapi/hapi';
|
||||
import type { OpsMetrics, MetricsCollector } from '@kbn/core-metrics-server';
|
||||
import type { AgentStore } from '@kbn/core-elasticsearch-client-server-internal';
|
||||
import {
|
||||
ProcessMetricsCollector,
|
||||
OsMetricsCollector,
|
||||
type OpsMetricsCollectorOptions,
|
||||
ServerMetricsCollector,
|
||||
ElasticsearchClientsMetricsCollector,
|
||||
} from '@kbn/core-metrics-collectors-server-internal';
|
||||
|
||||
export class OpsMetricsCollector implements MetricsCollector<OpsMetrics> {
|
||||
private readonly processCollector: ProcessMetricsCollector;
|
||||
private readonly osCollector: OsMetricsCollector;
|
||||
private readonly serverCollector: ServerMetricsCollector;
|
||||
private readonly esClientCollector: ElasticsearchClientsMetricsCollector;
|
||||
|
||||
constructor(server: HapiServer, opsOptions: OpsMetricsCollectorOptions) {
|
||||
constructor(server: HapiServer, agentStore: AgentStore, opsOptions: OpsMetricsCollectorOptions) {
|
||||
this.processCollector = new ProcessMetricsCollector();
|
||||
this.osCollector = new OsMetricsCollector(opsOptions);
|
||||
this.serverCollector = new ServerMetricsCollector(server);
|
||||
this.esClientCollector = new ElasticsearchClientsMetricsCollector(agentStore);
|
||||
}
|
||||
|
||||
public async collect(): Promise<OpsMetrics> {
|
||||
const [processes, os, server] = await Promise.all([
|
||||
const [processes, os, esClient, server] = await Promise.all([
|
||||
this.processCollector.collect(),
|
||||
this.osCollector.collect(),
|
||||
this.esClientCollector.collect(),
|
||||
this.serverCollector.collect(),
|
||||
]);
|
||||
|
||||
|
@ -43,6 +48,7 @@ export class OpsMetricsCollector implements MetricsCollector<OpsMetrics> {
|
|||
process: processes[0],
|
||||
processes,
|
||||
os,
|
||||
elasticsearch_client: esClient,
|
||||
...server,
|
||||
};
|
||||
}
|
||||
|
|
|
@ -6,4 +6,4 @@
|
|||
* Side Public License, v 1.
|
||||
*/
|
||||
|
||||
export { metricsServiceMock } from './src/metrics_service.mock';
|
||||
export { metricsServiceMock, sampleEsClientMetrics } from './src/metrics_service.mock';
|
||||
|
|
|
@ -17,7 +17,25 @@ import {
|
|||
mocked as eventLoopDelaysMonitorMock,
|
||||
collectorMock,
|
||||
} from '@kbn/core-metrics-collectors-server-mocks';
|
||||
import type { MetricsServiceSetup, MetricsServiceStart } from '@kbn/core-metrics-server';
|
||||
import type {
|
||||
ElasticsearchClientsMetrics,
|
||||
MetricsServiceSetup,
|
||||
MetricsServiceStart,
|
||||
} from '@kbn/core-metrics-server';
|
||||
|
||||
export const sampleEsClientMetrics: ElasticsearchClientsMetrics = {
|
||||
protocol: 'https',
|
||||
connectedNodes: 3,
|
||||
nodesWithActiveSockets: 3,
|
||||
nodesWithIdleSockets: 1,
|
||||
totalActiveSockets: 25,
|
||||
totalIdleSockets: 2,
|
||||
totalQueuedRequests: 0,
|
||||
mostActiveNodeSockets: 15,
|
||||
averageActiveSocketsPerNode: 8,
|
||||
mostIdleNodeSockets: 2,
|
||||
averageIdleSocketsPerNode: 0.5,
|
||||
};
|
||||
|
||||
const createInternalSetupContractMock = () => {
|
||||
const setupContract: jest.Mocked<InternalMetricsServiceSetup> = {
|
||||
|
@ -39,6 +57,7 @@ const createInternalSetupContractMock = () => {
|
|||
memory: { total_in_bytes: 1, free_in_bytes: 1, used_in_bytes: 1 },
|
||||
uptime_in_millis: 1,
|
||||
},
|
||||
elasticsearch_client: sampleEsClientMetrics,
|
||||
response_times: { avg_in_millis: 1, max_in_millis: 1 },
|
||||
requests: { disconnects: 1, total: 1, statusCodes: { '200': 1 } },
|
||||
concurrent_connections: 1,
|
||||
|
|
|
@ -14,4 +14,6 @@ export type {
|
|||
OpsProcessMetrics,
|
||||
OpsOsMetrics,
|
||||
OpsServerMetrics,
|
||||
ElasticsearchClientProtocol,
|
||||
ElasticsearchClientsMetrics,
|
||||
} from './src/metrics';
|
||||
|
|
|
@ -40,6 +40,44 @@ export interface IntervalHistogram {
|
|||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Protocol(s) used by the Elasticsearch Client
|
||||
* @public
|
||||
*/
|
||||
|
||||
export type ElasticsearchClientProtocol = 'none' | 'http' | 'https' | 'mixed';
|
||||
|
||||
/**
|
||||
* Metrics related to the elasticsearch clients
|
||||
* @public
|
||||
*/
|
||||
export interface ElasticsearchClientsMetrics {
|
||||
/** The protocol (or protocols) that these Agents are using */
|
||||
protocol: ElasticsearchClientProtocol;
|
||||
/** Number of ES nodes that ES-js client is connecting to */
|
||||
connectedNodes: number;
|
||||
/** Number of nodes with active connections */
|
||||
nodesWithActiveSockets: number;
|
||||
/** Number of nodes with available connections (alive but idle).
|
||||
* Note that a node can have both active and idle connections at the same time
|
||||
*/
|
||||
nodesWithIdleSockets: number;
|
||||
/** Total number of active sockets (all nodes, all connections) */
|
||||
totalActiveSockets: number;
|
||||
/** Total number of available sockets (alive but idle, all nodes, all connections) */
|
||||
totalIdleSockets: number;
|
||||
/** Total number of queued requests (all nodes, all connections) */
|
||||
totalQueuedRequests: number;
|
||||
/** Number of active connections of the node with most active connections */
|
||||
mostActiveNodeSockets: number;
|
||||
/** Average of active sockets per node (all connections) */
|
||||
averageActiveSocketsPerNode: number;
|
||||
/** Number of idle connections of the node with most idle connections */
|
||||
mostIdleNodeSockets: number;
|
||||
/** Average of available (idle) sockets per node (all connections) */
|
||||
averageIdleSocketsPerNode: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Process related metrics
|
||||
* @public
|
||||
|
@ -165,6 +203,10 @@ export interface OpsServerMetrics {
|
|||
export interface OpsMetrics {
|
||||
/** Time metrics were recorded at. */
|
||||
collected_at: Date;
|
||||
/**
|
||||
* Metrics related to the elasticsearch client
|
||||
*/
|
||||
elasticsearch_client: ElasticsearchClientsMetrics;
|
||||
/**
|
||||
* Process related metrics.
|
||||
* @deprecated use the processes field instead.
|
||||
|
|
|
@ -135,6 +135,7 @@ export const registerStatusRoute = ({
|
|||
...lastMetrics.requests,
|
||||
status_codes: lastMetrics.requests.statusCodes,
|
||||
},
|
||||
elasticsearch_client: lastMetrics.elasticsearch_client,
|
||||
},
|
||||
};
|
||||
|
||||
|
|
|
@ -48,7 +48,7 @@ export const elasticsearch = new ElasticsearchService(logger, kibanaPackageJson.
|
|||
logger,
|
||||
type,
|
||||
// we use an independent AgentManager for cli_setup, no need to track performance of this one
|
||||
agentManager: new AgentManager(),
|
||||
agentFactoryProvider: new AgentManager(),
|
||||
kibanaVersion: kibanaPackageJson.version,
|
||||
});
|
||||
},
|
||||
|
|
|
@ -280,7 +280,10 @@ export class Server {
|
|||
executionContext: executionContextSetup,
|
||||
});
|
||||
|
||||
const metricsSetup = await this.metrics.setup({ http: httpSetup });
|
||||
const metricsSetup = await this.metrics.setup({
|
||||
http: httpSetup,
|
||||
elasticsearchService: elasticsearchServiceSetup,
|
||||
});
|
||||
|
||||
const coreUsageDataSetup = this.coreUsageData.setup({
|
||||
http: httpSetup,
|
||||
|
|
|
@ -3,6 +3,19 @@
|
|||
exports[`telemetry_ops_stats should return something when there is a metric 1`] = `
|
||||
Object {
|
||||
"concurrent_connections": 1,
|
||||
"elasticsearch_client": Object {
|
||||
"averageActiveSocketsPerNode": 8,
|
||||
"averageIdleSocketsPerNode": 0.5,
|
||||
"connectedNodes": 3,
|
||||
"mostActiveNodeSockets": 15,
|
||||
"mostIdleNodeSockets": 2,
|
||||
"nodesWithActiveSockets": 3,
|
||||
"nodesWithIdleSockets": 1,
|
||||
"protocol": "https",
|
||||
"totalActiveSockets": 25,
|
||||
"totalIdleSockets": 2,
|
||||
"totalQueuedRequests": 0,
|
||||
},
|
||||
"os": Object {
|
||||
"load": Object {
|
||||
"15m": 1,
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue