[esArchiver] fix esArchiver.loadIfNeeded (#18621) (#18923)

* [utils/jsonParseStream] ensure only callback once

* [utils/streams] add util for piping many streams in series

* [esArchiver/load] combine record streams to fix skipExisting functionality
Spencer 2018-05-08 10:03:41 -07:00 committed by GitHub
parent cd46061588
commit 9beb886853
6 changed files with 144 additions and 14 deletions


@@ -2,7 +2,8 @@ import { resolve } from 'path';
 import { createReadStream } from 'fs';
 
 import {
-  createPromiseFromStreams
+  createPromiseFromStreams,
+  concatStreamProviders,
 } from '../../utils';
 
 import {
@@ -15,21 +16,41 @@ import {
   createIndexDocRecordsStream,
 } from '../lib';
 
+// pipe a series of streams into each other so that data and errors
+// flow from the first stream to the last. Errors from the last stream
+// are not listened for
+const pipeline = (...streams) => streams
+  .reduce((source, dest) => (
+    source
+      .once('error', (error) => dest.emit('error', error))
+      .pipe(dest)
+  ));
+
 export async function loadAction({ name, skipExisting, client, dataDir, log }) {
   const inputDir = resolve(dataDir, name);
   const stats = createStats(name, log);
 
   const files = prioritizeMappings(await readDirectory(inputDir));
-  for (const filename of files) {
-    log.info('[%s] Loading %j', name, filename);
-
-    await createPromiseFromStreams([
-      createReadStream(resolve(inputDir, filename)),
-      ...createParseArchiveStreams({ gzip: isGzip(filename) }),
-      createCreateIndexStream({ client, stats, skipExisting }),
-      createIndexDocRecordsStream(client, stats),
-    ]);
-  }
+
+  // a single stream that emits records from all archive files, in
+  // order, so that createIndexStream can track the state of indexes
+  // across archives and properly skip docs from existing indexes
+  const recordStream = concatStreamProviders(
+    files.map(filename => () => {
+      log.info('[%s] Loading %j', name, filename);
+      return pipeline(
+        createReadStream(resolve(inputDir, filename)),
+        ...createParseArchiveStreams({ gzip: isGzip(filename) })
+      );
+    }),
+    { objectMode: true }
+  );
+
+  await createPromiseFromStreams([
+    recordStream,
+    createCreateIndexStream({ client, stats, skipExisting }),
+    createIndexDocRecordsStream(client, stats),
+  ]);
 
   const indicesToRefresh = [];
   stats.forEachIndex((index, { docs }) => {

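As a standalone illustration (not part of the commit), the reduce in the pipeline helper above chains each stream into the next with .pipe() and re-emits errors forward, so a single error listener on the final stream observes a failure from any earlier stream. The stream instances below are made up for the sketch:

const { PassThrough } = require('stream');

// the same helper as in the change above
const pipeline = (...streams) => streams
  .reduce((source, dest) => (
    source
      .once('error', (error) => dest.emit('error', error))
      .pipe(dest)
  ));

const first = new PassThrough();
const last = pipeline(first, new PassThrough(), new PassThrough());

// one listener on the last stream is enough: an error emitted anywhere
// earlier in the chain is re-emitted stream by stream until it gets here
last.on('error', (error) => console.error('caught:', error.message));

first.emit('error', new Error('boom'));
// => caught: boom

Data flows through the chain via the ordinary .pipe() calls; only error propagation needs the extra forwarding, which is why loadAction can build one record pipeline per archive file and hand them all to concatStreamProviders.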

@@ -15,6 +15,7 @@ export {
 } from './kbn_field_types';
 
 export {
+  concatStreamProviders,
   createConcatStream,
   createIntersperseStream,
   createJsonParseStream,


@@ -0,0 +1,60 @@
+import { Readable } from 'stream';
+
+import sinon from 'sinon';
+import expect from 'expect.js';
+
+import { concatStreamProviders } from '../concat_stream_providers';
+import { createListStream } from '../list_stream';
+import { createConcatStream } from '../concat_stream';
+import { createPromiseFromStreams } from '../promise_from_streams';
+
+describe('concatStreamProviders() helper', () => {
+  it('writes the data from an array of stream providers into a destination stream in order', async () => {
+    const results = await createPromiseFromStreams([
+      concatStreamProviders([
+        () => createListStream([
+          'foo',
+          'bar'
+        ]),
+        () => createListStream([
+          'baz',
+        ]),
+        () => createListStream([
+          'bug',
+        ]),
+      ]),
+      createConcatStream('')
+    ]);
+
+    expect(results).to.be('foobarbazbug');
+  });
+
+  it('emits the errors from a sub-stream to the destination', async () => {
+    const dest = concatStreamProviders([
+      () => createListStream([
+        'foo',
+        'bar'
+      ]),
+      () => new Readable({
+        read() {
+          this.emit('error', new Error('foo'));
+        }
+      }),
+    ]);
+
+    const errorListener = sinon.stub();
+    dest.on('error', errorListener);
+
+    try {
+      await createPromiseFromStreams([dest]);
+      throw new Error('Expected createPromiseFromStreams() to reject with error');
+    } catch (error) {
+      expect(error).to.have.property('message', 'foo');
+    }
+
+    sinon.assert.calledOnce(errorListener);
+    sinon.assert.calledWithExactly(errorListener, sinon.match({
+      message: 'foo'
+    }));
+  });
+});


@@ -0,0 +1,42 @@
+import { PassThrough } from 'stream';
+
+/**
+ * Write the data and errors from a list of stream providers
+ * to a single stream in order. Stream providers are only
+ * called right before they will be consumed, and only one
+ * provider will be active at a time.
+ *
+ * @param {Array<() => ReadableStream>} sourceProviders
+ * @param {PassThroughOptions} options options passed to the PassThrough constructor
+ * @return {WritableStream} combined stream
+ */
+export function concatStreamProviders(sourceProviders, options = {}) {
+  const destination = new PassThrough(options);
+  const queue = sourceProviders.slice();
+
+  (function pipeNext() {
+    const provider = queue.shift();
+
+    if (!provider) {
+      return;
+    }
+
+    const source = provider();
+    const isLast = !queue.length;
+
+    // if there are more sources to pipe, hook
+    // into the source completion
+    if (!isLast) {
+      source.once('end', pipeNext);
+    }
+
+    source
+      // proxy errors from the source to the destination
+      .once('error', (error) => destination.emit('error', error))
+      // pipe the source to the destination but only proxy the
+      // end event if this is the last source
+      .pipe(destination, { end: isLast });
+  }());
+
+  return destination;
+}

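A hedged usage sketch of the new util (not from the commit): because the sources are passed as provider functions rather than streams, each one is created only after the previous source ends, which is what lets loadAction open one archive file at a time. The makeSource helper and the relative require path are assumptions for this example.

const { Readable } = require('stream');
const { concatStreamProviders } = require('./concat_stream_providers');

// hypothetical helper: builds a provider that creates a one-record
// source and logs when it is actually instantiated
const makeSource = (name) => () => {
  console.log('creating source:', name);
  return new Readable({
    read() {
      this.push(`${name}\n`);
      this.push(null); // ending this source triggers the next provider
    }
  });
};

const combined = concatStreamProviders([
  makeSource('first'),
  makeSource('second'),
  makeSource('third'),
]);

// the second and third sources are only created once the previous one
// has ended, so only one source is ever active at a time
combined.on('data', (chunk) => process.stdout.write(chunk.toString()));

The { end: isLast } option in the implementation is what makes this work: intermediate sources must not end the shared destination, only the final one may.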

@@ -1,3 +1,4 @@
+export { concatStreamProviders } from './concat_stream_providers';
 export { createIntersperseStream } from './intersperse_stream';
 export { createSplitStream } from './split_stream';
 export { createListStream } from './list_stream';


@@ -16,11 +16,16 @@ export function createJsonParseStream() {
     writableObjectMode: true,
     readableObjectMode: true,
     transform(json, enc, callback) {
+      let parsed;
+      let error;
+
       try {
-        callback(null, JSON.parse(json));
-      } catch (err) {
-        callback(err);
+        parsed = JSON.parse(json);
+      } catch (_error) {
+        error = _error;
       }
+
+      callback(error, parsed);
     }
   });
 }
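
To illustrate the "ensure only callback once" fix in plain terms, here is a minimal sketch outside of the stream machinery (the function names are hypothetical, not from Kibana): when the callback is invoked inside the try block, a callback that itself throws lands in the catch and gets invoked a second time; hoisting the callback out of the try, as the change above does, guarantees exactly one invocation.

// old shape: the callback runs inside the try, so if it throws the
// catch branch calls it again, this time with the error
function parseUnsafe(json, callback) {
  try {
    callback(null, JSON.parse(json));
  } catch (error) {
    callback(error); // second invocation when the callback itself threw
  }
}

// new shape: only JSON.parse is guarded, the callback runs exactly once
function parseSafe(json, callback) {
  let parsed;
  let error;

  try {
    parsed = JSON.parse(json);
  } catch (_error) {
    error = _error;
  }

  callback(error, parsed);
}

let calls = 0;
parseUnsafe('{"ok": true}', () => {
  calls += 1;
  if (calls === 1) {
    throw new Error('consumer blew up'); // bounces back into the catch
  }
});
console.log('unsafe callback ran', calls, 'times'); // unsafe callback ran 2 times

With parseSafe the same throwing consumer runs once and its exception propagates to the caller instead of being misreported as a parse error; inside a Transform stream, the double invocation is what triggers Node's "callback called multiple times" failure.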
}