Implement elasticsearch.maxResponseSize config option (#186291)

## Summary

Fix https://github.com/elastic/kibana/issues/185042

- Add a new `elasticsearch.maxResponseSize` config option 
- Set this value to `100mb` on our serverless configuration file

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Pierre Gayvallet 2024-06-19 11:04:33 +02:00 committed by GitHub
parent 52e678a0aa
commit 85f67dd39a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
18 changed files with 359 additions and 7 deletions

View file

@ -70,6 +70,9 @@ core.lifecycle.disablePreboot: true
# Enable ZDT migration algorithm
migrations.algorithm: zdt
# Enable elasticsearch response size circuit breaker
elasticsearch.maxResponseSize: "100mb"
# Limit batch size to reduce possibility of failures.
# A longer migration time is acceptable due to the ZDT algorithm.
migrations.batchSize: 250

View file

@ -89,6 +89,10 @@ configuration is effectively ignored when <<csp-strict, `csp.strict`>> is enable
The maximum number of sockets that can be used for communications with {es}.
*Default: `Infinity`*
[[elasticsearch-maxResponseSize]] `elasticsearch.maxResponseSize`::
Either `false` or a `byteSize` value. When set, responses from {es} with a size higher than the defined limit will be rejected.
This is intended to be used as a circuit-breaker mechanism to avoid memory errors in case of unexpectedly high responses coming from {es}.
*Default: `false`*
[[elasticsearch-maxIdleSockets]] `elasticsearch.maxIdleSockets`::
The maximum number of idle sockets to keep open between {kib} and {es}. If more sockets become idle, they will be closed.

View file

@ -7,6 +7,7 @@
*/
import { duration } from 'moment';
import { ByteSizeValue } from '@kbn/config-schema';
import type { ElasticsearchClientConfig } from '@kbn/core-elasticsearch-server';
import { parseClientOptions } from './client_config';
import { getDefaultHeaders } from './headers';
@ -19,6 +20,7 @@ const createConfig = (
compression: false,
maxSockets: Infinity,
maxIdleSockets: 300,
maxResponseSize: undefined,
idleSocketTimeout: duration(30, 'seconds'),
sniffOnStart: false,
sniffOnConnectionFault: false,
@ -152,6 +154,28 @@ describe('parseClientOptions', () => {
});
});
describe('`maxResponseSize` option', () => {
it('does not set the values on client options when undefined', () => {
const options = parseClientOptions(
createConfig({ maxResponseSize: undefined }),
false,
kibanaVersion
);
expect(options.maxResponseSize).toBe(undefined);
expect(options.maxCompressedResponseSize).toBe(undefined);
});
it('sets the right values on client options when defined', () => {
const options = parseClientOptions(
createConfig({ maxResponseSize: ByteSizeValue.parse('2kb') }),
false,
kibanaVersion
);
expect(options.maxResponseSize).toBe(2048);
expect(options.maxCompressedResponseSize).toBe(2048);
});
});
describe('`compression` option', () => {
it('`compression` is true', () => {
const options = parseClientOptions(

View file

@ -47,6 +47,11 @@ export function parseClientOptions(
compression: config.compression,
};
if (config.maxResponseSize) {
clientOptions.maxResponseSize = config.maxResponseSize.getValueInBytes();
clientOptions.maxCompressedResponseSize = config.maxResponseSize.getValueInBytes();
}
if (config.pingTimeout != null) {
clientOptions.pingTimeout = getDurationAsMs(config.pingTimeout);
}

View file

@ -265,6 +265,79 @@ describe('createTransport', () => {
);
});
});
describe('maxResponseSize options', () => {
it('does not set values when not provided in the options', async () => {
const transportClass = createTransportClass();
const transport = new transportClass(baseConstructorParams);
const requestParams = { method: 'GET', path: '/' };
await transport.request(requestParams, {});
expect(transportRequestMock).toHaveBeenCalledTimes(1);
expect(transportRequestMock).toHaveBeenCalledWith(
expect.any(Object),
expect.not.objectContaining({
maxResponseSize: expect.any(Number),
maxCompressedResponseSize: expect.any(Number),
})
);
});
it('uses `maxResponseSize` from the options when provided and when `maxCompressedResponseSize` is not', async () => {
const transportClass = createTransportClass();
const transport = new transportClass(baseConstructorParams);
const requestParams = { method: 'GET', path: '/' };
await transport.request(requestParams, { maxResponseSize: 234 });
expect(transportRequestMock).toHaveBeenCalledTimes(1);
expect(transportRequestMock).toHaveBeenCalledWith(
expect.any(Object),
expect.objectContaining({
maxResponseSize: 234,
maxCompressedResponseSize: 234,
})
);
});
it('uses `maxCompressedResponseSize` from the options when provided and when `maxResponseSize` is not', async () => {
const transportClass = createTransportClass();
const transport = new transportClass(baseConstructorParams);
const requestParams = { method: 'GET', path: '/' };
await transport.request(requestParams, { maxCompressedResponseSize: 272 });
expect(transportRequestMock).toHaveBeenCalledTimes(1);
expect(transportRequestMock).toHaveBeenCalledWith(
expect.any(Object),
expect.objectContaining({
maxResponseSize: 272,
maxCompressedResponseSize: 272,
})
);
});
it('uses individual values when both `maxResponseSize` and `maxCompressedResponseSize` are defined', async () => {
const transportClass = createTransportClass();
const transport = new transportClass(baseConstructorParams);
const requestParams = { method: 'GET', path: '/' };
await transport.request(requestParams, {
maxResponseSize: 512,
maxCompressedResponseSize: 272,
});
expect(transportRequestMock).toHaveBeenCalledTimes(1);
expect(transportRequestMock).toHaveBeenCalledWith(
expect.any(Object),
expect.objectContaining({
maxResponseSize: 512,
maxCompressedResponseSize: 272,
})
);
});
});
});
describe('unauthorized error handler', () => {

View file

@ -41,6 +41,20 @@ export const createTransport = ({
async request(params: TransportRequestParams, options?: TransportRequestOptions) {
const opts: TransportRequestOptions = options ? { ...options } : {};
// sync override of maxResponseSize and maxCompressedResponseSize
if (options) {
if (
options.maxResponseSize !== undefined &&
options.maxCompressedResponseSize === undefined
) {
opts.maxCompressedResponseSize = options.maxResponseSize;
} else if (
options.maxCompressedResponseSize !== undefined &&
options.maxResponseSize === undefined
) {
opts.maxResponseSize = options.maxCompressedResponseSize;
}
}
const opaqueId = getExecutionContext();
if (opaqueId && !opts.opaqueId) {
// rewrites headers['x-opaque-id'] if it presents

View file

@ -1051,4 +1051,41 @@ describe('instrumentQueryAndDeprecationLogger', () => {
});
});
});
describe('requests aborted due to maximum response size exceeded errors', () => {
const requestAbortedErrorMessage = `The content length (9000) is bigger than the maximum allowed buffer (42)`;
it('logs warning when the client emits a RequestAbortedError error due to excessive response length ', () => {
instrumentEsQueryAndDeprecationLogger({
logger,
client,
type: 'test type',
apisToRedactInLogs: [],
});
client.diagnostic.emit(
'response',
new errors.RequestAbortedError(requestAbortedErrorMessage),
null
);
expect(loggingSystemMock.collect(logger).warn[0][0]).toMatchInlineSnapshot(
`"Request was aborted: The content length (9000) is bigger than the maximum allowed buffer (42)"`
);
});
it('does not log warning for other type of errors', () => {
instrumentEsQueryAndDeprecationLogger({
logger,
client,
type: 'test type',
apisToRedactInLogs: [],
});
const response = createApiResponse({ body: {} });
client.diagnostic.emit('response', new errors.TimeoutError('message', response), response);
expect(loggingSystemMock.collect(logger).warn).toMatchInlineSnapshot(`Array []`);
});
});
});

