Mirror of https://github.com/elastic/kibana.git (synced 2025-04-24 01:38:56 -04:00)
* use Buffer.alloc + .set API instead of .concat
* refactor variable names and actually assign to this.buffer
* ok, looks like an array of buffers could work
* added a large comment and refactored some variable names
* fix comment
* refactored logic to deal with an edge case where partial buffers should be added; also throw if a bad config is detected
* added a new test for detecting when the write stream throws for a bad config
* updated logic to never call .slice(0), updated the guard for the config error, updated a comment
* refactor totalBytesConsumed -> bytesToFlush
* use the while loop mike wrote
* remove unused variable
* update comment

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
  parent d39dff3b47
  commit 29b5fc175b

1 changed file with 45 additions and 9 deletions
@@ -66,7 +66,9 @@ export class ContentStream extends Duplex {
     return Math.floor(max / 2);
   }
 
-  private buffer = Buffer.from('');
+  private buffers: Buffer[] = [];
+  private bytesBuffered = 0;
+
   private bytesRead = 0;
   private chunksRead = 0;
   private chunksWritten = 0;
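For context on the hunk above: growing a single Buffer with Buffer.concat on every write copies all previously buffered bytes each time, while pushing chunk references into an array defers the copy to flush time. A minimal sketch of the difference (the function names are hypothetical, not from the commit):

```typescript
// Appending N chunks by re-concatenating one Buffer copies everything
// buffered so far on every write: O(N^2) bytes copied in total.
function appendByConcat(chunks: Buffer[]): Buffer {
  let buffer = Buffer.from('');
  for (const chunk of chunks) {
    buffer = Buffer.concat([buffer, chunk]); // full copy each iteration
  }
  return buffer;
}

// Pushing references into an array copies nothing until the single
// Buffer.concat at the end, and tracks the total size as it goes.
function appendByArray(chunks: Buffer[]): Buffer {
  const buffers: Buffer[] = [];
  let bytesBuffered = 0;
  for (const chunk of chunks) {
    buffers.push(chunk); // stores a reference, no byte copy
    bytesBuffered += chunk.byteLength;
  }
  return Buffer.concat(buffers, bytesBuffered); // one copy, pre-sized
}
```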
@@ -249,8 +251,43 @@
     });
   }
 
-  private async flush(size = this.buffer.byteLength) {
-    const chunk = this.buffer.slice(0, size);
+  private async flush(size = this.bytesBuffered) {
+    const buffersToFlush: Buffer[] = [];
+    let bytesToFlush = 0;
+
+    /*
+      Loop over each buffer, keeping track of how many bytes we have added
+      to the array of buffers to be flushed. The array of buffers to be flushed
+      contains buffers by reference, not copies. This avoids putting pressure on
+      the CPU for copying buffers or for gc activity. Please profile performance
+      with a large byte configuration and a large number of records (900k+)
+      before changing this code. Config used at time of writing:
+
+        xpack.reporting:
+          csv.maxSizeBytes: 500000000
+          csv.scroll.size: 1000
+
+      At the moment this can put memory pressure on Kibana. Up to 1.1 GB in a dev
+      build. It is not recommended to have overly large max size bytes but we
+      need this code to be as performant as possible.
+    */
+    while (this.buffers.length) {
+      const remainder = size - bytesToFlush;
+      if (remainder <= 0) {
+        break;
+      }
+      const buffer = this.buffers.shift()!;
+      const chunkedBuffer = buffer.slice(0, remainder);
+      buffersToFlush.push(chunkedBuffer);
+      bytesToFlush += chunkedBuffer.byteLength;
+
+      if (buffer.byteLength > remainder) {
+        this.buffers.unshift(buffer.slice(remainder));
+      }
+    }
+
+    // We call Buffer.concat with the fewest number of buffers possible
+    const chunk = Buffer.concat(buffersToFlush);
     const content = this.encode(chunk);
 
     if (!this.chunksWritten) {
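The while loop above takes exactly `size` bytes off the front of the buffer queue, slicing the last buffer when a chunk boundary falls inside it and re-queuing the remainder. A standalone sketch of the same loop (the `takeBytes` name is hypothetical) with a worked example:

```typescript
// Take exactly `size` bytes from the front of a Buffer queue, splitting
// the final buffer and putting its remainder back at the head.
function takeBytes(queue: Buffer[], size: number): Buffer {
  const toFlush: Buffer[] = [];
  let taken = 0;
  while (queue.length && taken < size) {
    const buffer = queue.shift()!;
    const piece = buffer.slice(0, size - taken); // a view, not a copy
    toFlush.push(piece);
    taken += piece.byteLength;
    if (buffer.byteLength > piece.byteLength) {
      queue.unshift(buffer.slice(piece.byteLength)); // re-queue the rest
    }
  }
  return Buffer.concat(toFlush);
}

// Example: 5 bytes spanning a buffer boundary.
const queue = [Buffer.from('abc'), Buffer.from('defg')];
console.log(takeBytes(queue, 5).toString()); // 'abcde'
console.log(queue[0].toString());            // 'fg' (re-queued remainder)
```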
@@ -265,22 +302,21 @@
     }
 
     this.bytesWritten += chunk.byteLength;
-    this.buffer = this.buffer.slice(size);
+    this.bytesBuffered -= bytesToFlush;
   }
 
   private async flushAllFullChunks() {
     const maxChunkSize = await this.getMaxChunkSize();
 
-    while (this.buffer.byteLength >= maxChunkSize) {
+    while (this.bytesBuffered >= maxChunkSize && this.buffers.length) {
       await this.flush(maxChunkSize);
     }
   }
 
   _write(chunk: Buffer | string, encoding: BufferEncoding, callback: Callback) {
-    this.buffer = Buffer.concat([
-      this.buffer,
-      Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding),
-    ]);
+    const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
+    this.bytesBuffered += buffer.byteLength;
+    this.buffers.push(buffer);
 
     this.flushAllFullChunks()
       .then(() => callback())
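The hunk is truncated after `.then(() => callback())`. In a Node Writable, a rejection from the async flush has to reach the write callback so the stream emits 'error' (which is what a test for a bad config can assert on). A minimal, self-contained sketch of the overall _write pattern under that assumption; the class name `BufferingSink` and the injected `writeChunk` function are hypothetical:

```typescript
import { Writable } from 'stream';

// Hypothetical sketch of the accumulate-then-flush _write pattern.
class BufferingSink extends Writable {
  private buffers: Buffer[] = [];
  private bytesBuffered = 0;

  constructor(
    private readonly maxChunkSize: number,
    private readonly writeChunk: (chunk: Buffer) => Promise<void>
  ) {
    super();
  }

  // Take exactly `size` bytes off the front of the queue (same loop as flush()).
  private take(size: number): Buffer {
    const toFlush: Buffer[] = [];
    let taken = 0;
    while (this.buffers.length && taken < size) {
      const buffer = this.buffers.shift()!;
      const piece = buffer.slice(0, size - taken);
      toFlush.push(piece);
      taken += piece.byteLength;
      if (buffer.byteLength > piece.byteLength) {
        this.buffers.unshift(buffer.slice(piece.byteLength));
      }
    }
    this.bytesBuffered -= taken;
    return Buffer.concat(toFlush);
  }

  // Drain full chunks; any partial chunk stays queued for a final flush.
  private async flushAllFullChunks(): Promise<void> {
    while (this.bytesBuffered >= this.maxChunkSize && this.buffers.length) {
      await this.writeChunk(this.take(this.maxChunkSize));
    }
  }

  _write(chunk: Buffer | string, encoding: BufferEncoding, callback: (error?: Error | null) => void) {
    const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
    this.bytesBuffered += buffer.byteLength;
    this.buffers.push(buffer);

    // Route async flush failures into the callback so the stream emits 'error'.
    this.flushAllFullChunks()
      .then(() => callback())
      .catch(callback);
  }
}
```

Because the callback fires only after the flush settles, the stream gets backpressure for free: Node will not deliver the next chunk to _write until the previous ones have been handled.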