[ML] Explain Log Rate Spikes: Fix uncompressed streams and backpressure handling. (#142970)

- Adds `compressResponse` and `flushFix` flags to the request body: `compressResponse` overrides the compression setting inferred from the request headers, and `flushFix` pads streamed messages to work around proxy configurations that buffer responses (see the sketch after this list).
- Updates the developer examples with a toggle to run requests with compression enabled or disabled.
- Adds backpressure handling for response streams.
- The backpressure update also fixes a bug where uncompressed streams would never start streaming to the client.
- The analysis endpoint for Explain Log Rate Spikes now includes a ping every 10 seconds to keep the stream alive.
- Integration tests were updated to cover streaming and non-streaming requests, each with and without compression and the flush fix.
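
For illustration, here is a minimal sketch of how the new flags flow from a validated request body into `streamFactory` on the server. The `MyApiAction` and `MyRequestBody` types and the `handleStreamRequest` helper are hypothetical placeholders; the `streamFactory(headers, logger, compressResponse, flushFix)` signature and the `Headers` import match the diff below, while the `streamFactory` import path is assumed and may differ between Kibana versions.

```typescript
// Sketch only — import paths may differ depending on the Kibana version.
import { streamFactory } from '@kbn/aiops-utils';
import type { Headers } from '@kbn/core-http-server';
import type { Logger } from '@kbn/logging';

// Hypothetical NDJSON action type streamed to the client.
interface MyApiAction {
  type: 'add_item';
  payload: string;
}

// Hypothetical request body containing the two new optional flags.
interface MyRequestBody {
  compressResponse?: boolean;
  flushFix?: boolean;
}

function handleStreamRequest(headers: Headers, logger: Logger, body: MyRequestBody) {
  const { end, push, responseWithHeaders } = streamFactory<MyApiAction>(
    headers,
    logger,
    // `false` forces an uncompressed stream; `true`/`undefined` keep the default
    // behavior of deriving compression from the `accept-encoding` request headers.
    body.compressResponse,
    // Pads each NDJSON message with a random payload so proxies that buffer
    // small responses still flush them to the client.
    body.flushFix
  );

  push({ type: 'add_item', payload: 'hello' });
  end();

  // A route handler would return this via `response.ok(responseWithHeaders)`.
  return responseWithHeaders;
}
```
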
Walter Rafelsberger 2022-10-14 15:30:34 +02:00 committed by GitHub
parent c8a2ee2539
commit b38bbbcea3
13 changed files with 291 additions and 62 deletions


@ -13,5 +13,7 @@ export const reducerStreamRequestBodySchema = schema.object({
simulateErrors: schema.maybe(schema.boolean()),
/** Maximum timeout between streaming messages. */
timeout: schema.maybe(schema.number()),
/** Setting to override header-derived compression */
compressResponse: schema.maybe(schema.boolean()),
});
export type ReducerStreamRequestBodySchema = TypeOf<typeof reducerStreamRequestBodySchema>;


@ -11,6 +11,8 @@ import { schema, TypeOf } from '@kbn/config-schema';
export const simpleStringStreamRequestBodySchema = schema.object({
/** Maximum timeout between streaming messages. */
timeout: schema.number(),
/** Setting to override header-derived compression */
compressResponse: schema.maybe(schema.boolean()),
});
export type SimpleStringStreamRequestBodySchema = TypeOf<
typeof simpleStringStreamRequestBodySchema


@ -44,13 +44,14 @@ export const PageReducerStream: FC = () => {
const basePath = http?.basePath.get() ?? '';
const [simulateErrors, setSimulateErrors] = useState(false);
const [compressResponse, setCompressResponse] = useState(true);
const { dispatch, start, cancel, data, errors, isCancelled, isRunning } = useFetchStream<
ApiReducerStream,
typeof basePath
>(
`${basePath}/internal/response_stream/reducer_stream`,
{ simulateErrors },
{ compressResponse, simulateErrors },
{ reducer: reducerStreamReducer, initialState }
);
@ -144,6 +145,13 @@ export const PageReducerStream: FC = () => {
onChange={(e) => setSimulateErrors(!simulateErrors)}
compressed
/>
<EuiCheckbox
id="responseStreamCompressionCheckbox"
label="Toggle compression setting for response stream."
checked={compressResponse}
onChange={(e) => setCompressResponse(!compressResponse)}
compressed
/>
</EuiText>
</Page>
);


@ -6,9 +6,17 @@
* Side Public License, v 1.
*/
import React, { FC } from 'react';
import React, { useState, FC } from 'react';
import { EuiButton, EuiCallOut, EuiFlexGroup, EuiFlexItem, EuiSpacer, EuiText } from '@elastic/eui';
import {
EuiButton,
EuiCallOut,
EuiCheckbox,
EuiFlexGroup,
EuiFlexItem,
EuiSpacer,
EuiText,
} from '@elastic/eui';
import { useFetchStream } from '@kbn/aiops-utils';
@ -21,10 +29,15 @@ export const PageSimpleStringStream: FC = () => {
const { core } = useDeps();
const basePath = core.http?.basePath.get() ?? '';
const [compressResponse, setCompressResponse] = useState(true);
const { dispatch, errors, start, cancel, data, isRunning } = useFetchStream<
ApiSimpleStringStream,
typeof basePath
>(`${basePath}/internal/response_stream/simple_string_stream`, { timeout: 500 });
>(`${basePath}/internal/response_stream/simple_string_stream`, {
compressResponse,
timeout: 500,
});
const onClickHandler = async () => {
if (isRunning) {
@ -58,6 +71,14 @@ export const PageSimpleStringStream: FC = () => {
</EuiFlexItem>
</EuiFlexGroup>
<EuiSpacer />
<EuiCheckbox
id="responseStreamCompressionCheckbox"
label="Toggle compression setting for response stream."
checked={compressResponse}
onChange={(e) => setCompressResponse(!compressResponse)}
compressed
/>
<EuiSpacer />
<EuiText>
<p>{data}</p>
</EuiText>


@ -31,17 +31,29 @@ export const defineReducerStreamRoute = (router: IRouter, logger: Logger) => {
const maxTimeoutMs = request.body.timeout ?? 250;
const simulateError = request.body.simulateErrors ?? false;
let logMessageCounter = 1;
function logDebugMessage(msg: string) {
logger.debug(`Response Stream Example #${logMessageCounter}: ${msg}`);
logMessageCounter++;
}
logDebugMessage('Starting stream.');
let shouldStop = false;
request.events.aborted$.subscribe(() => {
logDebugMessage('aborted$ subscription trigger.');
shouldStop = true;
});
request.events.completed$.subscribe(() => {
logDebugMessage('completed$ subscription trigger.');
shouldStop = true;
});
const { end, push, responseWithHeaders } = streamFactory<ReducerStreamApiAction>(
request.headers,
logger
logger,
request.body.compressResponse
);
const entities = [


@ -35,7 +35,11 @@ export const defineSimpleStringStreamRoute = (router: IRouter, logger: Logger) =
shouldStop = true;
});
const { end, push, responseWithHeaders } = streamFactory(request.headers, logger);
const { end, push, responseWithHeaders } = streamFactory(
request.headers,
logger,
request.body.compressResponse
);
const text =
'Elasticsearch is a search engine based on the Lucene library. It provides a distributed, multitenant-capable full-text search engine with an HTTP web interface and schema-free JSON documents. Elasticsearch is developed in Java and is dual-licensed under the source-available Server Side Public License and the Elastic license, while other parts fall under the proprietary (source-available) Elastic License. Official clients are available in Java, .NET (C#), PHP, Python, Apache Groovy, Ruby and many other languages. According to the DB-Engines ranking, Elasticsearch is the most popular enterprise search engine.';


@ -29,7 +29,7 @@ describe('streamFactory', () => {
let mockLogger: Logger;
beforeEach(() => {
mockLogger = { error: jest.fn() } as unknown as Logger;
mockLogger = { debug: jest.fn(), error: jest.fn(), info: jest.fn() } as unknown as Logger;
});
it('should encode and receive an uncompressed string based stream', async () => {


@ -14,12 +14,15 @@ import type { Headers, ResponseHeaders } from '@kbn/core-http-server';
import { acceptCompression } from './accept_compression';
// We need this, otherwise the Kibana server will crash with an 'ERR_METHOD_NOT_IMPLEMENTED' error.
class ResponseStream extends Stream.PassThrough {
flush() {}
_read() {}
// Type guard to identify a compressed (gzip) stream.
function isCompressedStream(arg: unknown): arg is zlib.Gzip {
return typeof arg === 'object' && arg !== null && typeof (arg as zlib.Gzip).flush === 'function';
}
const FLUSH_PAYLOAD_SIZE = 4 * 1024;
class UncompressedResponseStream extends Stream.PassThrough {}
const DELIMITER = '\n';
type StreamType = 'string' | 'ndjson';
@ -27,9 +30,9 @@ type StreamType = 'string' | 'ndjson';
interface StreamFactoryReturnType<T = unknown> {
DELIMITER: string;
end: () => void;
push: (d: T) => void;
push: (d: T, drain?: boolean) => void;
responseWithHeaders: {
body: zlib.Gzip | ResponseStream;
body: zlib.Gzip | UncompressedResponseStream;
headers?: ResponseHeaders;
};
}
@ -39,11 +42,16 @@ interface StreamFactoryReturnType<T = unknown> {
* for gzip compression depending on provided request headers.
*
* @param headers - Request headers.
* @param logger - Kibana logger.
* @param compressOverride - Optional flag to override the header-based compression setting.
* @param flushFix - Adds an attribute with a random string payload to work around response buffering in certain proxy configurations.
*
* @returns An object with stream attributes and methods.
*/
export function streamFactory<T = string>(
headers: Headers,
logger: Logger,
compressOverride?: boolean,
flushFix?: boolean
): StreamFactoryReturnType<T>;
/**
@ -51,27 +59,72 @@ export function streamFactory<T = string>(
* request headers. Any non-string data pushed to the stream will be streamed as NDJSON.
*
* @param headers - Request headers.
* @param logger - Kibana logger.
* @param compressOverride - Optional flag to override the header-based compression setting.
* @param flushFix - Adds an attribute with a random string payload to work around response buffering in certain proxy configurations.
*
* @returns An object with stream attributes and methods.
*/
export function streamFactory<T = unknown>(
headers: Headers,
logger: Logger,
compressOverride: boolean = true,
flushFix: boolean = false
): StreamFactoryReturnType<T> {
let streamType: StreamType;
const isCompressed = acceptCompression(headers);
const isCompressed = compressOverride && acceptCompression(headers);
const stream = isCompressed ? zlib.createGzip() : new ResponseStream();
const stream = isCompressed ? zlib.createGzip() : new UncompressedResponseStream();
function end() {
stream.end();
// While we are waiting for the stream to drain, items are added to this buffer.
const backPressureBuffer: T[] = [];
// Set while the "drain" listener is active so we don't register multiple listeners.
let waitForDrain = false;
// Tracks pending write callbacks. An array (rather than a flag) avoids race conditions
// when multiple writes are in flight.
const waitForCallbacks: number[] = [];
// Indicates that the stream should be ended. Because there might still be items in the
// backpressure buffer, the stream is only ended once the buffer and pending callbacks are cleared.
let tryToEnd = false;
function logDebugMessage(msg: string) {
logger.debug(`HTTP Response Stream: ${msg}`);
}
function push(d: T) {
function end() {
tryToEnd = true;
logDebugMessage(`backPressureBuffer size on end(): ${backPressureBuffer.length}`);
logDebugMessage(`waitForCallbacks size on end(): ${waitForCallbacks.length}`);
// Before ending the stream, we need to empty the backPressureBuffer
if (backPressureBuffer.length > 0) {
const el = backPressureBuffer.shift();
if (el !== undefined) {
push(el, true);
}
return;
}
if (waitForCallbacks.length === 0) {
logDebugMessage('All backPressureBuffer and waitForCallbacks cleared, ending the stream.');
stream.end();
}
}
function push(d: T, drain = false) {
logDebugMessage(
`Push to stream. Current backPressure buffer size: ${backPressureBuffer.length}, drain flag: ${drain}`
);
if (d === undefined) {
logger.error('Stream chunk must not be undefined.');
return;
}
// Initialize the stream type with the first push to the stream,
// otherwise check the integrity of the data to be pushed.
if (streamType === undefined) {
@ -84,26 +137,69 @@ export function streamFactory<T = unknown>(
return;
}
if ((!drain && waitForDrain) || (!drain && backPressureBuffer.length > 0)) {
logDebugMessage('Adding item to backpressure buffer.');
backPressureBuffer.push(d);
return;
}
try {
const line =
streamType === 'ndjson'
? `${JSON.stringify({
...d,
// This is a temporary fix for response streaming with proxy configurations that buffer responses up to 4KB in size.
...(flushFix ? { flushPayload: crypto.randomBytes(4096).toString('hex') } : {}),
...(flushFix
? { flushPayload: crypto.randomBytes(FLUSH_PAYLOAD_SIZE).toString('hex') }
: {}),
})}${DELIMITER}`
: d;
stream.write(line);
waitForCallbacks.push(1);
const writeOk = stream.write(line, () => {
waitForCallbacks.pop();
// Calling .flush() on a compression stream will
// make zlib return as much output as currently possible.
if (isCompressedStream(stream)) {
stream.flush();
}
if (tryToEnd && waitForCallbacks.length === 0) {
end();
}
});
logDebugMessage(`Ok to write to the stream again? ${writeOk}`);
if (!writeOk) {
logDebugMessage(`Should we add the "drain" listener?: ${!waitForDrain}`);
if (!waitForDrain) {
waitForDrain = true;
stream.once('drain', () => {
logDebugMessage(
'The "drain" listener triggered, we can continue pushing to the stream.'
);
waitForDrain = false;
if (backPressureBuffer.length > 0) {
const el = backPressureBuffer.shift();
if (el !== undefined) {
push(el, true);
}
}
});
}
} else if (writeOk && drain && backPressureBuffer.length > 0) {
logDebugMessage('Continue clearing the backpressure buffer.');
const el = backPressureBuffer.shift();
if (el !== undefined) {
push(el, true);
}
}
} catch (e) {
logger.error(`Could not serialize or stream data chunk: ${e.toString()}`);
return;
}
// Calling .flush() on a compression stream will
// make zlib return as much output as currently possible.
if (isCompressed) {
stream.flush();
}
}
const responseWithHeaders: StreamFactoryReturnType['responseWithHeaders'] = {
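
To make the backpressure handling above easier to follow in isolation, here is a minimal, self-contained sketch of the same pattern on a plain Node.js `PassThrough` stream. This is not the Kibana implementation, just the underlying idea: when `write()` returns `false`, queue further items and resume from the queue on a one-shot `'drain'` listener.

```typescript
import { PassThrough } from 'stream';

const stream = new PassThrough();
const backPressureBuffer: string[] = [];
let waitForDrain = false;

function push(line: string, drain = false) {
  // While waiting for 'drain', queue new items instead of writing them directly.
  if (!drain && (waitForDrain || backPressureBuffer.length > 0)) {
    backPressureBuffer.push(line);
    return;
  }

  // write() returns false once the stream's internal buffer exceeds its highWaterMark.
  const writeOk = stream.write(line);

  if (!writeOk && !waitForDrain) {
    waitForDrain = true;
    stream.once('drain', () => {
      waitForDrain = false;
      const next = backPressureBuffer.shift();
      if (next !== undefined) push(next, true);
    });
  } else if (writeOk && drain && backPressureBuffer.length > 0) {
    // Keep emptying the queue as long as the stream accepts writes.
    const next = backPressureBuffer.shift();
    if (next !== undefined) push(next, true);
  }
}

// Consume the stream so it can drain, then push more data than fits in one buffer.
stream.resume();
for (let i = 0; i < 100000; i++) {
  push(`line ${i}\n`);
}
```
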


@ -21,6 +21,9 @@ export const aiopsExplainLogRateSpikesSchema = schema.object({
deviationMax: schema.number(),
/** The index to query for log rate spikes */
index: schema.string(),
/** Settings to override header-derived compression and enable the flush fix */
compressResponse: schema.maybe(schema.boolean()),
flushFix: schema.maybe(schema.boolean()),
});
export type AiopsExplainLogRateSpikesSchema = TypeOf<typeof aiopsExplainLogRateSpikesSchema>;


@ -95,6 +95,7 @@ export const ExplainLogRateSpikesAnalysis: FC<ExplainLogRateSpikesAnalysisProps>
timeFieldName: dataView.timeFieldName ?? '',
index: dataView.title,
grouping: true,
flushFix: true,
...windowParameters,
},
{ reducer: streamReducer, initialState }

View file

@ -51,6 +51,9 @@ import {
markDuplicates,
} from './queries/get_simple_hierarchical_tree';
// 10s ping frequency to keep the stream alive.
const PING_FREQUENCY = 10000;
// Overall progress is a float from 0 to 1.
const LOADED_FIELD_CANDIDATES = 0.2;
const PROGRESS_STEP_P_VALUES = 0.5;
@ -77,12 +80,12 @@ export const defineExplainLogRateSpikesRoute = (
let logMessageCounter = 1;
function logInfoMessage(msg: string) {
logger.info(`Explain Log Rate Spikes #${logMessageCounter}: ${msg}`);
function logDebugMessage(msg: string) {
logger.debug(`Explain Log Rate Spikes #${logMessageCounter}: ${msg}`);
logMessageCounter++;
}
logInfoMessage('Starting analysis.');
logDebugMessage('Starting analysis.');
const groupingEnabled = !!request.body.grouping;
@ -90,15 +93,16 @@ export const defineExplainLogRateSpikesRoute = (
const controller = new AbortController();
let isRunning = false;
let loaded = 0;
let shouldStop = false;
request.events.aborted$.subscribe(() => {
logInfoMessage('aborted$ subscription trigger.');
logDebugMessage('aborted$ subscription trigger.');
shouldStop = true;
controller.abort();
});
request.events.completed$.subscribe(() => {
logInfoMessage('completed$ subscription trigger.');
logDebugMessage('completed$ subscription trigger.');
shouldStop = true;
controller.abort();
});
@ -107,17 +111,26 @@ export const defineExplainLogRateSpikesRoute = (
end: streamEnd,
push,
responseWithHeaders,
} = streamFactory<AiopsExplainLogRateSpikesApiAction>(request.headers, logger, true);
} = streamFactory<AiopsExplainLogRateSpikesApiAction>(
request.headers,
logger,
request.body.compressResponse,
request.body.flushFix
);
function pushPing() {
push(pingAction());
function pushPingWithTimeout() {
setTimeout(() => {
if (isRunning) {
logDebugMessage('Ping message.');
push(pingAction());
pushPingWithTimeout();
}
}, PING_FREQUENCY);
}
const pingInterval = setInterval(pushPing, 1000);
function end() {
logInfoMessage('Ending analysis.');
clearInterval(pingInterval);
isRunning = false;
logDebugMessage('Ending analysis.');
streamEnd();
}
@ -139,15 +152,16 @@ export const defineExplainLogRateSpikesRoute = (
}
function pushError(m: string) {
logInfoMessage('Push error.');
logDebugMessage('Push error.');
push(addErrorAction(m));
}
// Async IIFE to run the analysis while not blocking returning `responseWithHeaders`.
(async () => {
logInfoMessage('Reset.');
async function runAnalysis() {
isRunning = true;
logDebugMessage('Reset.');
push(resetAction());
logInfoMessage('Load field candidates.');
pushPingWithTimeout();
logDebugMessage('Load field candidates.');
push(
updateLoadingStateAction({
ccsWarning: false,
@ -204,11 +218,11 @@ export const defineExplainLogRateSpikesRoute = (
const fieldCandidatesChunks = chunk(fieldCandidates, chunkSize);
logInfoMessage('Fetch p-values.');
logDebugMessage('Fetch p-values.');
for (const fieldCandidatesChunk of fieldCandidatesChunks) {
chunkCount++;
logInfoMessage(`Fetch p-values. Chunk ${chunkCount} of ${fieldCandidatesChunks.length}`);
logDebugMessage(`Fetch p-values. Chunk ${chunkCount} of ${fieldCandidatesChunks.length}`);
let pValues: Awaited<ReturnType<typeof fetchChangePointPValues>>;
try {
pValues = await fetchChangePointPValues(
@ -258,7 +272,7 @@ export const defineExplainLogRateSpikesRoute = (
);
if (shouldStop) {
logInfoMessage('shouldStop fetching p-values.');
logDebugMessage('shouldStop fetching p-values.');
end();
return;
@ -266,7 +280,7 @@ export const defineExplainLogRateSpikesRoute = (
}
if (changePoints?.length === 0) {
logInfoMessage('Stopping analysis, did not find change points.');
logDebugMessage('Stopping analysis, did not find change points.');
endWithUpdatedLoadingState();
return;
}
@ -275,7 +289,7 @@ export const defineExplainLogRateSpikesRoute = (
{ fieldName: request.body.timeFieldName, type: KBN_FIELD_TYPES.DATE },
];
logInfoMessage('Fetch overall histogram.');
logDebugMessage('Fetch overall histogram.');
let overallTimeSeries: NumericChartData | undefined;
try {
@ -313,7 +327,7 @@ export const defineExplainLogRateSpikesRoute = (
}
if (groupingEnabled) {
logInfoMessage('Group results.');
logDebugMessage('Group results.');
push(
updateLoadingStateAction({
@ -498,7 +512,7 @@ export const defineExplainLogRateSpikesRoute = (
pushHistogramDataLoadingState();
logInfoMessage('Fetch group histograms.');
logDebugMessage('Fetch group histograms.');
await asyncForEach(changePointGroups, async (cpg) => {
if (overallTimeSeries !== undefined) {
@ -577,7 +591,7 @@ export const defineExplainLogRateSpikesRoute = (
loaded += PROGRESS_STEP_HISTOGRAMS_GROUPS;
logInfoMessage('Fetch field/value histograms.');
logDebugMessage('Fetch field/value histograms.');
// time series filtered by fields
if (changePoints && overallTimeSeries !== undefined) {
@ -661,7 +675,10 @@ export const defineExplainLogRateSpikesRoute = (
}
endWithUpdatedLoadingState();
})();
}
// Do not await this call so the analysis runs asynchronously while the stream is returned right away.
runAnalysis();
return response.ok(responseWithHeaders);
}
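
As a standalone illustration of the keep-alive change above: the route now uses a self-rescheduling `setTimeout` instead of the previous `setInterval`, so pings stop on their own once `isRunning` flips to `false`. A minimal sketch of that pattern (not the exact route code; `createKeepAlive` is a hypothetical helper) could look like this:

```typescript
// 10s ping frequency, matching the route above.
const PING_FREQUENCY = 10000;

function createKeepAlive(push: (msg: { type: 'ping' }) => void) {
  let isRunning = false;

  function pushPingWithTimeout() {
    setTimeout(() => {
      if (isRunning) {
        push({ type: 'ping' });
        // Reschedule only while the analysis is still running.
        pushPingWithTimeout();
      }
    }, PING_FREQUENCY);
  }

  return {
    start() {
      isRunning = true;
      pushPingWithTimeout();
    },
    stop() {
      // The pending timeout fires once more but does nothing once this is false.
      isRunning = false;
    },
  };
}

// Usage sketch inside a route handler (names as in the diff above):
// const keepAlive = createKeepAlive(push);
// keepAlive.start();
// runAnalysis(); // fire-and-forget; `end()` would call keepAlive.stop() when done
// return response.ok(responseWithHeaders);
```
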


@ -75,13 +75,21 @@ export default ({ getService }: FtrProviderContext) => {
await esArchiver.unload('x-pack/test/functional/es_archives/ml/ecommerce');
});
it('should return full data without streaming', async () => {
async function requestWithoutStreaming(body: ApiExplainLogRateSpikes['body']) {
const resp = await supertest
.post(`/internal/aiops/explain_log_rate_spikes`)
.set('kbn-xsrf', 'kibana')
.send(requestBody)
.send(body)
.expect(200);
// Compression is on by default, so if `compressResponse` is not set in the request body
// the response header should include "gzip"; if it is explicitly disabled, the header should be undefined.
if (body.compressResponse === undefined) {
expect(resp.header['content-encoding']).to.be('gzip');
} else if (body.compressResponse === false) {
expect(resp.header['content-encoding']).to.be(undefined);
}
expect(Buffer.isBuffer(resp.body)).to.be(true);
const chunks: string[] = resp.body.toString().split('\n');
@ -131,34 +139,64 @@ export default ({ getService }: FtrProviderContext) => {
histograms.forEach((h, index) => {
expect(h.histogram.length).to.be(20);
});
}
it('should return full data without streaming with compression with flushFix', async () => {
await requestWithoutStreaming(requestBody);
});
it('should return data in chunks with streaming', async () => {
const response = await fetch(`${kibanaServerUrl}/internal/aiops/explain_log_rate_spikes`, {
it('should return full data without streaming with compression without flushFix', async () => {
await requestWithoutStreaming({ ...requestBody, flushFix: false });
});
it('should return full data without streaming without compression with flushFix', async () => {
await requestWithoutStreaming({ ...requestBody, compressResponse: false });
});
it('should return full data without streaming without compression without flushFix', async () => {
await requestWithoutStreaming({ ...requestBody, compressResponse: false, flushFix: false });
});
async function requestWithStreaming(body: ApiExplainLogRateSpikes['body']) {
const resp = await fetch(`${kibanaServerUrl}/internal/aiops/explain_log_rate_spikes`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'kbn-xsrf': 'stream',
},
body: JSON.stringify(requestBody),
body: JSON.stringify(body),
});
expect(response.ok).to.be(true);
expect(response.status).to.be(200);
// Compression is on by default, so if `compressResponse` is not set in the request body
// the response header should include "gzip"; if it is explicitly disabled, the header should be null.
if (body.compressResponse === undefined) {
expect(resp.headers.get('content-encoding')).to.be('gzip');
} else if (body.compressResponse === false) {
expect(resp.headers.get('content-encoding')).to.be(null);
}
const stream = response.body;
expect(resp.ok).to.be(true);
expect(resp.status).to.be(200);
const stream = resp.body;
expect(stream).not.to.be(null);
if (stream !== null) {
const data: any[] = [];
let chunkCounter = 0;
const parseStreamCallback = (c: number) => (chunkCounter = c);
for await (const action of parseStream(stream)) {
for await (const action of parseStream(stream, parseStreamCallback)) {
expect(action.type).not.to.be('error');
data.push(action);
}
// If streaming works correctly we should receive more than one chunk.
expect(chunkCounter).to.be.greaterThan(1);
expect(data.length).to.be(expected.actionsLength);
const addChangePointsActions = data.filter((d) => d.type === expected.changePointFilter);
expect(addChangePointsActions.length).to.greaterThan(0);
@ -189,6 +227,22 @@ export default ({ getService }: FtrProviderContext) => {
expect(h.histogram.length).to.be(20);
});
}
}
it('should return data in chunks with streaming with compression with flushFix', async () => {
await requestWithStreaming(requestBody);
});
it('should return data in chunks with streaming with compression without flushFix', async () => {
await requestWithStreaming({ ...requestBody, flushFix: false });
});
it('should return data in chunks with streaming without compression with flushFix', async () => {
await requestWithStreaming({ ...requestBody, compressResponse: false });
});
it('should return data in chunks with streaming without compression without flushFix', async () => {
await requestWithStreaming({ ...requestBody, compressResponse: false, flushFix: false });
});
it('should return an error for non existing index without streaming', async () => {


@ -5,11 +5,16 @@
* 2.0.
*/
export async function* parseStream(stream: NodeJS.ReadableStream) {
export async function* parseStream(
stream: NodeJS.ReadableStream,
callback?: (chunkCounter: number) => void
) {
let partial = '';
let chunkCounter = 0;
try {
for await (const value of stream) {
chunkCounter++;
const full = `${partial}${value}`;
const parts = full.split('\n');
const last = parts.pop();
@ -25,4 +30,8 @@ export async function* parseStream(stream: NodeJS.ReadableStream) {
} catch (error) {
yield { type: 'error', payload: error.toString() };
}
if (typeof callback === 'function') {
callback(chunkCounter);
}
}