View file

@ -12,7 +12,7 @@ import { stringify } from 'querystring';
import { errors, DiagnosticResult, RequestBody, Client } from '@elastic/elasticsearch';
import numeral from '@elastic/numeral';
import type { Logger } from '@kbn/logging';
import type { ElasticsearchErrorDetails } from '@kbn/es-errors';
import { isMaximumResponseSizeExceededError, type ElasticsearchErrorDetails } from '@kbn/es-errors';
import type { ElasticsearchApiToRedactInLogs } from '@kbn/core-elasticsearch-server';
import { getEcsResponseLog } from './get_ecs_response_log';
@ -171,6 +171,16 @@ function getQueryMessage(
}
}
function getResponseSizeExceededErrorMessage(error: errors.RequestAbortedError): string {
if (error.meta) {
const params = error.meta.meta.request.params;
return `Request against ${params.method} ${params.path} was aborted: ${error.message}`;
} else {
// in theory meta is always populated for such errors, but better safe than sorry
return `Request was aborted: ${error.message}`;
}
}
export const instrumentEsQueryAndDeprecationLogger = ({
logger,
client,
@ -184,6 +194,7 @@ export const instrumentEsQueryAndDeprecationLogger = ({
}) => {
const queryLogger = logger.get('query', type);
const deprecationLogger = logger.get('deprecation');
const warningLogger = logger.get('warnings'); // elasticsearch.warnings
client.diagnostic.on('response', (error, event) => {
// we could check this once and not subscribe to response events if both are disabled,
@ -191,6 +202,10 @@ export const instrumentEsQueryAndDeprecationLogger = ({
const logQuery = queryLogger.isLevelEnabled('debug');
const logDeprecation = deprecationLogger.isLevelEnabled('debug');
if (error && isMaximumResponseSizeExceededError(error)) {
warningLogger.warn(getResponseSizeExceededErrorMessage(error));
}
if (event && (logQuery || logDeprecation)) {
const bytes = getContentLength(event.headers);
const queryMsg = getQueryMessage(bytes, error, event, apisToRedactInLogs);

View file

@ -22,6 +22,7 @@
"@kbn/core-logging-server-mocks",
"@kbn/core-http-server-mocks",
"@kbn/core-metrics-server",
"@kbn/config-schema",
],
"exclude": [
"target/**/*",

View file

@ -42,6 +42,7 @@ test('set correct defaults', () => {
"idleSocketTimeout": "PT1M",
"ignoreVersionMismatch": false,
"maxIdleSockets": 256,
"maxResponseSize": undefined,
"maxSockets": 800,
"password": undefined,
"pingTimeout": "PT30S",
@ -127,6 +128,20 @@ describe('#maxSockets', () => {
});
});
describe('#maxResponseSize', () => {
test('accepts `false` value', () => {
const configValue = new ElasticsearchConfig(config.schema.validate({ maxResponseSize: false }));
expect(configValue.maxResponseSize).toBe(undefined);
});
test('accepts bytesize value', () => {
const configValue = new ElasticsearchConfig(
config.schema.validate({ maxResponseSize: '200b' })
);
expect(configValue.maxResponseSize!.getValueInBytes()).toBe(200);
});
});
test('#requestHeadersWhitelist accepts both string and array of strings', () => {
let configValue = new ElasticsearchConfig(
config.schema.validate({ requestHeadersWhitelist: 'token' })

View file

@ -6,11 +6,11 @@
* Side Public License, v 1.
*/
import { schema, TypeOf, offeringBasedSchema } from '@kbn/config-schema';
import { readFileSync } from 'fs';
import { Duration } from 'moment';
import { readPkcs12Keystore, readPkcs12Truststore } from '@kbn/crypto';
import { i18n } from '@kbn/i18n';
import { Duration } from 'moment';
import { readFileSync } from 'fs';
import { schema, offeringBasedSchema, ByteSizeValue, type TypeOf } from '@kbn/config-schema';
import type { ServiceConfigDescriptor } from '@kbn/core-base-server-internal';
import type { ConfigDeprecationProvider } from '@kbn/config';
import type {
@ -42,6 +42,9 @@ export const configSchema = schema.object({
}),
maxSockets: schema.number({ defaultValue: 800, min: 1 }),
maxIdleSockets: schema.number({ defaultValue: 256, min: 1 }),
maxResponseSize: schema.oneOf([schema.literal(false), schema.byteSize()], {
defaultValue: false,
}),
idleSocketTimeout: schema.duration({ defaultValue: '60s' }),
compression: schema.boolean({ defaultValue: false }),
username: schema.maybe(
@ -332,6 +335,12 @@ export class ElasticsearchConfig implements IElasticsearchConfig {
*/
public readonly maxIdleSockets: number;
/**
* The maximum allowed response size (both compressed and uncompressed).
* When defined, responses with a size higher than the set limit will be aborted with an error.
*/
public readonly maxResponseSize?: ByteSizeValue;
/**
* The timeout for idle sockets kept open between Kibana and Elasticsearch. If the socket is idle for longer than this timeout, it will be closed.
*/
@ -455,6 +464,8 @@ export class ElasticsearchConfig implements IElasticsearchConfig {
this.customHeaders = rawConfig.customHeaders;
this.maxSockets = rawConfig.maxSockets;
this.maxIdleSockets = rawConfig.maxIdleSockets;
this.maxResponseSize =
rawConfig.maxResponseSize !== false ? rawConfig.maxResponseSize : undefined;
this.idleSocketTimeout = rawConfig.idleSocketTimeout;
this.compression = rawConfig.compression;
this.skipStartupConnectionCheck = rawConfig.skipStartupConnectionCheck;

View file

@ -7,6 +7,7 @@
*/
import type { Duration } from 'moment';
import type { ByteSizeValue } from '@kbn/config-schema';
/**
* Definition of an API that should redact the requested body in the logs
@ -35,6 +36,7 @@ export interface ElasticsearchClientConfig {
requestHeadersWhitelist: string[];
maxSockets: number;
maxIdleSockets: number;
maxResponseSize?: ByteSizeValue;
idleSocketTimeout: Duration;
compression: boolean;
sniffOnStart: boolean;

View file

@ -13,7 +13,8 @@
"kbn_references": [
"@kbn/utility-types",
"@kbn/es-errors",
"@kbn/core-http-server"
"@kbn/core-http-server",
"@kbn/config-schema"
],
"exclude": [
"target/**/*",

View file

@ -7,5 +7,10 @@
*/
export type { ElasticsearchErrorDetails } from './src/types';
export { isUnauthorizedError, isResponseError } from './src/errors';
export {
isUnauthorizedError,
isResponseError,
isRequestAbortedError,
isMaximumResponseSizeExceededError,
} from './src/errors';
export type { UnauthorizedError } from './src/errors';

View file

@ -8,7 +8,12 @@
import { errors } from '@elastic/elasticsearch';
import type { TransportResult } from '@elastic/elasticsearch';
import { isResponseError, isUnauthorizedError } from './errors';
import {
isResponseError,
isUnauthorizedError,
isRequestAbortedError,
isMaximumResponseSizeExceededError,
} from './errors';
const createApiResponseError = ({
statusCode = 200,
@ -69,3 +74,36 @@ describe('isUnauthorizedError', () => {
expect(isUnauthorizedError(new errors.ConfigurationError('foo'))).toBe(false);
});
});
describe('isRequestAbortedError', () => {
it('returns `true` when the input is a `RequestAbortedError`', () => {
expect(isRequestAbortedError(new errors.RequestAbortedError('Oh no'))).toBe(true);
});
it('returns `false` when the input is not a `RequestAbortedError`', () => {
expect(
isRequestAbortedError(new errors.ResponseError(createApiResponseError({ statusCode: 500 })))
).toBe(false);
});
});
describe('isMaximumResponseSizeExceededError', () => {
it('returns `true` when the input is a `RequestAbortedError` with the right message', () => {
expect(
isMaximumResponseSizeExceededError(
new errors.RequestAbortedError(
`The content length (9000) is bigger than the maximum allowed buffer (42)`
)
)
).toBe(true);
});
it('returns `false` when the input is a `RequestAbortedError` without the right message', () => {
expect(isMaximumResponseSizeExceededError(new errors.RequestAbortedError('Oh no'))).toBe(false);
});
it('returns `false` when the input is not a `RequestAbortedError`', () => {
expect(
isMaximumResponseSizeExceededError(
new errors.ResponseError(createApiResponseError({ statusCode: 500 }))
)
).toBe(false);
});
});

View file

@ -31,3 +31,17 @@ export function isResponseError(error: unknown): error is errors.ResponseError {
export function isUnauthorizedError(error: unknown): error is UnauthorizedError {
return isResponseError(error) && error.statusCode === 401;
}
/**
* Checks if the provided `error` is an {@link errors.RequestAbortedError | elasticsearch request aborted error}
* @public
*/
export function isRequestAbortedError(error: unknown): error is errors.RequestAbortedError {
return error instanceof errors.RequestAbortedError;
}
export function isMaximumResponseSizeExceededError(
error: unknown
): error is errors.RequestAbortedError {
return isRequestAbortedError(error) && error.message.includes('content length');
}

View file

@ -0,0 +1,89 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import {
createTestServers,
type TestElasticsearchUtils,
type TestKibanaUtils,
} from '@kbn/core-test-helpers-kbn-server';
import { unsafeConsole } from '@kbn/security-hardening';
describe('Elasticsearch max response size', () => {
let mockConsoleLog: jest.SpyInstance;
let esServer: TestElasticsearchUtils;
let kibanaServer: TestKibanaUtils;
beforeAll(async () => {
mockConsoleLog = jest.spyOn(unsafeConsole, 'log');
const { startES, startKibana } = createTestServers({
adjustTimeout: jest.setTimeout,
settings: {
kbn: {
logging: {
appenders: {
'test-appender': {
type: 'console',
layout: {
type: 'pattern',
},
},
},
loggers: [
{ name: 'elasticsearch.warnings', appenders: ['test-appender'], level: 'info' },
],
},
},
},
});
esServer = await startES();
kibanaServer = await startKibana();
});
beforeEach(() => {
mockConsoleLog.mockClear();
});
afterAll(async () => {
mockConsoleLog.mockRestore();
await kibanaServer.stop();
await esServer.stop();
});
it('rejects the response when the response size is larger than the requested limit', async () => {
const esClient = kibanaServer.coreStart.elasticsearch.client.asInternalUser;
try {
await esClient.cluster.stats({}, { maxResponseSize: 200 });
expect('should have thrown').toEqual('but it did not');
} catch (e) {
expect(e.name).toEqual('RequestAbortedError');
expect(e.message).toContain('is bigger than the maximum allowed string (200)');
}
});
it('logs a warning with the expected message', async () => {
const esClient = kibanaServer.coreStart.elasticsearch.client.asInternalUser;
try {
await esClient.cluster.stats({}, { maxResponseSize: 200 });
expect('should have thrown').toEqual('but it did not');
} catch (e) {
const calls = mockConsoleLog.mock.calls;
const warningCall = calls
.map((call) => call[0])
.find((call) => call.includes('elasticsearch.warnings'));
expect(warningCall).toContain(
'Request against GET /_cluster/stats was aborted: The content length'
);
expect(warningCall).toContain('is bigger than the maximum allowed string (200)');
}
});
});

View file

@ -69,6 +69,7 @@ describe('config schema', () => {
"logFetchCount": 10,
"logQueries": false,
"maxIdleSockets": 256,
"maxResponseSize": false,
"maxSockets": 800,
"pingTimeout": "PT30S",
"requestHeadersWhitelist": Array [