Move streams to kbn/utils (#84033)

* move streams to kbn/std

* import streams from kbn/std

* fix styles

* remove unused shareWeakReplay

* move from kbn/std to kbn/utils

* import from subfolder since test mocks FS module and not compatible with kbn/utils

* remove new line at the end of json file
This commit is contained in:
Mikhail Shustov 2020-11-24 17:19:18 +03:00 committed by GitHub
parent e4ff981b94
commit b3d97764a0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
89 changed files with 50 additions and 1883 deletions

View file

@ -12,6 +12,7 @@
},
"dependencies": {
"@kbn/dev-utils": "link:../kbn-dev-utils",
"@kbn/test": "link:../kbn-test"
"@kbn/test": "link:../kbn-test",
"@kbn/utils": "link:../kbn-utils"
}
}

View file

@ -23,8 +23,7 @@ import { createGunzip, createGzip, Z_BEST_COMPRESSION } from 'zlib';
import { promisify } from 'util';
import globby from 'globby';
import { ToolingLog } from '@kbn/dev-utils';
import { createPromiseFromStreams } from '../lib/streams';
import { createPromiseFromStreams } from '@kbn/utils';
const unlinkAsync = promisify(Fs.unlink);

View file

@ -23,7 +23,7 @@ import { Readable } from 'stream';
import { ToolingLog, KbnClient } from '@kbn/dev-utils';
import { Client } from 'elasticsearch';
import { createPromiseFromStreams, concatStreamProviders } from '../lib/streams';
import { createPromiseFromStreams, concatStreamProviders } from '@kbn/utils';
import {
isGzip,

View file

@ -22,8 +22,7 @@ import { stat, Stats, rename, createReadStream, createWriteStream } from 'fs';
import { Readable, Writable } from 'stream';
import { fromNode } from 'bluebird';
import { ToolingLog } from '@kbn/dev-utils';
import { createPromiseFromStreams } from '../lib/streams';
import { createPromiseFromStreams } from '@kbn/utils';
import {
prioritizeMappings,
readDirectory,

View file

@ -22,8 +22,8 @@ import { createWriteStream, mkdirSync } from 'fs';
import { Readable, Writable } from 'stream';
import { Client } from 'elasticsearch';
import { ToolingLog } from '@kbn/dev-utils';
import { createListStream, createPromiseFromStreams } from '@kbn/utils';
import { createListStream, createPromiseFromStreams } from '../lib/streams';
import {
createStats,
createGenerateIndexRecordsStream,

View file

@ -22,8 +22,8 @@ import { createReadStream } from 'fs';
import { Readable, Writable } from 'stream';
import { Client } from 'elasticsearch';
import { ToolingLog, KbnClient } from '@kbn/dev-utils';
import { createPromiseFromStreams } from '@kbn/utils';
import { createPromiseFromStreams } from '../lib/streams';
import {
isGzip,
createStats,

View file

@ -21,8 +21,7 @@ import Stream, { Readable, Writable } from 'stream';
import { createGunzip } from 'zlib';
import expect from '@kbn/expect';
import { createListStream, createPromiseFromStreams, createConcatStream } from '../../streams';
import { createListStream, createPromiseFromStreams, createConcatStream } from '@kbn/utils';
import { createFormatArchiveStreams } from '../format';

View file

@ -21,8 +21,7 @@ import Stream, { PassThrough, Readable, Writable, Transform } from 'stream';
import { createGzip } from 'zlib';
import expect from '@kbn/expect';
import { createConcatStream, createListStream, createPromiseFromStreams } from '../../streams';
import { createConcatStream, createListStream, createPromiseFromStreams } from '@kbn/utils';
import { createParseArchiveStreams } from '../parse';

View file

@ -21,7 +21,7 @@ import { createGzip, Z_BEST_COMPRESSION } from 'zlib';
import { PassThrough } from 'stream';
import stringify from 'json-stable-stringify';
import { createMapStream, createIntersperseStream } from '../streams';
import { createMapStream, createIntersperseStream } from '@kbn/utils';
import { RECORD_SEPARATOR } from './constants';
export function createFormatArchiveStreams({ gzip = false }: { gzip?: boolean } = {}) {

View file

@ -24,7 +24,7 @@ import {
createSplitStream,
createReplaceStream,
createMapStream,
} from '../streams';
} from '@kbn/utils';
import { RECORD_SEPARATOR } from './constants';

View file

@ -20,8 +20,7 @@
import sinon from 'sinon';
import expect from '@kbn/expect';
import { delay } from 'bluebird';
import { createListStream, createPromiseFromStreams, createConcatStream } from '../../streams';
import { createListStream, createPromiseFromStreams, createConcatStream } from '@kbn/utils';
import { createGenerateDocRecordsStream } from '../generate_doc_records_stream';
import { Progress } from '../../progress';

View file

@ -19,8 +19,7 @@
import expect from '@kbn/expect';
import { delay } from 'bluebird';
import { createListStream, createPromiseFromStreams } from '../../streams';
import { createListStream, createPromiseFromStreams } from '@kbn/utils';
import { Progress } from '../../progress';
import { createIndexDocRecordsStream } from '../index_doc_records_stream';

View file

@ -20,8 +20,7 @@
import expect from '@kbn/expect';
import sinon from 'sinon';
import Chance from 'chance';
import { createPromiseFromStreams, createConcatStream, createListStream } from '../../streams';
import { createPromiseFromStreams, createConcatStream, createListStream } from '@kbn/utils';
import { createCreateIndexStream } from '../create_index_stream';

View file

@ -19,7 +19,7 @@
import sinon from 'sinon';
import { createListStream, createPromiseFromStreams } from '../../streams';
import { createListStream, createPromiseFromStreams } from '@kbn/utils';
import { createDeleteIndexStream } from '../delete_index_stream';

View file

@ -19,8 +19,7 @@
import sinon from 'sinon';
import expect from '@kbn/expect';
import { createListStream, createPromiseFromStreams, createConcatStream } from '../../streams';
import { createListStream, createPromiseFromStreams, createConcatStream } from '@kbn/utils';
import { createStubClient, createStubStats } from './stubs';

View file

@ -20,7 +20,7 @@
import Chance from 'chance';
import expect from '@kbn/expect';
import { createListStream, createPromiseFromStreams, createConcatStream } from '../../streams';
import { createListStream, createPromiseFromStreams, createConcatStream } from '@kbn/utils';
import { createFilterRecordsStream } from '../filter_records_stream';

View file

@ -1,74 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { createListStream, createPromiseFromStreams, createConcatStream } from './';
describe('concatStream', () => {
test('accepts an initial value', async () => {
const output = await createPromiseFromStreams([
createListStream([1, 2, 3]),
createConcatStream([0]),
]);
expect(output).toEqual([0, 1, 2, 3]);
});
describe(`combines using the previous value's concat method`, () => {
test('works with strings', async () => {
const output = await createPromiseFromStreams([
createListStream(['a', 'b', 'c']),
createConcatStream(),
]);
expect(output).toEqual('abc');
});
test('works with arrays', async () => {
const output = await createPromiseFromStreams([
createListStream([[1], [2, 3, 4], [10]]),
createConcatStream(),
]);
expect(output).toEqual([1, 2, 3, 4, 10]);
});
test('works with a mixture, starting with array', async () => {
const output = await createPromiseFromStreams([
createListStream([[], 1, 2, 3, 4, [5, 6, 7]]),
createConcatStream(),
]);
expect(output).toEqual([1, 2, 3, 4, 5, 6, 7]);
});
test('fails when the value does not have a concat method', async () => {
let promise;
try {
promise = createPromiseFromStreams([createListStream([1, '1']), createConcatStream()]);
} catch (err) {
throw new Error('createPromiseFromStreams() should not fail synchronously');
}
try {
await promise;
throw new Error('Promise should have rejected');
} catch (err) {
expect(err).toBeInstanceOf(Error);
expect(err.message).toContain('concat');
}
});
});
});

View file

@ -1,41 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { createReduceStream } from './reduce_stream';
/**
* Creates a Transform stream that consumes all provided
* values and concatenates them using each values `concat`
* method.
*
* Concatenate strings:
* createListStream(['f', 'o', 'o'])
* .pipe(createConcatStream())
* .on('data', console.log)
* // logs "foo"
*
* Concatenate values into an array:
* createListStream([1,2,3])
* .pipe(createConcatStream([]))
* .on('data', console.log)
* // logs "[1,2,3]"
*/
export function createConcatStream(initial: any) {
return createReduceStream((acc, chunk) => acc.concat(chunk), initial);
}

View file

@ -1,66 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Readable } from 'stream';
import { concatStreamProviders } from './concat_stream_providers';
import { createListStream } from './list_stream';
import { createConcatStream } from './concat_stream';
import { createPromiseFromStreams } from './promise_from_streams';
describe('concatStreamProviders() helper', () => {
test('writes the data from an array of stream providers into a destination stream in order', async () => {
const results = await createPromiseFromStreams([
concatStreamProviders([
() => createListStream(['foo', 'bar']),
() => createListStream(['baz']),
() => createListStream(['bug']),
]),
createConcatStream(''),
]);
expect(results).toBe('foobarbazbug');
});
test('emits the errors from a sub-stream to the destination', async () => {
const dest = concatStreamProviders([
() => createListStream(['foo', 'bar']),
() =>
new Readable({
read() {
this.destroy(new Error('foo'));
},
}),
]);
const errorListener = jest.fn();
dest.on('error', errorListener);
await expect(createPromiseFromStreams([dest])).rejects.toThrowErrorMatchingInlineSnapshot(
`"foo"`
);
expect(errorListener.mock.calls).toMatchInlineSnapshot(`
Array [
Array [
[Error: foo],
],
]
`);
});
});

View file

@ -1,60 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { PassThrough, TransformOptions } from 'stream';
/**
* Write the data and errors from a list of stream providers
* to a single stream in order. Stream providers are only
* called right before they will be consumed, and only one
* provider will be active at a time.
*/
export function concatStreamProviders(
sourceProviders: Array<() => NodeJS.ReadableStream>,
options: TransformOptions = {}
) {
const destination = new PassThrough(options);
const queue = sourceProviders.slice();
(function pipeNext() {
const provider = queue.shift();
if (!provider) {
return;
}
const source = provider();
const isLast = !queue.length;
// if there are more sources to pipe, hook
// into the source completion
if (!isLast) {
source.once('end', pipeNext);
}
source
// proxy errors from the source to the destination
.once('error', (error) => destination.destroy(error))
// pipe the source to the destination but only proxy the
// end event if this is the last source
.pipe(destination, { end: isLast });
})();
return destination;
}

View file

@ -1,77 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import {
createConcatStream,
createFilterStream,
createListStream,
createPromiseFromStreams,
} from './';
describe('createFilterStream()', () => {
test('calls the function with each item in the source stream', async () => {
const filter = jest.fn().mockReturnValue(true);
await createPromiseFromStreams([createListStream(['a', 'b', 'c']), createFilterStream(filter)]);
expect(filter).toMatchInlineSnapshot(`
[MockFunction] {
"calls": Array [
Array [
"a",
],
Array [
"b",
],
Array [
"c",
],
],
"results": Array [
Object {
"type": "return",
"value": true,
},
Object {
"type": "return",
"value": true,
},
Object {
"type": "return",
"value": true,
},
],
}
`);
});
test('send the filtered values on the output stream', async () => {
const result = await createPromiseFromStreams([
createListStream([1, 2, 3]),
createFilterStream<number>((n) => n % 2 === 0),
createConcatStream([]),
]);
expect(result).toMatchInlineSnapshot(`
Array [
2,
]
`);
});
});

View file

@ -1,33 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Transform } from 'stream';
export function createFilterStream<T>(fn: (obj: T) => boolean) {
return new Transform({
objectMode: true,
async transform(obj, _, done) {
const canPushDownStream = fn(obj);
if (canPushDownStream) {
this.push(obj);
}
done();
},
});
}

View file

@ -1,54 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import {
createPromiseFromStreams,
createListStream,
createIntersperseStream,
createConcatStream,
} from './';
describe('intersperseStream', () => {
test('places the intersperse value between each provided value', async () => {
expect(
await createPromiseFromStreams([
createListStream(['to', 'be', 'or', 'not', 'to', 'be']),
createIntersperseStream(' '),
createConcatStream(),
])
).toBe('to be or not to be');
});
test('emits values as soon as possible, does not needlessly buffer', async () => {
const str = createIntersperseStream('y');
const onData = jest.fn();
str.on('data', onData);
str.write('a');
expect(onData).toHaveBeenCalledTimes(1);
expect(onData.mock.calls[0]).toEqual(['a']);
onData.mockClear();
str.write('b');
expect(onData).toHaveBeenCalledTimes(2);
expect(onData.mock.calls[0]).toEqual(['y']);
expect(onData).toHaveBeenCalledTimes(2);
expect(onData.mock.calls[1]).toEqual(['b']);
});
});

View file

@ -1,61 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Transform } from 'stream';
/**
* Create a Transform stream that receives values in object mode,
* and intersperses a chunk between each object received.
*
* This is useful for writing lists:
*
* createListStream(['foo', 'bar'])
* .pipe(createIntersperseStream('\n'))
* .pipe(process.stdout) // outputs "foo\nbar"
*
* Combine with a concat stream to get "join" like functionality:
*
* await createPromiseFromStreams([
* createListStream(['foo', 'bar']),
* createIntersperseStream(' '),
* createConcatStream()
* ]) // produces a single value "foo bar"
*/
export function createIntersperseStream(intersperseChunk: any) {
let first = true;
return new Transform({
writableObjectMode: true,
readableObjectMode: true,
transform(chunk, _, callback) {
try {
if (first) {
first = false;
} else {
this.push(intersperseChunk);
}
this.push(chunk);
callback(undefined);
} catch (err) {
callback(err);
}
},
});
}

View file

@ -1,44 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { createListStream } from './';
describe('listStream', () => {
test('provides the values in the initial list', async () => {
const str = createListStream([1, 2, 3, 4]);
const onData = jest.fn();
str.on('data', onData);
await new Promise((resolve) => str.on('end', resolve));
expect(onData).toHaveBeenCalledTimes(4);
expect(onData.mock.calls[0]).toEqual([1]);
expect(onData.mock.calls[1]).toEqual([2]);
expect(onData.mock.calls[2]).toEqual([3]);
expect(onData.mock.calls[3]).toEqual([4]);
});
test('does not modify the list passed', async () => {
const list = [1, 2, 3, 4];
const str = createListStream(list);
str.resume();
await new Promise((resolve) => str.on('end', resolve));
expect(list).toEqual([1, 2, 3, 4]);
});
});

View file

@ -1,41 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Readable } from 'stream';
/**
* Create a Readable stream that provides the items
* from a list as objects to subscribers
*/
export function createListStream(items: any | any[] = []) {
const queue: any[] = [].concat(items);
return new Readable({
objectMode: true,
read(size) {
queue.splice(0, size).forEach((item) => {
this.push(item);
});
if (!queue.length) {
this.push(null);
}
},
});
}

