mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 01:38:56 -04:00
* Convert createPromiseFromStreams util to use stream pipeline * Add back 50ms timeout to duplex stream test * Add friendly error message when a single non readable stream is given * Throw error instead of returning a rejected promise
This commit is contained in:
parent
796ce2d26d
commit
c9cc05855b
2 changed files with 69 additions and 54 deletions
|
@ -17,7 +17,7 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
import { Writable, Duplex } from 'stream';
|
||||
import { Readable, Writable, Duplex, Transform } from 'stream';
|
||||
|
||||
import expect from 'expect.js';
|
||||
|
||||
|
@ -83,20 +83,25 @@ describe('promiseFromStreams', () => {
|
|||
describe('last stream is duplex', () => {
|
||||
it('waits for writing and resolves to final value', async () => {
|
||||
let written = '';
|
||||
|
||||
const duplexReadQueue = [];
|
||||
const duplexItemsToPush = ['foo', 'bar', null];
|
||||
const result = await createPromiseFromStreams([
|
||||
createListStream(['a', 'b', 'c']),
|
||||
new Duplex({
|
||||
read() {
|
||||
this.push('foo');
|
||||
this.push('bar');
|
||||
this.push(null);
|
||||
async read() {
|
||||
const result = await duplexReadQueue.shift();
|
||||
this.push(result);
|
||||
},
|
||||
|
||||
write(chunk, enc, cb) {
|
||||
setTimeout(() => {
|
||||
written += chunk;
|
||||
cb();
|
||||
}, 50);
|
||||
duplexReadQueue.push(new Promise((resolve) => {
|
||||
setTimeout(() => {
|
||||
written += chunk;
|
||||
cb();
|
||||
resolve(duplexItemsToPush.shift());
|
||||
}, 50);
|
||||
}));
|
||||
}
|
||||
}).setEncoding('utf8')
|
||||
]);
|
||||
|
@ -105,4 +110,36 @@ describe('promiseFromStreams', () => {
|
|||
expect(result).to.be('bar');
|
||||
});
|
||||
});
|
||||
|
||||
describe('error handling', () => {
|
||||
it('read stream gets destroyed when transform stream fails', async () => {
|
||||
let destroyCalled = false;
|
||||
const readStream = new Readable({
|
||||
read() {
|
||||
this.push('a');
|
||||
this.push('b');
|
||||
this.push('c');
|
||||
this.push(null);
|
||||
},
|
||||
destroy() {
|
||||
destroyCalled = true;
|
||||
}
|
||||
});
|
||||
const transformStream = new Transform({
|
||||
transform(chunk, enc, done) {
|
||||
done(new Error('Test error'));
|
||||
}
|
||||
});
|
||||
try {
|
||||
await createPromiseFromStreams([
|
||||
readStream,
|
||||
transformStream,
|
||||
]);
|
||||
throw new Error('Should fail');
|
||||
} catch (e) {
|
||||
expect(e.message).to.be('Test error');
|
||||
expect(destroyCalled).to.be(true);
|
||||
}
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
@ -33,54 +33,32 @@
|
|||
* @param {Array<Stream>} streams
|
||||
* @return {Promise<any>}
|
||||
*/
|
||||
|
||||
import { pipeline, Writable } from 'stream';
|
||||
|
||||
export async function createPromiseFromStreams(streams) {
|
||||
let finalChunk;
|
||||
const last = streams[streams.length - 1];
|
||||
|
||||
// reject if any of the streams emits an error
|
||||
const anyStreamFailure = new Promise((resolve, reject) => {
|
||||
streams.forEach((stream, i) => {
|
||||
if (i > 0) streams[i - 1].pipe(stream);
|
||||
stream.on('error', reject);
|
||||
return stream;
|
||||
});
|
||||
});
|
||||
|
||||
// resolve when the last stream has finished writing, or
|
||||
// immediately if the last stream is not writable
|
||||
const lastFinishedWriting = new Promise(resolve => {
|
||||
if (typeof last.write !== 'function') {
|
||||
resolve();
|
||||
return;
|
||||
}
|
||||
|
||||
last.on('finish', resolve);
|
||||
});
|
||||
|
||||
// resolve with the final value provided by the last stream
|
||||
// after the last stream has provided it, or immediately if the
|
||||
// stream is not readable
|
||||
const lastFinishedReading = new Promise(resolve => {
|
||||
if (typeof last.read !== 'function') {
|
||||
resolve();
|
||||
return;
|
||||
}
|
||||
|
||||
let finalChunk;
|
||||
last.on('data', (chunk) => {
|
||||
finalChunk = chunk;
|
||||
});
|
||||
last.on('end', () => {
|
||||
if (typeof last.read !== 'function' && streams.length === 1) {
|
||||
// For a nicer error than what stream.pipeline throws
|
||||
throw new Error('A minimum of 2 streams is required when a non-readable stream is given');
|
||||
}
|
||||
if (typeof last.read === 'function') {
|
||||
// We are pushing a writable stream to capture the last chunk
|
||||
streams.push(new Writable({
|
||||
// Use object mode even when "last" stream isn't. This allows to
|
||||
// capture the last chunk as-is.
|
||||
objectMode: true,
|
||||
write(chunk, enc, done) {
|
||||
finalChunk = chunk;
|
||||
done();
|
||||
}
|
||||
}));
|
||||
}
|
||||
return new Promise((resolve, reject) => {
|
||||
pipeline(...streams, (err) => {
|
||||
if (err) return reject(err);
|
||||
resolve(finalChunk);
|
||||
});
|
||||
});
|
||||
|
||||
// wait (and rethrow) the first error, or for the last stream
|
||||
// to both finish writing and providing values to read
|
||||
await Promise.race([
|
||||
anyStreamFailure,
|
||||
Promise.all([lastFinishedWriting, lastFinishedReading])
|
||||
]);
|
||||
|
||||
// return the final chunk read from the last stream
|
||||
return await lastFinishedReading;
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue