kibana/x-pack/test/api_integration/apis/aiops/parse_stream.ts
Walter Rafelsberger b38bbbcea3
[ML] Explain Log Rate Spikes: Fix uncompressed streams and backpressure handling. (#142970)
- Adds a flag for `compressResponse` and `flushFix` to the request body to be able to overrule compression settings inferred from headers.
- Updates the developer examples with a toggle to run requests with compression enabled or disabled.
- Adds support for backpressure handling for response streams.
- The backpressure update includes a fix where uncompressed streams would never start streaming to the client.
- The analysis endpoint for Explain Log Rate Spikes now includes a ping every 10 seconds to keep the stream alive.
- Integration tests were updated to test both uncompressed and compressed streaming.
2022-10-14 15:30:34 +02:00

37 lines
912 B
TypeScript

/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export async function* parseStream(
stream: NodeJS.ReadableStream,
callback?: (chunkCounter: number) => void
) {
let partial = '';
let chunkCounter = 0;
try {
for await (const value of stream) {
chunkCounter++;
const full = `${partial}${value}`;
const parts = full.split('\n');
const last = parts.pop();
partial = last ?? '';
const actions = parts.map((p) => JSON.parse(p));
for (const action of actions) {
yield action;
}
}
} catch (error) {
yield { type: 'error', payload: error.toString() };
}
if (typeof callback === 'function') {
callback(chunkCounter);
}
}