View file

@ -1,61 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { delay } from 'bluebird';
import { createPromiseFromStreams } from './promise_from_streams';
import { createListStream } from './list_stream';
import { createMapStream } from './map_stream';
import { createConcatStream } from './concat_stream';
describe('createMapStream()', () => {
test('calls the function with each item in the source stream', async () => {
const mapper = jest.fn();
await createPromiseFromStreams([createListStream(['a', 'b', 'c']), createMapStream(mapper)]);
expect(mapper).toHaveBeenCalledTimes(3);
expect(mapper).toHaveBeenCalledWith('a', 0);
expect(mapper).toHaveBeenCalledWith('b', 1);
expect(mapper).toHaveBeenCalledWith('c', 2);
});
test('send the return value from the mapper on the output stream', async () => {
const result = await createPromiseFromStreams([
createListStream([1, 2, 3]),
createMapStream((n) => n * 100),
createConcatStream([]),
]);
expect(result).toEqual([100, 200, 300]);
});
test('supports async mappers', async () => {
const result = await createPromiseFromStreams([
createListStream([1, 2, 3]),
createMapStream(async (n, i) => {
await delay(n);
return n * i;
}),
createConcatStream([]),
]);
expect(result).toEqual([0, 2, 6]);
});
});

View file

@ -1,36 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Transform } from 'stream';
export function createMapStream<T = any>(fn: (chunk: any, i: number) => T | Promise<T>) {
let i = 0;
return new Transform({
objectMode: true,
async transform(value, _, done) {
try {
this.push(await fn(value, i++));
done();
} catch (err) {
done(err);
}
},
});
}

