mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 01:38:56 -04:00
[esArchiver] fix esArchiver.loadIfNeeded (#18621)
* [utils/jsonParseStream] ensure only callback once
* [utils/streams] add util for piping many streams in series
* [esArchiver/load] combine record streams to fix skipExisting functionality
(cherry picked from commit 21d6dd2cba
)
This commit is contained in:
parent
4301728453
commit
78b6cd419d
6 changed files with 144 additions and 14 deletions
|
@ -2,7 +2,8 @@ import { resolve } from 'path';
|
|||
import { createReadStream } from 'fs';
|
||||
|
||||
import {
|
||||
createPromiseFromStreams
|
||||
createPromiseFromStreams,
|
||||
concatStreamProviders,
|
||||
} from '../../utils';
|
||||
|
||||
import {
|
||||
|
@ -15,21 +16,41 @@ import {
|
|||
createIndexDocRecordsStream,
|
||||
} from '../lib';
|
||||
|
||||
// pipe a series of streams into each other so that data and errors
|
||||
// flow from the first stream to the last. Errors from the last stream
|
||||
// are not listened for
|
||||
const pipeline = (...streams) => streams
|
||||
.reduce((source, dest) => (
|
||||
source
|
||||
.once('error', (error) => dest.emit('error', error))
|
||||
.pipe(dest)
|
||||
));
|
||||
|
||||
export async function loadAction({ name, skipExisting, client, dataDir, log }) {
|
||||
const inputDir = resolve(dataDir, name);
|
||||
const stats = createStats(name, log);
|
||||
|
||||
const files = prioritizeMappings(await readDirectory(inputDir));
|
||||
for (const filename of files) {
|
||||
log.info('[%s] Loading %j', name, filename);
|
||||
|
||||
await createPromiseFromStreams([
|
||||
createReadStream(resolve(inputDir, filename)),
|
||||
...createParseArchiveStreams({ gzip: isGzip(filename) }),
|
||||
createCreateIndexStream({ client, stats, skipExisting }),
|
||||
createIndexDocRecordsStream(client, stats),
|
||||
]);
|
||||
}
|
||||
// a single stream that emits records from all archive files, in
|
||||
// order, so that createIndexStream can track the state of indexes
|
||||
// across archives and properly skip docs from existing indexes
|
||||
const recordStream = concatStreamProviders(
|
||||
files.map(filename => () => {
|
||||
log.info('[%s] Loading %j', name, filename);
|
||||
|
||||
return pipeline(
|
||||
createReadStream(resolve(inputDir, filename)),
|
||||
...createParseArchiveStreams({ gzip: isGzip(filename) })
|
||||
);
|
||||
}),
|
||||
{ objectMode: true }
|
||||
);
|
||||
|
||||
await createPromiseFromStreams([
|
||||
recordStream,
|
||||
createCreateIndexStream({ client, stats, skipExisting }),
|
||||
createIndexDocRecordsStream(client, stats),
|
||||
]);
|
||||
|
||||
const indicesToRefresh = [];
|
||||
stats.forEachIndex((index, { docs }) => {
|
||||
|
|
|
@ -15,6 +15,7 @@ export {
|
|||
} from './kbn_field_types';
|
||||
|
||||
export {
|
||||
concatStreamProviders,
|
||||
createConcatStream,
|
||||
createIntersperseStream,
|
||||
createJsonParseStream,
|
||||
|
|
60
src/utils/streams/__tests__/concat_stream_providers.js
Normal file
60
src/utils/streams/__tests__/concat_stream_providers.js
Normal file
|
@ -0,0 +1,60 @@
|
|||
import { Readable } from 'stream';
|
||||
|
||||
import sinon from 'sinon';
|
||||
import expect from 'expect.js';
|
||||
|
||||
import { concatStreamProviders } from '../concat_stream_providers';
|
||||
import { createListStream } from '../list_stream';
|
||||
import { createConcatStream } from '../concat_stream';
|
||||
import { createPromiseFromStreams } from '../promise_from_streams';
|
||||
|
||||
describe('concatStreamProviders() helper', () => {
|
||||
it('writes the data from an array of stream providers into a destination stream in order', async () => {
|
||||
const results = await createPromiseFromStreams([
|
||||
concatStreamProviders([
|
||||
() => createListStream([
|
||||
'foo',
|
||||
'bar'
|
||||
]),
|
||||
() => createListStream([
|
||||
'baz',
|
||||
]),
|
||||
() => createListStream([
|
||||
'bug',
|
||||
]),
|
||||
]),
|
||||
createConcatStream('')
|
||||
]);
|
||||
|
||||
expect(results).to.be('foobarbazbug');
|
||||
});
|
||||
|
||||
it('emits the errors from a sub-stream to the destination', async () => {
|
||||
const dest = concatStreamProviders([
|
||||
() => createListStream([
|
||||
'foo',
|
||||
'bar'
|
||||
]),
|
||||
() => new Readable({
|
||||
read() {
|
||||
this.emit('error', new Error('foo'));
|
||||
}
|
||||
}),
|
||||
]);
|
||||
|
||||
const errorListener = sinon.stub();
|
||||
dest.on('error', errorListener);
|
||||
|
||||
try {
|
||||
await createPromiseFromStreams([dest]);
|
||||
throw new Error('Expected createPromiseFromStreams() to reject with error');
|
||||
} catch (error) {
|
||||
expect(error).to.have.property('message', 'foo');
|
||||
}
|
||||
|
||||
sinon.assert.calledOnce(errorListener);
|
||||
sinon.assert.calledWithExactly(errorListener, sinon.match({
|
||||
message: 'foo'
|
||||
}));
|
||||
});
|
||||
});
|
42
src/utils/streams/concat_stream_providers.js
Normal file
42
src/utils/streams/concat_stream_providers.js
Normal file
|
@ -0,0 +1,42 @@
|
|||
import { PassThrough } from 'stream';
|
||||
|
||||
/**
|
||||
* Write the data and errors from a list of stream providers
|
||||
* to a single stream in order. Stream providers are only
|
||||
* called right before they will be consumed, and only one
|
||||
* provider will be active at a time.
|
||||
*
|
||||
* @param {Array<() => ReadableStream>} sourceProviders
|
||||
* @param {PassThroughOptions} options options passed to the PassThrough constructor
|
||||
* @return {WritableStream} combined stream
|
||||
*/
|
||||
export function concatStreamProviders(sourceProviders, options = {}) {
|
||||
const destination = new PassThrough(options);
|
||||
const queue = sourceProviders.slice();
|
||||
|
||||
(function pipeNext() {
|
||||
const provider = queue.shift();
|
||||
|
||||
if (!provider) {
|
||||
return;
|
||||
}
|
||||
|
||||
const source = provider();
|
||||
const isLast = !queue.length;
|
||||
|
||||
// if there are more sources to pipe, hook
|
||||
// into the source completion
|
||||
if (!isLast) {
|
||||
source.once('end', pipeNext);
|
||||
}
|
||||
|
||||
source
|
||||
// proxy errors from the source to the destination
|
||||
.once('error', (error) => destination.emit('error', error))
|
||||
// pipe the source to the destination but only proxy the
|
||||
// end event if this is the last source
|
||||
.pipe(destination, { end: isLast });
|
||||
}());
|
||||
|
||||
return destination;
|
||||
}
|
|
@ -1,3 +1,4 @@
|
|||
export { concatStreamProviders } from './concat_stream_providers';
|
||||
export { createIntersperseStream } from './intersperse_stream';
|
||||
export { createSplitStream } from './split_stream';
|
||||
export { createListStream } from './list_stream';
|
||||
|
|
|
@ -16,11 +16,16 @@ export function createJsonParseStream() {
|
|||
writableObjectMode: true,
|
||||
readableObjectMode: true,
|
||||
transform(json, enc, callback) {
|
||||
let parsed;
|
||||
let error;
|
||||
|
||||
try {
|
||||
callback(null, JSON.parse(json));
|
||||
} catch (err) {
|
||||
callback(err);
|
||||
parsed = JSON.parse(json);
|
||||
} catch (_error) {
|
||||
error = _error;
|
||||
}
|
||||
|
||||
callback(error, parsed);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue