[Elasticsearch] Log queued requests (#152571)

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Alejandro Fernández Haro 2023-03-06 14:25:44 +01:00 committed by GitHub
parent b3807ba0b9
commit f10777e5d5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
24 changed files with 115 additions and 71 deletions

View file

@ -9,7 +9,7 @@
export { ScopedClusterClient } from './src/scoped_cluster_client';
export { ClusterClient } from './src/cluster_client';
export { configureClient } from './src/configure_client';
export { type AgentStore, AgentManager, type NetworkAgent } from './src/agent_manager';
export { type AgentStatsProvider, AgentManager, type NetworkAgent } from './src/agent_manager';
export { getRequestDebugMeta, getErrorMessage } from './src/log_query_and_deprecation';
export {
PRODUCT_RESPONSE_HEADER,

View file

@ -6,9 +6,12 @@
* Side Public License, v 1.
*/
import { AgentManager } from './agent_manager';
import { Agent as HttpAgent } from 'http';
import { Agent as HttpsAgent } from 'https';
import type { ElasticsearchClientsMetrics } from '@kbn/core-metrics-server';
import { loggerMock, type MockedLogger } from '@kbn/logging-mocks';
import { getAgentsSocketsStatsMock } from './get_agents_sockets_stats.test.mocks';
import { AgentManager } from './agent_manager';
jest.mock('http');
jest.mock('https');
@ -17,6 +20,12 @@ const HttpAgentMock = HttpAgent as jest.Mock<HttpAgent>;
const HttpsAgentMock = HttpsAgent as jest.Mock<HttpsAgent>;
describe('AgentManager', () => {
let logger: MockedLogger;
beforeEach(() => {
logger = loggerMock.create();
});
afterEach(() => {
HttpAgentMock.mockClear();
HttpsAgentMock.mockClear();
@ -24,7 +33,7 @@ describe('AgentManager', () => {
describe('#getAgentFactory()', () => {
it('provides factories which are different at each call', () => {
const agentManager = new AgentManager();
const agentManager = new AgentManager(logger);
const agentFactory1 = agentManager.getAgentFactory();
const agentFactory2 = agentManager.getAgentFactory();
expect(agentFactory1).not.toEqual(agentFactory2);
@ -36,7 +45,7 @@ describe('AgentManager', () => {
HttpAgentMock.mockImplementationOnce(() => mockedHttpAgent);
const mockedHttpsAgent = new HttpsAgent();
HttpsAgentMock.mockImplementationOnce(() => mockedHttpsAgent);
const agentManager = new AgentManager();
const agentManager = new AgentManager(logger);
const agentFactory = agentManager.getAgentFactory();
const httpAgent = agentFactory({ url: new URL('http://elastic-node-1:9200') });
const httpsAgent = agentFactory({ url: new URL('https://elastic-node-1:9200') });
@ -45,7 +54,7 @@ describe('AgentManager', () => {
});
it('takes into account the provided configurations', () => {
const agentManager = new AgentManager();
const agentManager = new AgentManager(logger);
const agentFactory = agentManager.getAgentFactory({
maxTotalSockets: 1024,
scheduling: 'fifo',
@ -68,7 +77,7 @@ describe('AgentManager', () => {
});
it('provides Agents that match the URLs protocol', () => {
const agentManager = new AgentManager();
const agentManager = new AgentManager(logger);
const agentFactory = agentManager.getAgentFactory();
agentFactory({ url: new URL('http://elastic-node-1:9200') });
expect(HttpAgent).toHaveBeenCalledTimes(1);
@ -79,7 +88,7 @@ describe('AgentManager', () => {
});
it('provides the same Agent if URLs use the same protocol', () => {
const agentManager = new AgentManager();
const agentManager = new AgentManager(logger);
const agentFactory = agentManager.getAgentFactory();
const agent1 = agentFactory({ url: new URL('http://elastic-node-1:9200') });
const agent2 = agentFactory({ url: new URL('http://elastic-node-2:9200') });
@ -92,7 +101,7 @@ describe('AgentManager', () => {
});
it('dereferences an agent instance when the agent is closed', () => {
const agentManager = new AgentManager();
const agentManager = new AgentManager(logger);
const agentFactory = agentManager.getAgentFactory();
const agent = agentFactory({ url: new URL('http://elastic-node-1:9200') });
// eslint-disable-next-line dot-notation
@ -105,7 +114,7 @@ describe('AgentManager', () => {
describe('two agent factories', () => {
it('never provide the same Agent instance even if they use the same type', () => {
const agentManager = new AgentManager();
const agentManager = new AgentManager(logger);
const agentFactory1 = agentManager.getAgentFactory();
const agentFactory2 = agentManager.getAgentFactory();
const agent1 = agentFactory1({ url: new URL('http://elastic-node-1:9200') });
@ -115,20 +124,34 @@ describe('AgentManager', () => {
});
});
describe('#getAgents()', () => {
it('returns the created HTTP and HTTPs Agent instances', () => {
const agentManager = new AgentManager();
const agentFactory1 = agentManager.getAgentFactory();
const agentFactory2 = agentManager.getAgentFactory();
const agent1 = agentFactory1({ url: new URL('http://elastic-node-1:9200') });
const agent2 = agentFactory2({ url: new URL('http://elastic-node-1:9200') });
const agent3 = agentFactory1({ url: new URL('https://elastic-node-1:9200') });
const agent4 = agentFactory2({ url: new URL('https://elastic-node-1:9200') });
describe('#getAgentsStats()', () => {
it('returns the stats of the agents', () => {
const agentManager = new AgentManager(logger);
const metrics: ElasticsearchClientsMetrics = {
totalQueuedRequests: 0,
totalIdleSockets: 100,
totalActiveSockets: 400,
};
getAgentsSocketsStatsMock.mockReturnValue(metrics);
const agents = agentManager.getAgents();
expect(agentManager.getAgentsStats()).toStrictEqual(metrics);
});
expect(agents.size).toEqual(4);
expect([...agents]).toEqual(expect.arrayContaining([agent1, agent2, agent3, agent4]));
it('warns when there are queued requests (requests unassigned to any socket)', () => {
const agentManager = new AgentManager(logger);
const metrics: ElasticsearchClientsMetrics = {
totalQueuedRequests: 2,
totalIdleSockets: 100, // There may be idle sockets when many clients are initialized. It should not be taken as an indicator of health.
totalActiveSockets: 400,
};
getAgentsSocketsStatsMock.mockReturnValue(metrics);
expect(agentManager.getAgentsStats()).toStrictEqual(metrics);
expect(logger.warn).toHaveBeenCalledTimes(1);
expect(logger.warn).toHaveBeenCalledWith(
'There are 2 queued requests. If this number is constantly high, consider scaling Kibana horizontally or increasing "elasticsearch.maxSockets" in the config.'
);
});
});
});

View file

@ -9,6 +9,9 @@
import { Agent as HttpAgent, type AgentOptions } from 'http';
import { Agent as HttpsAgent } from 'https';
import type { ConnectionOptions, HttpAgentOptions } from '@elastic/elasticsearch';
import type { Logger } from '@kbn/logging';
import type { ElasticsearchClientsMetrics } from '@kbn/core-metrics-server';
import { getAgentsSocketsStats } from './get_agents_sockets_stats';
const HTTPS = 'https:';
@ -19,8 +22,14 @@ export interface AgentFactoryProvider {
getAgentFactory(agentOptions?: HttpAgentOptions): AgentFactory;
}
export interface AgentStore {
getAgents(): Set<NetworkAgent>;
/**
* Exposes the APIs to fetch stats of the existing agents.
*/
export interface AgentStatsProvider {
/**
* Returns the {@link ElasticsearchClientsMetrics}, to understand the load on the Elasticsearch HTTP agents.
*/
getAgentsStats(): ElasticsearchClientsMetrics;
}
/**
@ -34,10 +43,10 @@ export interface AgentStore {
* exposes methods that can modify the underlying pools, effectively impacting the connections of other Clients.
* @internal
**/
export class AgentManager implements AgentFactoryProvider, AgentStore {
private agents: Set<HttpAgent>;
export class AgentManager implements AgentFactoryProvider, AgentStatsProvider {
private readonly agents: Set<HttpAgent>;
constructor() {
constructor(private readonly logger: Logger) {
this.agents = new Set();
}
@ -69,8 +78,16 @@ export class AgentManager implements AgentFactoryProvider, AgentStore {
};
}
public getAgents(): Set<NetworkAgent> {
return this.agents;
public getAgentsStats(): ElasticsearchClientsMetrics {
const stats = getAgentsSocketsStats(this.agents);
if (stats.totalQueuedRequests > 0) {
this.logger.warn(
`There are ${stats.totalQueuedRequests} queued requests. If this number is constantly high, consider scaling Kibana horizontally or increasing "elasticsearch.maxSockets" in the config.`
);
}
return stats;
}
}

View file

@ -57,7 +57,7 @@ describe('ClusterClient', () => {
logger = loggingSystemMock.createLogger();
internalClient = createClient();
scopedClient = createClient();
agentFactoryProvider = new AgentManager();
agentFactoryProvider = new AgentManager(logger);
authHeaders = httpServiceMock.createAuthHeaderStorage();
authHeaders.get.mockImplementation(() => ({

View file

@ -54,7 +54,7 @@ describe('configureClient', () => {
config = createFakeConfig();
parseClientOptionsMock.mockReturnValue({});
ClientMock.mockImplementation(() => createFakeClient());
agentFactoryProvider = new AgentManager();
agentFactoryProvider = new AgentManager(logger);
});
afterEach(() => {

View file

@ -6,8 +6,8 @@
* Side Public License, v 1.
*/
import { NetworkAgent } from '@kbn/core-elasticsearch-client-server-internal';
import type { ElasticsearchClientsMetrics } from '@kbn/core-metrics-server';
import type { NetworkAgent } from './agent_manager';
export const getAgentsSocketsStats = (agents: Set<NetworkAgent>): ElasticsearchClientsMetrics => {
const nodes = new Set<string>();

View file

@ -21,6 +21,7 @@
"@kbn/logging-mocks",
"@kbn/core-logging-server-mocks",
"@kbn/core-http-server-mocks",
"@kbn/core-metrics-server",
],
"exclude": [
"target/**/*",

View file

@ -15,4 +15,4 @@ export type {
DeeplyMockedApi,
ElasticsearchClientMock,
} from './src/mocks';
export { createAgentStoreMock } from './src/agent_manager.mocks';
export { createAgentStatsProviderMock } from './src/agent_manager.mocks';

View file

@ -6,8 +6,8 @@
* Side Public License, v 1.
*/
import type { AgentStore, NetworkAgent } from '@kbn/core-elasticsearch-client-server-internal';
import type { AgentStatsProvider } from '@kbn/core-elasticsearch-client-server-internal';
export const createAgentStoreMock = (agents: Set<NetworkAgent> = new Set()): AgentStore => ({
getAgents: jest.fn(() => agents),
export const createAgentStatsProviderMock = (): jest.Mocked<AgentStatsProvider> => ({
getAgentsStats: jest.fn(),
});

View file

@ -10,7 +10,7 @@ import type { AgentManager } from '@kbn/core-elasticsearch-client-server-interna
export const MockClusterClient = jest.fn();
export const MockAgentManager: jest.MockedClass<typeof AgentManager> = jest.fn().mockReturnValue({
getAgents: jest.fn(),
getAgentsStats: jest.fn(),
getAgentFactory: jest.fn(),
});

View file

@ -201,9 +201,9 @@ describe('#setup', () => {
);
});
it('returns an AgentStore as part of the contract', async () => {
it('returns an AgentStatsProvider as part of the contract', async () => {
const setupContract = await elasticsearchService.setup(setupDeps);
expect(typeof setupContract.agentStore.getAgents).toEqual('function');
expect(typeof setupContract.agentStatsProvider.getAgentsStats).toEqual('function');
});
it('esNodeVersionCompatibility$ only starts polling when subscribed to', async () => {

View file

@ -66,7 +66,7 @@ export class ElasticsearchService
this.config$ = coreContext.configService
.atPath<ElasticsearchConfigType>('elasticsearch')
.pipe(map((rawConfig) => new ElasticsearchConfig(rawConfig)));
this.agentManager = new AgentManager();
this.agentManager = new AgentManager(this.log.get('agent-manager'));
}
public async preboot(): Promise<InternalElasticsearchServicePreboot> {
@ -120,7 +120,9 @@ export class ElasticsearchService
}
this.unauthorizedErrorHandler = handler;
},
agentStore: this.agentManager,
agentStatsProvider: {
getAgentsStats: this.agentManager.getAgentsStats.bind(this.agentManager),
},
};
}

View file

@ -12,7 +12,7 @@ import type {
ElasticsearchServiceStart,
ElasticsearchServiceSetup,
} from '@kbn/core-elasticsearch-server';
import type { AgentStore } from '@kbn/core-elasticsearch-client-server-internal';
import type { AgentStatsProvider } from '@kbn/core-elasticsearch-client-server-internal';
import type { ServiceStatus } from '@kbn/core-status-common';
import type { NodesVersionCompatibility, NodeInfo } from './version_check/ensure_es_version';
import type { ClusterInfo } from './get_cluster_info';
@ -22,7 +22,7 @@ export type InternalElasticsearchServicePreboot = ElasticsearchServicePreboot;
/** @internal */
export interface InternalElasticsearchServiceSetup extends ElasticsearchServiceSetup {
agentStore: AgentStore;
agentStatsProvider: AgentStatsProvider;
clusterInfo$: Observable<ClusterInfo>;
esNodesCompatibility$: Observable<NodesVersionCompatibility>;
status$: Observable<ServiceStatus<ElasticsearchStatusMeta>>;

View file

@ -13,7 +13,7 @@ import {
elasticsearchClientMock,
type ClusterClientMock,
type CustomClusterClientMock,
createAgentStoreMock,
createAgentStatsProviderMock,
} from '@kbn/core-elasticsearch-client-server-mocks';
import type {
ElasticsearchClientConfig,
@ -95,7 +95,7 @@ const createInternalSetupContractMock = () => {
level: ServiceStatusLevels.available,
summary: 'Elasticsearch is available',
}),
agentStore: createAgentStoreMock(),
agentStatsProvider: createAgentStatsProviderMock(),
};
return internalSetupContract;
};

View file

@ -6,13 +6,9 @@
* Side Public License, v 1.
*/
import { Agent as HttpAgent } from 'http';
import { Agent as HttpsAgent } from 'https';
import type { ElasticsearchClientsMetrics } from '@kbn/core-metrics-server';
import { createAgentStoreMock } from '@kbn/core-elasticsearch-client-server-mocks';
import { getAgentsSocketsStatsMock } from './get_agents_sockets_stats.test.mocks';
import { createAgentStatsProviderMock } from '@kbn/core-elasticsearch-client-server-mocks';
import { ElasticsearchClientsMetricsCollector } from './elasticsearch_client';
import { getAgentsSocketsStats } from './get_agents_sockets_stats';
jest.mock('@kbn/core-elasticsearch-client-server-internal');
@ -24,16 +20,12 @@ export const sampleEsClientMetrics: ElasticsearchClientsMetrics = {
describe('ElasticsearchClientsMetricsCollector', () => {
test('#collect calls getAgentsSocketsStats with the Agents managed by the provided AgentManager', async () => {
const agents = new Set<HttpAgent>([new HttpAgent(), new HttpsAgent()]);
const agentStore = createAgentStoreMock(agents);
getAgentsSocketsStatsMock.mockReturnValueOnce(sampleEsClientMetrics);
const agentStatsProvider = createAgentStatsProviderMock();
agentStatsProvider.getAgentsStats.mockReturnValue(sampleEsClientMetrics);
const esClientsMetricsCollector = new ElasticsearchClientsMetricsCollector(agentStore);
const esClientsMetricsCollector = new ElasticsearchClientsMetricsCollector(agentStatsProvider);
const metrics = await esClientsMetricsCollector.collect();
expect(agentStore.getAgents).toHaveBeenCalledTimes(1);
expect(getAgentsSocketsStats).toHaveBeenCalledTimes(1);
expect(getAgentsSocketsStats).toHaveBeenNthCalledWith(1, agents);
expect(metrics).toEqual(sampleEsClientMetrics);
});
});

View file

@ -7,16 +7,15 @@
*/
import type { ElasticsearchClientsMetrics, MetricsCollector } from '@kbn/core-metrics-server';
import type { AgentStore } from '@kbn/core-elasticsearch-client-server-internal';
import { getAgentsSocketsStats } from './get_agents_sockets_stats';
import type { AgentStatsProvider } from '@kbn/core-elasticsearch-client-server-internal';
export class ElasticsearchClientsMetricsCollector
implements MetricsCollector<ElasticsearchClientsMetrics>
{
constructor(private readonly agentStore: AgentStore) {}
constructor(private readonly agentStatsProvider: AgentStatsProvider) {}
public async collect(): Promise<ElasticsearchClientsMetrics> {
return await getAgentsSocketsStats(this.agentStore.getAgents());
return await this.agentStatsProvider.getAgentsStats();
}
public reset() {

View file

@ -52,7 +52,7 @@ describe('MetricsService', () => {
expect(OpsMetricsCollector).toHaveBeenCalledTimes(1);
expect(OpsMetricsCollector).toHaveBeenCalledWith(
httpMock.server,
esServiceMock.agentStore,
esServiceMock.agentStatsProvider,
expect.objectContaining({ logger: logger.get('metrics') })
);

View file

@ -55,10 +55,14 @@ export class MetricsService
this.coreContext.configService.atPath<OpsConfigType>(OPS_CONFIG_PATH)
);
this.metricsCollector = new OpsMetricsCollector(http.server, elasticsearchService.agentStore, {
logger: this.logger,
...config.cGroupOverrides,
});
this.metricsCollector = new OpsMetricsCollector(
http.server,
elasticsearchService.agentStatsProvider,
{
logger: this.logger,
...config.cGroupOverrides,
}
);
await this.refreshMetrics();

View file

@ -29,7 +29,7 @@ describe('OpsMetricsCollector', () => {
beforeEach(() => {
const hapiServer = httpServiceMock.createInternalSetupContract().server;
const agentManager = new AgentManager();
const agentManager = new AgentManager(loggerMock.create());
collector = new OpsMetricsCollector(hapiServer, agentManager, { logger: loggerMock.create() });
mockOsCollector.collect.mockResolvedValue('osMetrics');

View file

@ -8,7 +8,7 @@
import { Server as HapiServer } from '@hapi/hapi';
import type { OpsMetrics, MetricsCollector } from '@kbn/core-metrics-server';
import type { AgentStore } from '@kbn/core-elasticsearch-client-server-internal';
import type { AgentStatsProvider } from '@kbn/core-elasticsearch-client-server-internal';
import {
ProcessMetricsCollector,
OsMetricsCollector,
@ -23,11 +23,15 @@ export class OpsMetricsCollector implements MetricsCollector<OpsMetrics> {
private readonly serverCollector: ServerMetricsCollector;
private readonly esClientCollector: ElasticsearchClientsMetricsCollector;
constructor(server: HapiServer, agentStore: AgentStore, opsOptions: OpsMetricsCollectorOptions) {
constructor(
server: HapiServer,
agentStatsProvider: AgentStatsProvider,
opsOptions: OpsMetricsCollectorOptions
) {
this.processCollector = new ProcessMetricsCollector();
this.osCollector = new OsMetricsCollector(opsOptions);
this.serverCollector = new ServerMetricsCollector(server);
this.esClientCollector = new ElasticsearchClientsMetricsCollector(agentStore);
this.esClientCollector = new ElasticsearchClientsMetricsCollector(agentStatsProvider);
}
public async collect(): Promise<OpsMetrics> {

View file

@ -49,7 +49,7 @@ export const elasticsearch = new ElasticsearchService(logger, kibanaPackageJson.
logger,
type,
// we use an independent AgentManager for cli_setup, no need to track performance of this one
agentFactoryProvider: new AgentManager(),
agentFactoryProvider: new AgentManager(logger.get('agent-manager')),
kibanaVersion: kibanaPackageJson.version,
});
},

View file

@ -233,7 +233,9 @@ const getElasticsearchClient = async (
return configureClient(esClientConfig, {
logger: loggerFactory.get('elasticsearch'),
type: 'data',
agentFactoryProvider: new AgentManager(),
agentFactoryProvider: new AgentManager(
loggerFactory.get('elasticsearch-service', 'agent-manager')
),
kibanaVersion,
});
};