View file

@ -1,136 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Readable, Writable, Duplex, Transform } from 'stream';
import { createListStream, createPromiseFromStreams, createReduceStream } from './';
describe('promiseFromStreams', () => {
test('pipes together an array of streams', async () => {
const str1 = createListStream([1, 2, 3]);
const str2 = createReduceStream((acc, n) => acc + n, 0);
const sumPromise = new Promise((resolve) => str2.once('data', resolve));
createPromiseFromStreams([str1, str2]);
await new Promise((resolve) => str2.once('end', resolve));
expect(await sumPromise).toBe(6);
});
describe('last stream is writable', () => {
test('waits for the last stream to finish writing', async () => {
let written = '';
await createPromiseFromStreams([
createListStream(['a']),
new Writable({
write(chunk, enc, cb) {
setTimeout(() => {
written += chunk;
cb();
}, 100);
},
}),
]);
expect(written).toBe('a');
});
test('resolves to undefined', async () => {
const result = await createPromiseFromStreams([
createListStream(['a']),
new Writable({
write(chunk, enc, cb) {
cb();
},
}),
]);
expect(result).toBe(undefined);
});
});
describe('last stream is readable', () => {
test(`resolves to it's final value`, async () => {
const result = await createPromiseFromStreams([createListStream(['a', 'b', 'c'])]);
expect(result).toBe('c');
});
});
describe('last stream is duplex', () => {
test('waits for writing and resolves to final value', async () => {
let written = '';
const duplexReadQueue = [];
const duplexItemsToPush = ['foo', 'bar', null];
const result = await createPromiseFromStreams([
createListStream(['a', 'b', 'c']),
new Duplex({
async read() {
const result = await duplexReadQueue.shift();
this.push(result);
},
write(chunk, enc, cb) {
duplexReadQueue.push(
new Promise((resolve) => {
setTimeout(() => {
written += chunk;
cb();
resolve(duplexItemsToPush.shift());
}, 50);
})
);
},
}).setEncoding('utf8'),
]);
expect(written).toEqual('abc');
expect(result).toBe('bar');
});
});
describe('error handling', () => {
test('read stream gets destroyed when transform stream fails', async () => {
let destroyCalled = false;
const readStream = new Readable({
read() {
this.push('a');
this.push('b');
this.push('c');
this.push(null);
},
destroy() {
destroyCalled = true;
},
});
const transformStream = new Transform({
transform(chunk, enc, done) {
done(new Error('Test error'));
},
});
try {
await createPromiseFromStreams([readStream, transformStream]);
throw new Error('Should fail');
} catch (e) {
expect(e.message).toBe('Test error');
expect(destroyCalled).toBe(true);
}
});
});
});

View file

