[8.0] log ES client response size (#123173) (#123214)

* log ES client response size (#123173)

(cherry picked from commit c1b98ba7fb)

* remove unsupported syntax

* remove unsupported syntax 2

* fix unsupported syntax

Co-authored-by: Mikhail Shustov <mikhail.shustov@elastic.co>
Co-authored-by: Mikhail Shustov <restrry@gmail.com>
This commit is contained in:
Kibana Machine 2022-01-18 09:43:26 -05:00 committed by GitHub
parent 8f1f5cd9e1
commit 857d85cc83
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 346 additions and 103 deletions

View file

@ -0,0 +1,172 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import type { DiagnosticResult, ConnectionRequestParams } from '@elastic/elasticsearch';
import { getEcsResponseLog } from './get_ecs_response_log';
interface ResponseFixtureOptions {
requestParams?: Partial<ConnectionRequestParams>;
response?: {
body?: any;
headers?: Record<string, string | string[]>;
statusCode?: number;
};
}
function createResponseEvent({
requestParams = {},
response = {},
}: ResponseFixtureOptions = {}): DiagnosticResult {
return {
body: response.body ?? {},
statusCode: response.statusCode ?? 200,
headers: response.headers ?? {},
meta: {
request: {
params: {
headers: requestParams.headers ?? { 'content-length': '123' },
method: requestParams.method ?? 'get',
path: requestParams.path ?? '/path',
querystring: requestParams.querystring ?? '?wait_for_completion=true',
},
options: {
id: '42',
},
} as DiagnosticResult['meta']['request'],
} as DiagnosticResult['meta'],
warnings: null,
};
}
describe('getEcsResponseLog', () => {
describe('filters sensitive headers', () => {
test('redacts Authorization and Cookie headers by default', () => {
const event = createResponseEvent({
requestParams: { headers: { authorization: 'a', cookie: 'b', 'user-agent': 'hi' } },
response: { headers: { 'content-length': '123', 'set-cookie': 'c' } },
});
const log = getEcsResponseLog(event);
// @ts-expect-error ECS custom field
expect(log.http.request.headers).toMatchInlineSnapshot(`
Object {
"authorization": "[REDACTED]",
"cookie": "[REDACTED]",
"user-agent": "hi",
}
`);
// @ts-expect-error ECS custom field
expect(log.http.response.headers).toMatchInlineSnapshot(`
Object {
"content-length": "123",
"set-cookie": "[REDACTED]",
}
`);
});
test('does not mutate original headers', () => {
const reqHeaders = { a: 'foo', b: ['hello', 'world'] };
const resHeaders = { c: 'bar' };
const event = createResponseEvent({
requestParams: { headers: reqHeaders },
response: { headers: resHeaders },
});
const log = getEcsResponseLog(event);
expect(reqHeaders).toMatchInlineSnapshot(`
Object {
"a": "foo",
"b": Array [
"hello",
"world",
],
}
`);
expect(resHeaders).toMatchInlineSnapshot(`
Object {
"c": "bar",
}
`);
// @ts-expect-error ECS custom field
log.http.request.headers.a = 'testA';
// @ts-expect-error ECS custom field
log.http.request.headers.b[1] = 'testB';
// @ts-expect-error ECS custom field
log.http.request.headers.c = 'testC';
expect(reqHeaders).toMatchInlineSnapshot(`
Object {
"a": "foo",
"b": Array [
"hello",
"testB",
],
}
`);
expect(resHeaders).toMatchInlineSnapshot(`
Object {
"c": "bar",
}
`);
});
test('does not mutate original headers when redacting sensitive data', () => {
const reqHeaders = { authorization: 'a', cookie: 'b', 'user-agent': 'hi' };
const resHeaders = { 'content-length': '123', 'set-cookie': 'c' };
const event = createResponseEvent({
requestParams: { headers: reqHeaders },
response: { headers: resHeaders },
});
getEcsResponseLog(event);
expect(reqHeaders).toMatchInlineSnapshot(`
Object {
"authorization": "a",
"cookie": "b",
"user-agent": "hi",
}
`);
expect(resHeaders).toMatchInlineSnapshot(`
Object {
"content-length": "123",
"set-cookie": "c",
}
`);
});
});
describe('ecs', () => {
test('provides an ECS-compatible response', () => {
const event = createResponseEvent();
const result = getEcsResponseLog(event, 123);
expect(result).toMatchInlineSnapshot(`
Object {
"http": Object {
"request": Object {
"headers": Object {
"content-length": "123",
},
"id": undefined,
"method": "GET",
},
"response": Object {
"body": Object {
"bytes": 123,
},
"headers": Object {},
"status_code": 200,
},
},
"url": Object {
"path": "/path",
"query": "?wait_for_completion=true",
},
}
`);
});
});
});

View file

@ -0,0 +1,64 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import type { IncomingHttpHeaders } from 'http';
import type { DiagnosticResult } from '@elastic/elasticsearch';
import type { LogMeta } from '@kbn/logging';
const FORBIDDEN_HEADERS = ['authorization', 'cookie', 'set-cookie'];
const REDACTED_HEADER_TEXT = '[REDACTED]';
// We are excluding sensitive headers by default, until we have a log filtering mechanism.
function redactSensitiveHeaders(key: string, value: string | string[]): string | string[] {
return FORBIDDEN_HEADERS.includes(key) ? REDACTED_HEADER_TEXT : value;
}
// Shallow clone the headers so they are not mutated if filtered by a RewriteAppender.
function cloneAndFilterHeaders(headers?: IncomingHttpHeaders) {
const result = {} as IncomingHttpHeaders;
if (headers) {
for (const key of Object.keys(headers)) {
const value = headers[key];
if (value) {
result[key] = redactSensitiveHeaders(key, value);
}
}
}
return result;
}
/**
* Retruns ECS-compliant `LogMeta` for logging.
*
* @internal
*/
export function getEcsResponseLog(event: DiagnosticResult, bytes?: number): LogMeta {
const meta: LogMeta = {
http: {
request: {
id: event.meta.request.options.opaqueId,
method: event.meta.request.params.method.toUpperCase(),
// @ts-expect-error ECS custom field: https://github.com/elastic/ecs/issues/232.
headers: cloneAndFilterHeaders(event.meta.request.params.headers),
},
response: {
body: {
bytes,
},
status_code: event.statusCode,
// @ts-expect-error ECS custom field: https://github.com/elastic/ecs/issues/232.
headers: cloneAndFilterHeaders(event.headers),
},
},
url: {
path: event.meta.request.params.path,
query: event.meta.request.params.querystring,
},
};
return meta;
}

View file

@ -10,13 +10,14 @@ import { Buffer } from 'buffer';
import { Readable } from 'stream';
import {
errors,
Client,
ConnectionRequestParams,
errors,
TransportRequestOptions,
TransportRequestParams,
DiagnosticResult,
RequestBody,
} from '@elastic/elasticsearch';
import type { DiagnosticResult, RequestBody } from '@elastic/elasticsearch';
import { parseClientOptionsMock, ClientMock } from './configure_client.test.mocks';
import { loggingSystemMock } from '../../logging/logging_system.mock';
@ -27,7 +28,7 @@ const createApiResponse = <T>({
statusCode = 200,
headers = {},
warnings = null,
params,
params = { method: 'GET', path: '/path', querystring: '?wait_for_completion=true' },
requestOptions = {},
}: {
body: T;
@ -77,10 +78,14 @@ describe('instrumentQueryAndDeprecationLogger', () => {
jest.clearAllMocks();
});
function createResponseWithBody(body?: RequestBody) {
function createResponseWithBody(
body?: RequestBody,
params?: { headers?: Record<string, string> }
) {
return createApiResponse({
body: {},
statusCode: 200,
headers: params?.headers ?? {},
params: {
method: 'GET',
path: '/foo',
@ -107,15 +112,10 @@ describe('instrumentQueryAndDeprecationLogger', () => {
});
client.diagnostic.emit('response', null, response);
expect(loggingSystemMock.collect(logger).debug).toMatchInlineSnapshot(`
Array [
Array [
"200
expect(loggingSystemMock.collect(logger).debug[0][0]).toMatchInlineSnapshot(`
"200
GET /foo?hello=dolly
{\\"seq_no_primary_term\\":true,\\"query\\":{\\"term\\":{\\"user\\":\\"kimchy\\"}}}",
undefined,
],
]
{\\"seq_no_primary_term\\":true,\\"query\\":{\\"term\\":{\\"user\\":\\"kimchy\\"}}}"
`);
});
@ -132,15 +132,10 @@ describe('instrumentQueryAndDeprecationLogger', () => {
);
client.diagnostic.emit('response', null, response);
expect(loggingSystemMock.collect(logger).debug).toMatchInlineSnapshot(`
Array [
Array [
"200
expect(loggingSystemMock.collect(logger).debug[0][0]).toMatchInlineSnapshot(`
"200
GET /foo?hello=dolly
{\\"seq_no_primary_term\\":true,\\"query\\":{\\"term\\":{\\"user\\":\\"kimchy\\"}}}",
undefined,
],
]
{\\"seq_no_primary_term\\":true,\\"query\\":{\\"term\\":{\\"user\\":\\"kimchy\\"}}}"
`);
});
@ -159,15 +154,10 @@ describe('instrumentQueryAndDeprecationLogger', () => {
);
client.diagnostic.emit('response', null, response);
expect(loggingSystemMock.collect(logger).debug).toMatchInlineSnapshot(`
Array [
Array [
"200
expect(loggingSystemMock.collect(logger).debug[0][0]).toMatchInlineSnapshot(`
"200
GET /foo?hello=dolly
[buffer]",
undefined,
],
]
[buffer]"
`);
});
@ -186,15 +176,10 @@ describe('instrumentQueryAndDeprecationLogger', () => {
);
client.diagnostic.emit('response', null, response);
expect(loggingSystemMock.collect(logger).debug).toMatchInlineSnapshot(`
Array [
Array [
"200
expect(loggingSystemMock.collect(logger).debug[0][0]).toMatchInlineSnapshot(`
"200
GET /foo?hello=dolly
[stream]",
undefined,
],
]
[stream]"
`);
});
@ -204,14 +189,9 @@ describe('instrumentQueryAndDeprecationLogger', () => {
const response = createResponseWithBody();
client.diagnostic.emit('response', null, response);
expect(loggingSystemMock.collect(logger).debug).toMatchInlineSnapshot(`
Array [
Array [
"200
GET /foo?hello=dolly",
undefined,
],
]
expect(loggingSystemMock.collect(logger).debug[0][0]).toMatchInlineSnapshot(`
"200
GET /foo?hello=dolly"
`);
});
@ -230,14 +210,9 @@ describe('instrumentQueryAndDeprecationLogger', () => {
client.diagnostic.emit('response', null, response);
expect(loggingSystemMock.collect(logger).debug).toMatchInlineSnapshot(`
Array [
Array [
"200
GET /foo?city=M%C3%BCnich",
undefined,
],
]
expect(loggingSystemMock.collect(logger).debug[0][0]).toMatchInlineSnapshot(`
"200
GET /foo?city=M%C3%BCnich"
`);
});
@ -265,15 +240,10 @@ describe('instrumentQueryAndDeprecationLogger', () => {
});
client.diagnostic.emit('response', new errors.ResponseError(response), response);
expect(loggingSystemMock.collect(logger).debug).toMatchInlineSnapshot(`
Array [
Array [
"500
expect(loggingSystemMock.collect(logger).debug[0][0]).toMatchInlineSnapshot(`
"500
GET /foo?hello=dolly
{\\"seq_no_primary_term\\":true,\\"query\\":{\\"term\\":{\\"user\\":\\"kimchy\\"}}} [internal server error]: internal server error",
undefined,
],
]
{\\"seq_no_primary_term\\":true,\\"query\\":{\\"term\\":{\\"user\\":\\"kimchy\\"}}} [internal server error]: internal server error"
`);
});
@ -283,14 +253,9 @@ describe('instrumentQueryAndDeprecationLogger', () => {
const response = createApiResponse({ body: {} });
client.diagnostic.emit('response', new errors.TimeoutError('message', response), response);
expect(loggingSystemMock.collect(logger).debug).toMatchInlineSnapshot(`
Array [
Array [
"[TimeoutError]: message",
undefined,
],
]
`);
expect(loggingSystemMock.collect(logger).debug[0][0]).toMatchInlineSnapshot(
`"[TimeoutError]: message"`
);
});
it('logs debug when the client emits an ResponseError returned by elasticsearch', () => {
@ -313,14 +278,9 @@ describe('instrumentQueryAndDeprecationLogger', () => {
});
client.diagnostic.emit('response', new errors.ResponseError(response), response);
expect(loggingSystemMock.collect(logger).debug).toMatchInlineSnapshot(`
Array [
Array [
"400
GET /_path?hello=dolly [illegal_argument_exception]: request [/_path] contains unrecognized parameter: [name]",
undefined,
],
]
expect(loggingSystemMock.collect(logger).debug[0][0]).toMatchInlineSnapshot(`
"400
GET /_path?hello=dolly [illegal_argument_exception]: request [/_path] contains unrecognized parameter: [name]"
`);
});
@ -340,14 +300,9 @@ describe('instrumentQueryAndDeprecationLogger', () => {
});
client.diagnostic.emit('response', new errors.ResponseError(response), response);
expect(loggingSystemMock.collect(logger).debug).toMatchInlineSnapshot(`
Array [
Array [
"400
GET /_path [undefined]: {\\"error\\":{}}",
undefined,
],
]
expect(loggingSystemMock.collect(logger).debug[0][0]).toMatchInlineSnapshot(`
"400
GET /_path [undefined]: {\\"error\\":{}}"
`);
logger.debug.mockClear();
@ -363,14 +318,9 @@ describe('instrumentQueryAndDeprecationLogger', () => {
});
client.diagnostic.emit('response', new errors.ResponseError(response), response);
expect(loggingSystemMock.collect(logger).debug).toMatchInlineSnapshot(`
Array [
Array [
"400
GET /_path [undefined]: Response Error",
undefined,
],
]
expect(loggingSystemMock.collect(logger).debug[0][0]).toMatchInlineSnapshot(`
"400
GET /_path [undefined]: Response Error"
`);
});
@ -397,8 +347,21 @@ describe('instrumentQueryAndDeprecationLogger', () => {
Object {
"http": Object {
"request": Object {
"headers": Object {},
"id": "opaque-id",
"method": "GET",
},
"response": Object {
"body": Object {
"bytes": undefined,
},
"headers": Object {},
"status_code": 400,
},
},
"url": Object {
"path": "/_path",
"query": undefined,
},
}
`);
@ -423,12 +386,48 @@ describe('instrumentQueryAndDeprecationLogger', () => {
Object {
"http": Object {
"request": Object {
"headers": Object {},
"id": "opaque-id",
"method": "GET",
},
"response": Object {
"body": Object {
"bytes": undefined,
},
"headers": Object {},
"status_code": 400,
},
},
"url": Object {
"path": "/_path",
"query": undefined,
},
}
`);
});
it('logs response size', () => {
instrumentEsQueryAndDeprecationLogger({ logger, client, type: 'test type' });
const response = createResponseWithBody(
{
seq_no_primary_term: true,
query: {
term: { user: 'kimchy' },
},
},
{
headers: { 'content-length': '12345678' },
}
);
client.diagnostic.emit('response', null, response);
expect(loggingSystemMock.collect(logger).debug[0][0]).toMatchInlineSnapshot(`
"200 - 11.8MB
GET /foo?hello=dolly
{\\"seq_no_primary_term\\":true,\\"query\\":{\\"term\\":{\\"user\\":\\"kimchy\\"}}}"
`);
});
});
describe('deprecation warnings from response headers', () => {

View file

@ -5,11 +5,13 @@
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import type { IncomingHttpHeaders } from 'http';
import { Buffer } from 'buffer';
import { stringify } from 'querystring';
import { errors, DiagnosticResult, RequestBody, Client } from '@elastic/elasticsearch';
import numeral from '@elastic/numeral';
import type { ElasticsearchErrorDetails } from './types';
import { getEcsResponseLog } from './get_ecs_response_log';
import { Logger } from '../../logging';
const convertQueryString = (qs: string | Record<string, any> | undefined): string => {
@ -38,6 +40,14 @@ export function getErrorMessage(error: errors.ElasticsearchClientError): string
return `[${error.name}]: ${error.message}`;
}
function getContentLength(headers?: IncomingHttpHeaders): number | undefined {
const contentLength = headers && headers['content-length'];
if (contentLength) {
const val = parseInt(contentLength, 10);
return !isNaN(val) ? val : undefined;
}
}
/**
* returns a string in format:
*
@ -47,10 +57,10 @@ export function getErrorMessage(error: errors.ElasticsearchClientError): string
*
* so it could be copy-pasted into the Dev console
*/
function getResponseMessage(event: DiagnosticResult): string {
function getResponseMessage(event: DiagnosticResult, bytesMsg: string): string {
const errorMeta = getRequestDebugMeta(event);
const body = errorMeta.body ? `\n${errorMeta.body}` : '';
return `${errorMeta.statusCode}\n${errorMeta.method} ${errorMeta.url}${body}`;
return `${errorMeta.statusCode}${bytesMsg}\n${errorMeta.method} ${errorMeta.url}${body}`;
}
/**
@ -93,21 +103,19 @@ export const instrumentEsQueryAndDeprecationLogger = ({
const deprecationLogger = logger.get('deprecation');
client.diagnostic.on('response', (error, event) => {
if (event) {
const opaqueId = event.meta.request.options.opaqueId;
const meta = opaqueId
? {
http: { request: { id: event.meta.request.options.opaqueId } },
}
: undefined; // do not clutter logs if opaqueId is not present
const bytes = getContentLength(event.headers);
const bytesMsg = bytes ? ` - ${numeral(bytes).format('0.0b')}` : '';
const meta = getEcsResponseLog(event, bytes);
let queryMsg = '';
if (error) {
if (error instanceof errors.ResponseError) {
queryMsg = `${getResponseMessage(event)} ${getErrorMessage(error)}`;
queryMsg = `${getResponseMessage(event, bytesMsg)} ${getErrorMessage(error)}`;
} else {
queryMsg = getErrorMessage(error);
}
} else {
queryMsg = getResponseMessage(event);
queryMsg = getResponseMessage(event, bytesMsg);
}
queryLogger.debug(queryMsg, meta);