@ -1,64 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/**
* Take an array of streams, pipe the output
* from each one into the next, listening for
* errors from any of the streams, and then resolve
* the promise once the final stream has finished
* writing/reading.
*
* If the last stream is readable, it's final value
* will be provided as the promise value.
*
* Errors emitted from any stream will cause
* the promise to be rejected with that error.
*/
import { pipeline, Writable } from 'stream';
import { promisify } from 'util';
const asyncPipeline = promisify(pipeline);
export async function createPromiseFromStreams<T = any>(streams: any): Promise<T> {
let finalChunk: any;
const last = streams[streams.length - 1];
if (typeof last.read !== 'function' && streams.length === 1) {
// For a nicer error than what stream.pipeline throws
throw new Error('A minimum of 2 streams is required when a non-readable stream is given');
}
if (typeof last.read === 'function') {
// We are pushing a writable stream to capture the last chunk
streams.push(
new Writable({
// Use object mode even when "last" stream isn't. This allows to
// capture the last chunk as-is.
objectMode: true,
write(chunk, _, done) {
finalChunk = chunk;
done();
},
})
);
}
await asyncPipeline(...(streams as [any]));
return finalChunk;
}

View file

@ -1,84 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { createReduceStream, createPromiseFromStreams, createListStream } from './';
const promiseFromEvent = (name, emitter) =>
new Promise((resolve) => emitter.on(name, () => resolve(name)));
describe('reduceStream', () => {
test('calls the reducer for each item provided', async () => {
const stub = jest.fn();
await createPromiseFromStreams([
createListStream([1, 2, 3]),
createReduceStream((val, chunk, enc) => {
stub(val, chunk, enc);
return chunk;
}, 0),
]);
expect(stub).toHaveBeenCalledTimes(3);
expect(stub.mock.calls[0]).toEqual([0, 1, 'utf8']);
expect(stub.mock.calls[1]).toEqual([1, 2, 'utf8']);
expect(stub.mock.calls[2]).toEqual([2, 3, 'utf8']);
});
test('provides the return value of the last iteration of the reducer', async () => {
const result = await createPromiseFromStreams([
createListStream('abcdefg'.split('')),
createReduceStream((acc) => acc + 1, 0),
]);
expect(result).toBe(7);
});
test('emits an error if an iteration fails', async () => {
const reduce = createReduceStream((acc, i) => expect(i).toBe(1), 0);
const errorEvent = promiseFromEvent('error', reduce);
reduce.write(1);
reduce.write(2);
reduce.resume();
await errorEvent;
});
test('stops calling the reducer if an iteration fails, emits no data', async () => {
const reducer = jest.fn((acc, i) => {
if (i < 100) return acc + i;
else throw new Error(i);
});
const reduce$ = createReduceStream(reducer, 0);
const dataStub = jest.fn();
const errorStub = jest.fn();
reduce$.on('data', dataStub);
reduce$.on('error', errorStub);
const endEvent = promiseFromEvent('end', reduce$);
reduce$.write(1);
reduce$.write(2);
reduce$.write(300);
reduce$.write(400);
reduce$.write(1000);
reduce$.end();
await endEvent;
expect(reducer).toHaveBeenCalledTimes(3);
expect(dataStub).toHaveBeenCalledTimes(0);
expect(errorStub).toHaveBeenCalledTimes(1);
});
});

View file

@ -1,77 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Transform } from 'stream';
/**
* Create a transform stream that consumes each chunk it receives
* and passes it to the reducer, which will return the new value
* for the stream. Once all chunks have been received the reduce
* stream provides the result of final call to the reducer to
* subscribers.
*/
export function createReduceStream(
reducer: (acc: any, chunk: any, env: string) => any,
initial: any
) {
let i = -1;
let value = initial;
// if the reducer throws an error then the value is
// considered invalid and the stream will never provide
// it to subscribers. We will also stop calling the
// reducer for any new data that is provided to us
let failed = false;
if (typeof reducer !== 'function') {
throw new TypeError('reducer must be a function');
}
return new Transform({
readableObjectMode: true,
writableObjectMode: true,
async transform(chunk, enc, callback) {
try {
if (failed) {
return callback();
}
i += 1;
if (i === 0 && initial === undefined) {
value = chunk;
} else {
value = await reducer(value, chunk, enc);
}
callback();
} catch (err) {
failed = true;
callback(err);
}
},
flush(callback) {
if (!failed) {
this.push(value);
}
callback();
},
});
}

View file

@ -1,130 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import {
createReplaceStream,
createConcatStream,
createPromiseFromStreams,
createListStream,
createMapStream,
} from './';
async function concatToString(streams) {
return await createPromiseFromStreams([
...streams,
createMapStream((buff) => buff.toString('utf8')),
createConcatStream(''),
]);
}
describe('replaceStream', () => {
test('produces buffers when it receives buffers', async () => {
const chunks = await createPromiseFromStreams([
createListStream([Buffer.from('foo'), Buffer.from('bar')]),
createReplaceStream('o', '0'),
createConcatStream([]),
]);
chunks.forEach((chunk) => {
expect(chunk).toBeInstanceOf(Buffer);
});
});
test('produces buffers when it receives strings', async () => {
const chunks = await createPromiseFromStreams([
createListStream(['foo', 'bar']),
createReplaceStream('o', '0'),
createConcatStream([]),
]);
chunks.forEach((chunk) => {
expect(chunk).toBeInstanceOf(Buffer);
});
});
test('expects toReplace to be a string', () => {
expect(() => createReplaceStream(Buffer.from('foo'))).toThrowError(/be a string/);
});
test('replaces multiple single-char instances in a single chunk', async () => {
expect(
await concatToString([
createListStream([Buffer.from('f00 bar')]),
createReplaceStream('0', 'o'),
])
).toBe('foo bar');
});
test('replaces multiple single-char instances in multiple chunks', async () => {
expect(
await concatToString([
createListStream([Buffer.from('f0'), Buffer.from('0 bar')]),
createReplaceStream('0', 'o'),
])
).toBe('foo bar');
});
test('replaces single multi-char instances in single chunks', async () => {
expect(
await concatToString([
createListStream([Buffer.from('f0'), Buffer.from('0 bar')]),
createReplaceStream('0', 'o'),
])
).toBe('foo bar');
});
test('replaces multiple multi-char instances in single chunks', async () => {
expect(
await concatToString([
createListStream([Buffer.from('foo ba'), Buffer.from('r b'), Buffer.from('az bar')]),
createReplaceStream('bar', '*'),
])
).toBe('foo * baz *');
});
test('replaces multi-char instance that stretches multiple chunks', async () => {
expect(
await concatToString([
createListStream([
Buffer.from('foo supe'),
Buffer.from('rcalifra'),
Buffer.from('gilistic'),
Buffer.from('expialid'),
Buffer.from('ocious bar'),
]),
createReplaceStream('supercalifragilisticexpialidocious', '*'),
])
).toBe('foo * bar');
});
test('ignores missing multi-char instance', async () => {
expect(
await concatToString([
createListStream([
Buffer.from('foo supe'),
Buffer.from('rcalifra'),
Buffer.from('gili stic'),
Buffer.from('expialid'),
Buffer.from('ocious bar'),
]),
createReplaceStream('supercalifragilisticexpialidocious', '*'),
])
).toBe('foo supercalifragili sticexpialidocious bar');
});
});

View file

@ -1,84 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Transform } from 'stream';
export function createReplaceStream(toReplace: string, replacement: string) {
if (typeof toReplace !== 'string') {
throw new TypeError('toReplace must be a string');
}
let buffer = Buffer.alloc(0);
return new Transform({
objectMode: false,
async transform(value, _, done) {
try {
buffer = Buffer.concat([buffer, value], buffer.length + value.length);
while (true) {
// try to find the next instance of `toReplace` in buffer
const index = buffer.indexOf(toReplace);
// if there is no next instance, break
if (index === -1) {
break;
}
// flush everything to the left of the next instance
// of `toReplace`
this.push(buffer.slice(0, index));
// then flush an instance of `replacement`
this.push(replacement);
// and finally update the buffer to include everything
// to the right of `toReplace`, dropping to replace from the buffer
buffer = buffer.slice(index + toReplace.length);
}
// until now we have only flushed data that is to the left
// of a discovered instance of `toReplace`. If `toReplace` is
// never found this would lead to us buffering the entire stream.
//
// Instead, we only keep enough buffer to complete a potentially
// partial instance of `toReplace`
if (buffer.length > toReplace.length) {
// the entire buffer except the last `toReplace.length` bytes
// so that if all but one byte from `toReplace` is in the buffer,
// and the next chunk delivers the necessary byte, the buffer will then
// contain a complete `toReplace` token.
this.push(buffer.slice(0, buffer.length - toReplace.length));
buffer = buffer.slice(-toReplace.length);
}
done();
} catch (err) {
done(err);
}
},
flush(callback) {
if (buffer.length) {
this.push(buffer);
}
callback();
},
});
}

View file

@ -1,71 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { createSplitStream, createConcatStream, createPromiseFromStreams } from './';
async function split(stream, input) {
const concat = createConcatStream();
concat.write([]);
stream.pipe(concat);
const output = createPromiseFromStreams([concat]);
input.forEach((i) => {
stream.write(i);
});
stream.end();
return await output;
}
describe('splitStream', () => {
test('splits buffers, produces strings', async () => {
const output = await split(createSplitStream('&'), [Buffer.from('foo&bar')]);
expect(output).toEqual(['foo', 'bar']);
});
test('supports mixed input', async () => {
const output = await split(createSplitStream('&'), [Buffer.from('foo&b'), 'ar']);
expect(output).toEqual(['foo', 'bar']);
});
test('supports buffer split chunks', async () => {
const output = await split(createSplitStream(Buffer.from('&')), ['foo&b', 'ar']);
expect(output).toEqual(['foo', 'bar']);
});
test('splits provided values by a delimiter', async () => {
const output = await split(createSplitStream('&'), ['foo&b', 'ar']);
expect(output).toEqual(['foo', 'bar']);
});
test('handles multi-character delimiters', async () => {
const output = await split(createSplitStream('oo'), ['foo&b', 'ar']);
expect(output).toEqual(['f', '&bar']);
});
test('handles delimiters that span multiple chunks', async () => {
const output = await split(createSplitStream('ba'), ['foo&b', 'ar']);
expect(output).toEqual(['foo&', 'r']);
});
test('produces an empty chunk if the split char is at the end of the input', async () => {
const output = await split(createSplitStream('&bar'), ['foo&b', 'ar']);
expect(output).toEqual(['foo', '']);
});
});

View file

@ -1,71 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Transform } from 'stream';
/**
* Creates a Transform stream that consumes a stream of Buffers
* and produces a stream of strings (in object mode) by splitting
* the received bytes using the splitChunk.
*
* Ways this is behaves like String#split:
* - instances of splitChunk are removed from the input
* - splitChunk can be on any size
* - if there are no bytes found after the last splitChunk
* a final empty chunk is emitted
*
* Ways this deviates from String#split:
* - splitChunk cannot be a regexp
* - an empty string or Buffer will not produce a stream of individual
* bytes like `string.split('')` would
*/
export function createSplitStream(splitChunk: string) {
let unsplitBuffer = Buffer.alloc(0);
return new Transform({
writableObjectMode: false,
readableObjectMode: true,
transform(chunk, _, callback) {
try {
let i;
let toSplit = Buffer.concat([unsplitBuffer, chunk]);
while ((i = toSplit.indexOf(splitChunk)) !== -1) {
const slice = toSplit.slice(0, i);
toSplit = toSplit.slice(i + splitChunk.length);
this.push(slice.toString('utf8'));
}
unsplitBuffer = toSplit;
callback(undefined);
} catch (err) {
callback(err);
}
},
flush(callback) {
try {
this.push(unsplitBuffer.toString('utf8'));
callback(undefined);
} catch (err) {
callback(err);
}
},
});
}

View file

@ -10,6 +10,6 @@
"kbn:watch": "yarn build --watch"
},
"dependencies": {
"@kbn/std": "link:../kbn-std"
"@kbn/utils": "link:../kbn-utils"
}
}

View file

@ -20,7 +20,7 @@
import moment from 'moment';
import { attachMetaData } from './metadata';
import { createListStream, createPromiseFromStreams } from './test_utils';
import { createListStream, createPromiseFromStreams } from '@kbn/utils';
import { KbnLoggerJsonFormat } from './log_format_json';
const time = +moment('2010-01-01T05:15:59Z', moment.ISO_8601);

View file

@ -20,7 +20,7 @@
import moment from 'moment';
import { attachMetaData } from './metadata';
import { createListStream, createPromiseFromStreams } from './test_utils';
import { createListStream, createPromiseFromStreams } from '@kbn/utils';
import { KbnLoggerStringFormat } from './log_format_string';
const time = +moment('2010-01-01T05:15:59Z', moment.ISO_8601);

View file

@ -1,20 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
export { createListStream, createPromiseFromStreams } from './streams';

View file

@ -1,96 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { pipeline, Writable, Readable } from 'stream';
/**
* Create a Readable stream that provides the items
* from a list as objects to subscribers
*
* @param {Array<any>} items - the list of items to provide
* @return {Readable}
*/
export function createListStream<T = any>(items: T | T[] = []) {
const queue = Array.isArray(items) ? [...items] : [items];
return new Readable({
objectMode: true,
read(size) {
queue.splice(0, size).forEach((item) => {
this.push(item);
});
if (!queue.length) {
this.push(null);
}
},
});
}
/**
* Take an array of streams, pipe the output
* from each one into the next, listening for
* errors from any of the streams, and then resolve
* the promise once the final stream has finished
* writing/reading.
*
* If the last stream is readable, it's final value
* will be provided as the promise value.
*
* Errors emitted from any stream will cause
* the promise to be rejected with that error.
*
* @param {Array<Stream>} streams
* @return {Promise<any>}
*/
function isReadable(stream: Readable | Writable): stream is Readable {
return 'read' in stream && typeof stream.read === 'function';
}
export async function createPromiseFromStreams<T>(streams: [Readable, ...Writable[]]): Promise<T> {
let finalChunk: any;
const last = streams[streams.length - 1];
if (!isReadable(last) && streams.length === 1) {
// For a nicer error than what stream.pipeline throws
throw new Error('A minimum of 2 streams is required when a non-readable stream is given');
}
if (isReadable(last)) {
// We are pushing a writable stream to capture the last chunk
streams.push(
new Writable({
// Use object mode even when "last" stream isn't. This allows to
// capture the last chunk as-is.
objectMode: true,
write(chunk, enc, done) {
finalChunk = chunk;
done();
},
})
);
}
return new Promise((resolve, reject) => {
// @ts-expect-error 'pipeline' doesn't support variable length of arguments
pipeline(...streams, (err) => {
if (err) return reject(err);
resolve(finalChunk);
});
});
}

View file

@ -20,3 +20,4 @@
export * from './package_json';
export * from './path';
export * from './repo_root';
export * from './streams';

View file

@ -19,7 +19,8 @@
import { Logger } from '../cli_plugin/lib/logger';
import { confirm, question } from './utils';
import { createPromiseFromStreams, createConcatStream } from '../core/server/utils';
// import from path since add.test.js mocks 'fs' required for @kbn/utils
import { createPromiseFromStreams, createConcatStream } from '@kbn/utils/target/streams';
/**
* @param {Keystore} keystore

View file

@ -17,6 +17,5 @@
* under the License.
*/
export { shareWeakReplay } from './share_weak_replay';
export { Sha256 } from './crypto';
export { MountWrapper, mountReactNode } from './mount';

View file

@ -1,237 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import * as Rx from 'rxjs';
import { map, materialize, take, toArray } from 'rxjs/operators';
import { shareWeakReplay } from './share_weak_replay';
let completedCounts = 0;
function counter({ async = true }: { async?: boolean } = {}) {
let subCounter = 0;
function sendCount(subscriber: Rx.Subscriber<string>) {
let notifCounter = 0;
const sub = ++subCounter;
while (!subscriber.closed) {
subscriber.next(`${sub}:${++notifCounter}`);
}
completedCounts += 1;
}
return new Rx.Observable<string>((subscriber) => {
if (!async) {
sendCount(subscriber);
return;
}
const id = setTimeout(() => sendCount(subscriber));
return () => clearTimeout(id);
});
}
async function record<T>(observable: Rx.Observable<T>) {
return observable
.pipe(
materialize(),
map((n) => (n.kind === 'N' ? `N:${n.value}` : n.kind === 'E' ? `E:${n.error.message}` : 'C')),
toArray()
)
.toPromise();
}
afterEach(() => {
completedCounts = 0;
});
it('multicasts an observable to multiple children, unsubs once all children do, and resubscribes on next subscription', async () => {
const shared = counter().pipe(shareWeakReplay(1));
await expect(Promise.all([record(shared.pipe(take(1))), record(shared.pipe(take(2)))])).resolves
.toMatchInlineSnapshot(`
Array [
Array [
"N:1:1",
"C",
],
Array [
"N:1:1",
"N:1:2",
"C",
],
]
`);
await expect(Promise.all([record(shared.pipe(take(3))), record(shared.pipe(take(4)))])).resolves
.toMatchInlineSnapshot(`
Array [
Array [
"N:2:1",
"N:2:2",
"N:2:3",
"C",
],
Array [
"N:2:1",
"N:2:2",
"N:2:3",
"N:2:4",
"C",
],
]
`);
expect(completedCounts).toBe(2);
});
it('resubscribes if parent errors', async () => {
let errorCounter = 0;
const shared = counter().pipe(
map((v, i) => {
if (i === 3) {
throw new Error(`error ${++errorCounter}`);
}
return v;
}),
shareWeakReplay(2)
);
await expect(Promise.all([record(shared), record(shared)])).resolves.toMatchInlineSnapshot(`
Array [
Array [
"N:1:1",
"N:1:2",
"N:1:3",
"E:error 1",
],
Array [
"N:1:1",
"N:1:2",
"N:1:3",
"E:error 1",
],
]
`);
await expect(Promise.all([record(shared), record(shared)])).resolves.toMatchInlineSnapshot(`
Array [
Array [
"N:2:1",
"N:2:2",
"N:2:3",
"E:error 2",
],
Array [
"N:2:1",
"N:2:2",
"N:2:3",
"E:error 2",
],
]
`);
expect(completedCounts).toBe(2);
});
it('resubscribes if parent completes', async () => {
const shared = counter().pipe(take(4), shareWeakReplay(4));
await expect(Promise.all([record(shared.pipe(take(1))), record(shared)])).resolves
.toMatchInlineSnapshot(`
Array [
Array [
"N:1:1",
"C",
],
Array [
"N:1:1",
"N:1:2",
"N:1:3",
"N:1:4",
"C",
],
]
`);
await expect(Promise.all([record(shared.pipe(take(2))), record(shared)])).resolves
.toMatchInlineSnapshot(`
Array [
Array [
"N:2:1",
"N:2:2",
"C",
],
Array [
"N:2:1",
"N:2:2",
"N:2:3",
"N:2:4",
"C",
],
]
`);
expect(completedCounts).toBe(2);
});
it('supports parents that complete synchronously', async () => {
const next = jest.fn();
const complete = jest.fn();
const shared = counter({ async: false }).pipe(take(3), shareWeakReplay(1));
shared.subscribe({ next, complete });
expect(next.mock.calls).toMatchInlineSnapshot(`
Array [
Array [
"1:1",
],
Array [
"1:2",
],
Array [
"1:3",
],
]
`);
expect(complete).toHaveBeenCalledTimes(1);
next.mockClear();
complete.mockClear();
shared.subscribe({ next, complete });
expect(next.mock.calls).toMatchInlineSnapshot(`
Array [
Array [
"2:1",
],
Array [
"2:2",
],
Array [
"2:3",
],
]
`);
expect(complete).toHaveBeenCalledTimes(1);
expect(completedCounts).toBe(2);
});

View file

@ -1,66 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import * as Rx from 'rxjs';
import { takeUntil } from 'rxjs/operators';
/**
* Just like the [`shareReplay()`](https://rxjs-dev.firebaseapp.com/api/operators/shareReplay) operator from
* RxJS except for a few key differences:
*
* - If all downstream subscribers unsubscribe the source subscription will be unsubscribed.
*
* - Replay-ability is only maintained while the source is active, if it completes or errors
* then complete/error is sent to the current subscribers and the replay buffer is cleared.
*
* - Any subscription after the the source completes or errors will create a new subscription
* to the source observable.
*
* @param bufferSize Optional, default is `Number.POSITIVE_INFINITY`
*/
export function shareWeakReplay<T>(bufferSize?: number): Rx.MonoTypeOperatorFunction<T> {
return (source: Rx.Observable<T>) => {
let subject: Rx.ReplaySubject<T> | undefined;
const stop$ = new Rx.Subject();
return new Rx.Observable((observer) => {
if (!subject) {
subject = new Rx.ReplaySubject<T>(bufferSize);
}
subject.subscribe(observer).add(() => {
if (!subject) {
return;
}
if (subject.observers.length === 0) {
stop$.next();
}
if (subject.closed || subject.isStopped) {
subject = undefined;
}
});
if (subject && subject.observers.length === 1) {
source.pipe(takeUntil(stop$)).subscribe(subject);
}
});
};
}

View file

@ -20,7 +20,7 @@
import { exportSavedObjectsToStream } from './get_sorted_objects_for_export';
import { savedObjectsClientMock } from '../service/saved_objects_client.mock';
import { Readable } from 'stream';
import { createPromiseFromStreams, createConcatStream } from '../../utils/streams';
import { createPromiseFromStreams, createConcatStream } from '@kbn/utils';
async function readStreamToCompletion(stream: Readable) {
return createPromiseFromStreams([stream, createConcatStream([])]);

View file

@ -18,7 +18,7 @@
*/
import Boom from '@hapi/boom';
import { createListStream } from '../../utils/streams';
import { createListStream } from '@kbn/utils';
import {
SavedObjectsClientContract,
SavedObject,

View file

@ -23,7 +23,8 @@ import {
createFilterStream,
createMapStream,
createPromiseFromStreams,
} from '../../utils/streams';
} from '@kbn/utils';
import { SavedObject } from '../types';
import { createLimitStream } from './create_limit_stream';
import { SavedObjectsImportError } from './types';

View file

@ -17,11 +17,7 @@
* under the License.
*/
import {
createConcatStream,
createListStream,
createPromiseFromStreams,
} from '../../utils/streams';
import { createConcatStream, createListStream, createPromiseFromStreams } from '@kbn/utils';
import { createLimitStream } from './create_limit_stream';
describe('createLimitStream()', () => {

View file

@ -19,7 +19,8 @@
import { schema } from '@kbn/config-schema';
import stringify from 'json-stable-stringify';
import { createPromiseFromStreams, createMapStream, createConcatStream } from '../../utils/streams';
import { createPromiseFromStreams, createMapStream, createConcatStream } from '@kbn/utils';
import { IRouter } from '../../http';
import { SavedObjectConfig } from '../saved_objects_config';
import { exportSavedObjectsToStream } from '../export';

View file

@ -22,9 +22,9 @@ jest.mock('../../export', () => ({
}));
import * as exportMock from '../../export';
import { createListStream } from '../../../utils/streams';
import supertest from 'supertest';
import { UnwrapPromise } from '@kbn/utility-types';
import type { UnwrapPromise } from '@kbn/utility-types';
import { createListStream } from '@kbn/utils';
import { SavedObjectConfig } from '../../saved_objects_config';
import { registerExportRoute } from '../export';
import { setupServer, createExportableType } from '../test_utils';

View file

@ -19,7 +19,7 @@
import { createSavedObjectsStreamFromNdJson, validateTypes, validateObjects } from './utils';
import { Readable } from 'stream';
import { createPromiseFromStreams, createConcatStream } from '../../utils/streams';
import { createPromiseFromStreams, createConcatStream } from '@kbn/utils';
async function readStreamToCompletion(stream: Readable) {
return createPromiseFromStreams([stream, createConcatStream([])]);

View file

@ -26,7 +26,7 @@ import {
createPromiseFromStreams,
createListStream,
createConcatStream,
} from '../../utils/streams';
} from '@kbn/utils';
export async function createSavedObjectsStreamFromNdJson(ndJsonStream: Readable) {
const savedObjects = await createPromiseFromStreams([

View file

@ -20,4 +20,3 @@
export * from './crypto';
export * from './from_root';
export * from './package_json';
export * from './streams';

View file

@ -1,29 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
export { concatStreamProviders } from './concat_stream_providers';
export { createIntersperseStream } from './intersperse_stream';
export { createSplitStream } from './split_stream';
export { createListStream } from './list_stream';
export { createReduceStream } from './reduce_stream';
export { createPromiseFromStreams } from './promise_from_streams';
export { createConcatStream } from './concat_stream';
export { createMapStream } from './map_stream';
export { createReplaceStream } from './replace_stream';
export { createFilterStream } from './filter_stream';

View file

@ -20,11 +20,7 @@
import { Transform } from 'stream';
import { ExecaChildProcess } from 'execa';
import {
createPromiseFromStreams,
createSplitStream,
createMapStream,
} from '../../../core/server/utils';
import { createPromiseFromStreams, createSplitStream, createMapStream } from '@kbn/utils';
// creates a stream that skips empty lines unless they are followed by
// another line, preventing the empty lines produced by splitStream

View file

@ -7,6 +7,7 @@
import { chunk } from 'lodash/fp';
import { extname } from 'path';
import { schema } from '@kbn/config-schema';
import { createPromiseFromStreams } from '@kbn/utils';
import { validate } from '../../../../../common/validate';
import {
@ -20,7 +21,6 @@ import {
} from '../../../../../common/detection_engine/schemas/response/import_rules_schema';
import { isMlRule } from '../../../../../common/machine_learning/helpers';
import { IRouter } from '../../../../../../../../src/core/server';
import { createPromiseFromStreams } from '../../../../../../../../src/core/server/utils/';
import { DETECTION_ENGINE_RULES_URL } from '../../../../../common/constants';
import { ConfigType } from '../../../../config';
import { SetupPlugins } from '../../../../plugin';

View file

@ -4,6 +4,8 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { Readable } from 'stream';
import { createPromiseFromStreams } from '@kbn/utils';
import {
transformAlertToRule,
getIdError,
@ -22,7 +24,6 @@ import { INTERNAL_IDENTIFIER } from '../../../../../common/constants';
import { PartialFilter, RuleTypeParams } from '../../types';
import { BulkError, ImportSuccessError } from '../utils';
import { getOutputRuleAlertForRest } from '../__mocks__/utils';
import { createPromiseFromStreams } from '../../../../../../../../src/core/server/utils';
import { PartialAlert } from '../../../../../../alerts/server';
import { SanitizedAlert } from '../../../../../../alerts/server/types';
import { createRulesStreamFromNdJson } from '../../rules/create_rules_stream_from_ndjson';

View file

@ -4,8 +4,8 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { Readable } from 'stream';
import { createPromiseFromStreams } from '@kbn/utils';
import { createRulesStreamFromNdJson } from './create_rules_stream_from_ndjson';
import { createPromiseFromStreams } from 'src/core/server/utils';
import { BadRequestError } from '../errors/bad_request_error';
import { ImportRulesSchemaDecoded } from '../../../../common/detection_engine/schemas/request/import_rules_schema';

View file

@ -7,6 +7,8 @@ import { Transform } from 'stream';
import * as t from 'io-ts';
import { pipe } from 'fp-ts/lib/pipeable';
import { fold } from 'fp-ts/lib/Either';
import { createSplitStream, createMapStream, createConcatStream } from '@kbn/utils';
import { formatErrors } from '../../../../common/format_errors';
import { importRuleValidateTypeDependents } from '../../../../common/detection_engine/schemas/request/import_rules_type_dependents';
import { exactCheck } from '../../../../common/exact_check';
@ -15,11 +17,6 @@ import {
ImportRulesSchema,
ImportRulesSchemaDecoded,
} from '../../../../common/detection_engine/schemas/request/import_rules_schema';
import {
createSplitStream,
createMapStream,
createConcatStream,
} from '../../../../../../../src/core/server/utils';
import { BadRequestError } from '../errors/bad_request_error';
import {
parseNdjsonStrings,

View file

@ -9,11 +9,7 @@ import { pipe } from 'fp-ts/lib/pipeable';
import { fold } from 'fp-ts/lib/Either';
import { failure } from 'io-ts/lib/PathReporter';
import { identity } from 'fp-ts/lib/function';
import {
createConcatStream,
createSplitStream,
createMapStream,
} from '../../../../../../src/core/server/utils';
import { createConcatStream, createSplitStream, createMapStream } from '@kbn/utils';
import {
parseNdjsonStrings,
filterExportedCounts,

View file

@ -79,7 +79,7 @@ describe('import timelines', () => {
};
});
jest.doMock('../../../../../../../src/core/server/utils', () => {
jest.doMock('@kbn/utils', () => {
return {
createPromiseFromStreams: jest.fn().mockReturnValue(mockParsedObjects),
};
@ -545,7 +545,7 @@ describe('import timeline templates', () => {
};
});
jest.doMock('../../../../../../../src/core/server/utils', () => {
jest.doMock('@kbn/utils', () => {
return {
createPromiseFromStreams: jest.fn().mockReturnValue(mockParsedTemplateTimelineObjects),
};

View file

@ -7,11 +7,9 @@ import { set } from '@elastic/safer-lodash-set/fp';
import readline from 'readline';
import fs from 'fs';
import { Readable } from 'stream';
import { createListStream } from '@kbn/utils';
import { KibanaRequest, RequestHandlerContext } from 'src/core/server';
import { createListStream } from '../../../../../../../../src/core/server/utils';
import { SetupPlugins } from '../../../../plugin';
import { FrameworkRequest } from '../../../framework';

View file

@ -7,6 +7,7 @@
import { has, chunk, omit } from 'lodash/fp';
import { Readable } from 'stream';
import uuid from 'uuid';
import { createPromiseFromStreams } from '@kbn/utils';
import {
TimelineStatus,
@ -21,7 +22,6 @@ import { createBulkErrorObject, BulkError } from '../../../detection_engine/rout
import { createTimelines } from './create_timelines';
import { FrameworkRequest } from '../../../framework';
import { createTimelinesStreamFromNdJson } from '../../create_timelines_stream_from_ndjson';
import { createPromiseFromStreams } from '../../../../../../../../src/core/server/utils';
import { getTupleDuplicateErrorsAndUniqueTimeline } from './get_timelines_from_stream';
import { CompareTimelinesStatus } from './compare_timelines_status';

View file

@ -5,7 +5,7 @@
*/
import { join, resolve } from 'path';
import { createPromiseFromStreams } from '../../../../../../../../src/core/server/utils';
import { createPromiseFromStreams } from '@kbn/utils';
import { SecurityPluginSetup } from '../../../../../../security/server';
import { FrameworkRequest } from '../../../framework';

View file

@ -8,6 +8,8 @@ import { has, isString } from 'lodash/fp';
import { pipe } from 'fp-ts/lib/pipeable';
import { fold } from 'fp-ts/lib/Either';
import * as t from 'io-ts';
import { createMapStream, createFilterStream } from '@kbn/utils';
import { formatErrors } from '../../../common/format_errors';
import { importRuleValidateTypeDependents } from '../../../common/detection_engine/schemas/request/import_rules_type_dependents';
import {
@ -16,7 +18,6 @@ import {
ImportRulesSchema,
} from '../../../common/detection_engine/schemas/request/import_rules_schema';
import { exactCheck } from '../../../common/exact_check';
import { createMapStream, createFilterStream } from '../../../../../../src/core/server/utils';
import { BadRequestError } from '../../lib/detection_engine/errors/bad_request_error';
export interface RulesObjectsExportResultDetails {

View file

@ -5,7 +5,7 @@
*/
import { Readable } from 'stream';
import { createPromiseFromStreams, createConcatStream } from 'src/core/server/utils';
import { createPromiseFromStreams, createConcatStream } from '@kbn/utils';
async function readStreamToCompletion(stream: Readable) {
return (await (createPromiseFromStreams([stream, createConcatStream([])]) as unknown)) as any[];