Kill kbn_observable and @kbn/observable (#21944) (#22106)

Closes https://github.com/elastic/kibana/issues/17034
This commit is contained in:
Spencer 2018-08-16 21:22:09 -07:00 committed by GitHub
parent 6c9788dced
commit e07b7e80d1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
75 changed files with 46 additions and 4997 deletions

View file

@ -1,44 +0,0 @@
# kbn-internal-native-observable
This package contains a [spec-compliant][spec] observable implementation that
does _not_ implement any additional helper methods on the observable.
NB! It is not intended to be used directly. It is exposed through
`../kbn-observable`, which also exposes several helpers, similar to a subset of
features in RxJS.
## Background
We only want to expose native JavaScript observables in the api, i.e. exposed
observables should _only_ implement the specific methods defined in the spec.
The primary reason for doing this is that we don't want to couple our plugin
api to a specific version of RxJS (or any other observable library that
implements additional methods on top of the spec).
As there exists no other library we can use in the interim while waiting for the
Observable spec to reach stage 3, all exposed observables in the Kibana platform
should rely on this package.
## Why a separate package?
This package is implemented as a separate package instead of directly in the
platform code base for a couple of reasons. We wanted to copy the
implementation from the [observable proposal][spec] directly (so it's easier to
stay up-to-date with the future spec), and we therefore didn't want to start
adding TS types directly to that implementation.
We tried to avoid this by implementing the type declaration file separately and
make that part of the build. However, to handle the JS file we would have to
enable the `allowJs` TypeScript compiler option, which doesn't yet play nicely
with the automatic building of declaration files we do in the `kbn-types`
package.
The best solution we found in the end was to extract this as a separate package
and specify the `types` field in the `package.json`. Then everything works out
of the box.
There is no other reasons for this to be a separate package, so if we find a
solution to the above we should consider inlining this implementation into the
platform.
[spec]: https://github.com/tc39/proposal-observable

View file

@ -1,135 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
// This adds a symbol type for `Symbol.observable`, which doesn't exist globally
// in TypeScript yet.
declare global {
export interface SymbolConstructor {
readonly observable: symbol;
}
}
// These types are based on the Observable proposal readme, see
// https://github.com/tc39/proposal-observable#api, with the addition of using
// generics to define the type of the `value`.
interface Subscription {
// A boolean value indicating whether the subscription is closed
closed: boolean;
// Cancels the subscription
unsubscribe(): void;
}
interface Subscribable<T> {
subscribe(
observerOrNext?: SubscriptionObserver<T> | ((value: T) => void),
error?: (error: any) => void,
complete?: () => void
): Subscription;
}
type ObservableInput<T> = Subscribable<T> | Iterable<T>;
interface SubscriptionObserver<T> {
// A boolean value indicating whether the subscription is closed
closed: boolean;
// Sends the next value in the sequence
next(value: T): void;
// Sends the sequence error
error(errorValue: Error): void;
// Sends the completion notification
complete(): void;
}
export interface StartObserver<T> {
start(subscription: Subscription): void;
next?(value: T): void;
error?(err: any): void;
complete?(): void;
}
export interface NextObserver<T> {
start?(subscription: Subscription): void;
next(value: T): void;
error?(err: any): void;
complete?(): void;
}
interface ErrorObserver<T> {
start?(subscription: Subscription): void;
next?(value: T): void;
error(err: any): void;
complete?(): void;
}
interface CompletionObserver<T> {
start?(subscription: Subscription): void;
next?(value: T): void;
error?(err: any): void;
complete(): void;
}
type PartialObserver<T> =
| StartObserver<T>
| NextObserver<T>
| ErrorObserver<T>
| CompletionObserver<T>;
interface Observer<T> {
// Receives the subscription object when `subscribe` is called
start(subscription: Subscription): void;
// Receives the next value in the sequence
next(value: T): void;
// Receives the sequence error
error(errorValue: Error): void;
// Receives a completion notification
complete(): void;
}
type SubscriberFunction<T> = (
observer: SubscriptionObserver<T>
) => void | null | undefined | (() => void) | Subscription;
export class Observable<T> {
public static of<T>(...items: T[]): Observable<T>;
public static from<T>(x: ObservableInput<T>): Observable<T>;
constructor(subscriber: SubscriberFunction<T>);
// Subscribes to the sequence with an observer
public subscribe(): Subscription;
public subscribe(observer: PartialObserver<T>): Subscription;
// Subscribes to the sequence with callbacks
public subscribe(
onNext: (val: T) => void,
onError?: (err: Error) => void,
onComplete?: () => void
): Subscription;
// Returns itself
public [Symbol.observable](): Observable<T>;
}

View file

@ -1,322 +0,0 @@
import symbolObservable from 'symbol-observable';
// This is a fork of the example implementation of the TC39 Observable spec,
// see https://github.com/tc39/proposal-observable.
//
// One change has been applied to work with current libraries: using the
// Symbol.observable ponyfill instead of relying on the implementation in the
// spec.
// === Abstract Operations ===
function nonEnum(obj) {
Object.getOwnPropertyNames(obj).forEach(k => {
Object.defineProperty(obj, k, { enumerable: false });
});
return obj;
}
function getMethod(obj, key) {
let value = obj[key];
if (value == null)
return undefined;
if (typeof value !== "function")
throw new TypeError(value + " is not a function");
return value;
}
function cleanupSubscription(subscription) {
// Assert: observer._observer is undefined
let cleanup = subscription._cleanup;
if (!cleanup)
return;
// Drop the reference to the cleanup function so that we won't call it
// more than once
subscription._cleanup = undefined;
// Call the cleanup function
try {
cleanup();
}
catch(e) {
// HostReportErrors(e);
}
}
function subscriptionClosed(subscription) {
return subscription._observer === undefined;
}
function closeSubscription(subscription) {
if (subscriptionClosed(subscription))
return;
subscription._observer = undefined;
cleanupSubscription(subscription);
}
function cleanupFromSubscription(subscription) {
return _=> { subscription.unsubscribe() };
}
function Subscription(observer, subscriber) {
// Assert: subscriber is callable
// The observer must be an object
this._cleanup = undefined;
this._observer = observer;
// If the observer has a start method, call it with the subscription object
try {
let start = getMethod(observer, "start");
if (start) {
start.call(observer, this);
}
}
catch(e) {
// HostReportErrors(e);
}
// If the observer has unsubscribed from the start method, exit
if (subscriptionClosed(this))
return;
observer = new SubscriptionObserver(this);
try {
// Call the subscriber function
let cleanup = subscriber.call(undefined, observer);
// The return value must be undefined, null, a subscription object, or a function
if (cleanup != null) {
if (typeof cleanup.unsubscribe === "function")
cleanup = cleanupFromSubscription(cleanup);
else if (typeof cleanup !== "function")
throw new TypeError(cleanup + " is not a function");
this._cleanup = cleanup;
}
} catch (e) {
// If an error occurs during startup, then send the error
// to the observer.
observer.error(e);
return;
}
// If the stream is already finished, then perform cleanup
if (subscriptionClosed(this)) {
cleanupSubscription(this);
}
}
Subscription.prototype = nonEnum({
get closed() { return subscriptionClosed(this) },
unsubscribe() { closeSubscription(this) },
});
function SubscriptionObserver(subscription) {
this._subscription = subscription;
}
SubscriptionObserver.prototype = nonEnum({
get closed() {
return subscriptionClosed(this._subscription);
},
next(value) {
let subscription = this._subscription;
// If the stream if closed, then return undefined
if (subscriptionClosed(subscription))
return undefined;
let observer = subscription._observer;
try {
let m = getMethod(observer, "next");
// If the observer doesn't support "next", then return undefined
if (!m)
return undefined;
// Send the next value to the sink
m.call(observer, value);
}
catch(e) {
// HostReportErrors(e);
}
return undefined;
},
error(value) {
let subscription = this._subscription;
// If the stream is closed, throw the error to the caller
if (subscriptionClosed(subscription)) {
return undefined;
}
let observer = subscription._observer;
subscription._observer = undefined;
try {
let m = getMethod(observer, "error");
// If the sink does not support "complete", then return undefined
if (m) {
m.call(observer, value);
}
else {
// HostReportErrors(e);
}
} catch (e) {
// HostReportErrors(e);
}
cleanupSubscription(subscription);
return undefined;
},
complete() {
let subscription = this._subscription;
// If the stream is closed, then return undefined
if (subscriptionClosed(subscription))
return undefined;
let observer = subscription._observer;
subscription._observer = undefined;
try {
let m = getMethod(observer, "complete");
// If the sink does not support "complete", then return undefined
if (m) {
m.call(observer);
}
} catch (e) {
// HostReportErrors(e);
}
cleanupSubscription(subscription);
return undefined;
},
});
export class Observable {
// == Fundamental ==
constructor(subscriber) {
// The stream subscriber must be a function
if (typeof subscriber !== "function")
throw new TypeError("Observable initializer must be a function");
this._subscriber = subscriber;
}
subscribe(observer, ...args) {
if (typeof observer === "function") {
observer = {
next: observer,
error: args[0],
complete: args[1]
};
}
else if (typeof observer !== "object") {
observer = {};
}
return new Subscription(observer, this._subscriber);
}
[symbolObservable]() { return this }
// == Derived ==
static from(x) {
let C = typeof this === "function" ? this : Observable;
if (x == null)
throw new TypeError(x + " is not an object");
let method = getMethod(x, symbolObservable);
if (method) {
let observable = method.call(x);
if (Object(observable) !== observable)
throw new TypeError(observable + " is not an object");
if (observable.constructor === C)
return observable;
return new C(observer => observable.subscribe(observer));
}
method = getMethod(x, Symbol.iterator);
if (!method)
throw new TypeError(x + " is not observable");
return new C(observer => {
for (let item of method.call(x)) {
observer.next(item);
if (observer.closed)
return;
}
observer.complete();
});
}
static of(...items) {
let C = typeof this === "function" ? this : Observable;
return new C(observer => {
for (let i = 0; i < items.length; ++i) {
observer.next(items[i]);
if (observer.closed)
return;
}
observer.complete();
});
}
}

View file

@ -1,138 +0,0 @@
# `kbn-observable`
kbn-observable is an observable library based on the [proposed `Observable`][proposal]
feature. In includes several factory functions and operators, that all return
"native" observable.
Why build this? The main reason is that we don't want to tie our plugin apis
heavily to a large dependency, but rather expose something that's much closer
to "native" observables, and something we have control over ourselves. Also, all
other observable libraries have their own base `Observable` class, while we
wanted to rely on the proposed functionality.
In addition, kbn-observable includes `System.observable`, which enables interop
between observable libraries, which means plugins can use whatever observable
library they want, if they don't want to use `kbn-observable`.
## Example
```js
import { Observable, k$, map, last } from '../kbn_observable';
const source = Observable.of(1, 2, 3);
// When `k$` is called with the source observable it returns a function that
// can be called with "operators" that modify the input value and return an
// observable that reflects all of the modifications.
k$(source)(map(i => 2017 + i), last())
.subscribe(console.log) // logs 2020
```
## Just getting started with Observables?
If you are just getting started with observables, a great place to start is with
Andre Staltz' [The introduction to Reactive Programming you've been missing][staltz-intro],
which is a great introduction to the ideas and concepts.
The ideas in `kbn-observable` is heavily based on [RxJS][rxjs], so the
[RxJS docs][rxjs-docs] are also a good source of introduction to observables and
how they work in this library.
**NOTE**: Do you know about good articles, videos or other resources that does
a great job at explaining observables? Add them here, so it becomes easier for
the next person to learn about them!
## Factories
Just like the `k$` function, factories take arguments and produce an observable.
Different factories are useful for different things, and many behave just like
the static functions attached to the `Rx.Observable` class in RxJS.
See [./src/factories](./src/factories) for more info about each factory.
## Operators
Operators are functions that take some arguments and produce an operator
function. Operators aren't anything fancy, just a function that takes an
observable and returns a new observable with the requested modifications
applied.
Some examples:
```js
map(i => 2017 + i);
filter(i => i % 2 === 0)
reduce((acc, val) => {
return acc + val;
}, 0);
```
Multiple operator functions can be passed to `k$` and will be applied to the
input observable before returning the final observable with all modifications
applied, e.g. like the example above with `map` and `last`.
See [./src/operators](./src/operators) for more info about each operator.
## More advanced topics
This library contains implementations of both `Observable` and `Subject`. To
better understand the difference between them, it's important to understand the
difference between hot and cold observables. Ben Lesh's
[Hot vs Cold Observables][hot-vs-cold] is a great introduction to this topic.
**NOTE**: Do you know about good articles, videos or other resources that goes
deeper into Observables and related topics? Make sure we get them added to this
list!
## Why `kbn-observable`?
While exploring how to handle observables in Kibana we went through multiple
PoCs. We initially used RxJS directly, but we didn't find a simple way to
consistently transform RxJS observables into "native" observables in the plugin
apis. This was something we wanted because of our earlier experiences with
exposing large libraries in our apis, which causes problems e.g. when we need to
perform major upgrades of a lib that has breaking changes, but we can't ship a
new major version of Kibana yet, even though this will cause breaking changes
in our plugin apis.
Then we built the initial version of `kbn-observable` based on the Observable
spec, and we included the `k$` helper and several operators that worked like
this:
```js
import { k$, Observable, map, first } from 'kbn-observable';
// general structure:
const resultObservable = k$(sourceObservable, [...operators]);
// e.g.
const source = Observable.of(1,2,3);
const observable = k$(source, [map(x => x + 1), first()]);
```
Here `Observable` is a copy of the Observable class from the spec. This
would enable us to always work with these spec-ed observables. This api for `k$`
worked nicely in pure JavaScript, but caused a problem with TypeScript, as
TypeScript wasn't able to correctly type the operators array when more than one
operator was specified.
Because of that problem we ended up with `k$(source)(...operators)`. With this
change TypeScript is able to correctly type the operator arguments.
We've also discussed adding a `pipe` method to the `Observable.prototype`, so we
could do `source.pipe(...operators)` instead, but we decided against it because
we didn't want to start adding features directly on the `Observable` object, but
rather follow the spec as close as possible, and only update whenever the spec
receives updates.
## Inspiration
This code is heavily inspired by and based on RxJS, which is licensed under the
Apache License, Version 2.0, see https://github.com/ReactiveX/rxjs.
[proposal]: https://github.com/tc39/proposal-observable
[rxjs]: http://reactivex.io/rxjs/
[rxjs-docs]: http://reactivex.io/rxjs/manual/index.html
[staltz-intro]: https://gist.github.com/staltz/868e7e9bc2a7b8c1f754

View file

@ -1,3 +0,0 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP
exports[`should throw if it has received an error and getValue() is called 1`] = `"derp"`;

View file

@ -1,186 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { BehaviorSubject } from '../behavior_subject';
import { collect } from '../lib/collect';
import { Observable } from '../observable';
import { Subject } from '../subject';
test('should extend Subject', () => {
const subject = new BehaviorSubject(null);
expect(subject).toBeInstanceOf(Subject);
});
test('should throw if it has received an error and getValue() is called', () => {
const subject = new BehaviorSubject(null);
subject.error(new Error('derp'));
expect(() => {
subject.getValue();
}).toThrowErrorMatchingSnapshot();
});
test('should have a getValue() method to retrieve the current value', () => {
const subject = new BehaviorSubject('foo');
expect(subject.getValue()).toEqual('foo');
subject.next('bar');
expect(subject.getValue()).toEqual('bar');
});
test('should not update value after completed', () => {
const subject = new BehaviorSubject('foo');
expect(subject.getValue()).toEqual('foo');
subject.next('bar');
subject.complete();
subject.next('quux');
expect(subject.getValue()).toEqual('bar');
});
test('should start with an initialization value', async () => {
const subject = new BehaviorSubject('foo');
const res = collect(subject);
subject.next('bar');
subject.complete();
expect(await res).toEqual(['foo', 'bar', 'C']);
});
test('should pump values to multiple subscribers', async () => {
const subject = new BehaviorSubject('init');
const expected = ['init', 'foo', 'bar', 'C'];
const res1 = collect(subject);
const res2 = collect(subject);
expect((subject as any).observers.size).toEqual(2);
subject.next('foo');
subject.next('bar');
subject.complete();
expect(await res1).toEqual(expected);
expect(await res2).toEqual(expected);
});
test('should not pass values nexted after a complete', () => {
const subject = new BehaviorSubject('init');
const results: any[] = [];
subject.subscribe(x => {
results.push(x);
});
expect(results).toEqual(['init']);
subject.next('foo');
expect(results).toEqual(['init', 'foo']);
subject.complete();
expect(results).toEqual(['init', 'foo']);
subject.next('bar');
expect(results).toEqual(['init', 'foo']);
});
test('should clean out unsubscribed subscribers', () => {
const subject = new BehaviorSubject('init');
const sub1 = subject.subscribe();
const sub2 = subject.subscribe();
expect((subject as any).observers.size).toEqual(2);
sub1.unsubscribe();
expect((subject as any).observers.size).toEqual(1);
sub2.unsubscribe();
expect((subject as any).observers.size).toEqual(0);
});
test('should replay the previous value when subscribed', () => {
const subject = new BehaviorSubject(0);
subject.next(1);
subject.next(2);
const s1Actual: number[] = [];
const s1 = subject.subscribe(x => {
s1Actual.push(x);
});
subject.next(3);
subject.next(4);
const s2Actual: number[] = [];
const s2 = subject.subscribe(x => {
s2Actual.push(x);
});
s1.unsubscribe();
subject.next(5);
const s3Actual: number[] = [];
const s3 = subject.subscribe(x => {
s3Actual.push(x);
});
s2.unsubscribe();
s3.unsubscribe();
subject.complete();
expect(s1Actual).toEqual([2, 3, 4]);
expect(s2Actual).toEqual([4, 5]);
expect(s3Actual).toEqual([5]);
});
test('should emit complete when subscribed after completed', () => {
const source = Observable.of(1, 2, 3, 4, 5);
const subject = new BehaviorSubject(0);
const next = jest.fn();
const complete = jest.fn();
subject.complete();
subject.subscribe(next, undefined, complete);
source.subscribe(subject);
expect(next).not.toHaveBeenCalled();
expect(complete).toHaveBeenCalledTimes(1);
});
test('should be an Observer which can be given to Observable.subscribe', async () => {
const source = Observable.of(1, 2, 3, 4, 5);
const subject = new BehaviorSubject(0);
const res = collect(subject);
source.subscribe(subject);
expect(await res).toEqual([0, 1, 2, 3, 4, 5, 'C']);
expect(subject.getValue()).toBe(5);
});

View file

@ -1,90 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { MonoTypeOperatorFunction, OperatorFunction, UnaryFunction } from '../interfaces';
import { k$ } from '../k';
import { Observable } from '../observable';
const plus1: MonoTypeOperatorFunction<number> = source =>
new Observable(observer => {
source.subscribe({
next(val) {
observer.next(val + 1);
},
error(err) {
observer.error(err);
},
complete() {
observer.complete();
},
});
});
const toString: OperatorFunction<number, string> = source =>
new Observable(observer => {
source.subscribe({
next(val) {
observer.next(val.toString());
},
error(err) {
observer.error(err);
},
complete() {
observer.complete();
},
});
});
const toPromise: UnaryFunction<Observable<number>, Promise<number>> = source =>
new Promise((resolve, reject) => {
let lastValue: number;
source.subscribe({
next(value) {
lastValue = value;
},
error(error) {
reject(error);
},
complete() {
resolve(lastValue);
},
});
});
test('observable to observable', () => {
const numbers$ = Observable.of(1, 2, 3);
const actual: any[] = [];
k$(numbers$)(plus1, toString).subscribe({
next(x) {
actual.push(x);
},
});
expect(actual).toEqual(['2', '3', '4']);
});
test('observable to promise', async () => {
const numbers$ = Observable.of(1, 2, 3);
const value = await k$(numbers$)(plus1, toPromise);
expect(value).toEqual(4);
});

View file

@ -1,160 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Observable, SubscriptionObserver } from '../observable';
test('receives values when subscribed', async () => {
let observer: SubscriptionObserver<any>;
const source = new Observable(innerObservable => {
observer = innerObservable;
});
const res: any[] = [];
source.subscribe({
next(x) {
res.push(x);
},
});
observer!.next('foo');
expect(res).toEqual(['foo']);
observer!.next('bar');
expect(res).toEqual(['foo', 'bar']);
});
test('triggers complete when observer is completed', async () => {
let observer: SubscriptionObserver<any>;
const source = new Observable(innerObservable => {
observer = innerObservable;
});
const complete = jest.fn();
source.subscribe({
complete,
});
observer!.complete();
expect(complete).toHaveBeenCalledTimes(1);
});
test('should send errors thrown in the constructor down the error path', async () => {
const err = new Error('this should be handled');
const source = new Observable(observer => {
throw err;
});
const error = jest.fn();
source.subscribe({
error,
});
expect(error).toHaveBeenCalledTimes(1);
expect(error).toHaveBeenCalledWith(err);
});
describe('subscriptions', () => {
test('handles multiple subscriptions and unsubscriptions', () => {
let observers = 0;
const source = new Observable(observer => {
observers++;
return () => {
observers--;
};
});
const sub1 = source.subscribe();
expect(observers).toBe(1);
const sub2 = source.subscribe();
expect(observers).toBe(2);
sub1.unsubscribe();
expect(observers).toBe(1);
sub2.unsubscribe();
expect(observers).toBe(0);
});
});
describe('Observable.from', () => {
test('handles array', () => {
const res: number[] = [];
const complete = jest.fn();
Observable.from([1, 2, 3]).subscribe({
next(x) {
res.push(x);
},
complete,
});
expect(complete).toHaveBeenCalledTimes(1);
expect(res).toEqual([1, 2, 3]);
});
test('handles iterable', () => {
const fooIterable: Iterable<number> = {
*[Symbol.iterator]() {
yield 1;
yield 2;
yield 3;
},
};
const res: number[] = [];
const complete = jest.fn();
Observable.from(fooIterable).subscribe({
next(x) {
res.push(x);
},
complete,
});
expect(complete).toHaveBeenCalledTimes(1);
expect(res).toEqual([1, 2, 3]);
});
});
describe('Observable.of', () => {
test('handles multiple args', () => {
const res: number[] = [];
const complete = jest.fn();
Observable.of(1, 2, 3).subscribe({
next(x) {
res.push(x);
},
complete,
});
expect(complete).toHaveBeenCalledTimes(1);
expect(res).toEqual([1, 2, 3]);
});
});

View file

@ -1,508 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { k$ } from '../k';
import { Observable } from '../observable';
import { first } from '../operators';
import { Subject } from '../subject';
const noop = () => {
// noop
};
test('should pump values right on through itself', () => {
const subject = new Subject<string>();
const actual: string[] = [];
subject.subscribe(x => {
actual.push(x);
});
subject.next('foo');
subject.next('bar');
subject.complete();
expect(actual).toEqual(['foo', 'bar']);
});
test('should pump values to multiple subscribers', () => {
const subject = new Subject<string>();
const actual: string[] = [];
subject.subscribe(x => {
actual.push(`1-${x}`);
});
subject.subscribe(x => {
actual.push(`2-${x}`);
});
expect((subject as any).observers.size).toEqual(2);
subject.next('foo');
subject.next('bar');
subject.complete();
expect(actual).toEqual(['1-foo', '2-foo', '1-bar', '2-bar']);
});
test('should handle subscribers that arrive and leave at different times, subject does not complete', () => {
const subject = new Subject();
const results1: any[] = [];
const results2: any[] = [];
const results3: any[] = [];
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
const subscription1 = subject.subscribe(
x => {
results1.push(x);
},
e => {
results1.push('E');
},
() => {
results1.push('C');
}
);
subject.next(5);
const subscription2 = subject.subscribe(
x => {
results2.push(x);
},
e => {
results2.push('E');
},
() => {
results2.push('C');
}
);
subject.next(6);
subject.next(7);
subscription1.unsubscribe();
subject.next(8);
subscription2.unsubscribe();
subject.next(9);
subject.next(10);
const subscription3 = subject.subscribe(
x => {
results3.push(x);
},
e => {
results3.push('E');
},
() => {
results3.push('C');
}
);
subject.next(11);
subscription3.unsubscribe();
expect(results1).toEqual([5, 6, 7]);
expect(results2).toEqual([6, 7, 8]);
expect(results3).toEqual([11]);
});
test('should handle subscribers that arrive and leave at different times, subject completes', () => {
const subject = new Subject();
const results1: any[] = [];
const results2: any[] = [];
const results3: any[] = [];
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
const subscription1 = subject.subscribe(
x => {
results1.push(x);
},
e => {
results1.push('E');
},
() => {
results1.push('C');
}
);
subject.next(5);
const subscription2 = subject.subscribe(
x => {
results2.push(x);
},
e => {
results2.push('E');
},
() => {
results2.push('C');
}
);
subject.next(6);
subject.next(7);
subscription1.unsubscribe();
subject.complete();
subscription2.unsubscribe();
const subscription3 = subject.subscribe(
x => {
results3.push(x);
},
e => {
results3.push('E');
},
() => {
results3.push('C');
}
);
subscription3.unsubscribe();
expect(results1).toEqual([5, 6, 7]);
expect(results2).toEqual([6, 7, 'C']);
expect(results3).toEqual(['C']);
});
test('should handle subscribers that arrive and leave at different times, subject terminates with an error', () => {
const subject = new Subject();
const results1: any[] = [];
const results2: any[] = [];
const results3: any[] = [];
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
const subscription1 = subject.subscribe(
x => {
results1.push(x);
},
e => {
results1.push('E');
},
() => {
results1.push('C');
}
);
subject.next(5);
const subscription2 = subject.subscribe(
x => {
results2.push(x);
},
e => {
results2.push('E');
},
() => {
results2.push('C');
}
);
subject.next(6);
subject.next(7);
subscription1.unsubscribe();
subject.error(new Error('err'));
subscription2.unsubscribe();
const subscription3 = subject.subscribe(
x => {
results3.push(x);
},
e => {
results3.push('E');
},
() => {
results3.push('C');
}
);
subscription3.unsubscribe();
expect(results1).toEqual([5, 6, 7]);
expect(results2).toEqual([6, 7, 'E']);
expect(results3).toEqual(['E']);
});
test('should handle subscribers that arrive and leave at different times, subject completes before nexting any value', () => {
const subject = new Subject();
const results1: any[] = [];
const results2: any[] = [];
const results3: any[] = [];
const subscription1 = subject.subscribe(
x => {
results1.push(x);
},
e => {
results1.push('E');
},
() => {
results1.push('C');
}
);
const subscription2 = subject.subscribe(
x => {
results2.push(x);
},
e => {
results2.push('E');
},
() => {
results2.push('C');
}
);
subscription1.unsubscribe();
subject.complete();
subscription2.unsubscribe();
const subscription3 = subject.subscribe(
x => {
results3.push(x);
},
e => {
results3.push('E');
},
() => {
results3.push('C');
}
);
subscription3.unsubscribe();
expect(results1).toEqual([]);
expect(results2).toEqual(['C']);
expect(results3).toEqual(['C']);
});
test('should clean out unsubscribed subscribers', () => {
const subject = new Subject();
const sub1 = subject.subscribe(noop);
const sub2 = subject.subscribe(noop);
expect((subject as any).observers.size).toBe(2);
sub1.unsubscribe();
expect((subject as any).observers.size).toBe(1);
sub2.unsubscribe();
expect((subject as any).observers.size).toBe(0);
});
test('should be an Observer which can be given to Observable.subscribe', () => {
const source = Observable.of(1, 2, 3, 4, 5);
const subject = new Subject<number>();
const actual: number[] = [];
const err = jest.fn();
const complete = jest.fn();
subject.subscribe(
x => {
actual.push(x);
},
err,
complete
);
source.subscribe(subject);
expect(actual).toEqual([1, 2, 3, 4, 5]);
expect(err).not.toHaveBeenCalled();
expect(complete).toHaveBeenCalledTimes(1);
});
test('can use subject in $k', async () => {
const values$ = new Subject();
const next = jest.fn();
const complete = jest.fn();
const error = jest.fn();
k$(values$)(first()).subscribe({
complete,
error,
next,
});
values$.next('test');
expect(next).toHaveBeenCalledTimes(1);
expect(next).toHaveBeenCalledWith('test');
expect(error).not.toHaveBeenCalled();
expect(complete).toHaveBeenCalled();
});
test('should not next after completed', () => {
const subject = new Subject<string>();
const results: any[] = [];
subject.subscribe(x => results.push(x), undefined, () => results.push('C'));
subject.next('a');
subject.complete();
subject.next('b');
expect(results).toEqual(['a', 'C']);
});
test('should not next after error', () => {
const error = new Error('wut?');
const subject = new Subject();
const results: any[] = [];
subject.subscribe(x => results.push(x), err => results.push(err));
subject.next('a');
subject.error(error);
subject.next('b');
expect(results).toEqual(['a', error]);
});
describe('asObservable', () => {
test('should hide subject', () => {
const subject = new Subject();
const observable = subject.asObservable();
expect(subject).not.toBe(observable);
expect(observable).toBeInstanceOf(Observable);
expect(observable).not.toBeInstanceOf(Subject);
});
test('should handle subject completes without emits', () => {
const subject = new Subject();
const complete = jest.fn();
subject.asObservable().subscribe({
complete,
});
subject.complete();
expect(complete).toHaveBeenCalledTimes(1);
});
test('should handle subject throws', () => {
const subject = new Subject();
const error = jest.fn();
subject.asObservable().subscribe({
error,
});
const e = new Error('yep');
subject.error(e);
expect(error).toHaveBeenCalledTimes(1);
expect(error).toHaveBeenCalledWith(e);
});
test('should handle subject emits', () => {
const subject = new Subject<number>();
const actual: number[] = [];
subject.asObservable().subscribe({
next(x) {
actual.push(x);
},
});
subject.next(1);
subject.next(2);
subject.complete();
expect(actual).toEqual([1, 2]);
});
test('can unsubscribe', () => {
const subject = new Subject<number>();
const actual: number[] = [];
const sub = subject.asObservable().subscribe({
next(x) {
actual.push(x);
},
});
subject.next(1);
sub.unsubscribe();
subject.next(2);
subject.complete();
expect(actual).toEqual([1]);
});
test('should handle multiple observables', () => {
const subject = new Subject<string>();
const actual: string[] = [];
subject.asObservable().subscribe({
next(x) {
actual.push(`1-${x}`);
},
});
subject.asObservable().subscribe({
next(x) {
actual.push(`2-${x}`);
},
});
subject.next('foo');
subject.next('bar');
subject.complete();
expect(actual).toEqual(['1-foo', '2-foo', '1-bar', '2-bar']);
});
});

View file

@ -1,63 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { SubscriptionObserver } from './observable';
import { Subject } from './subject';
/**
* A BehaviorSubject is a Subject that has a _current_ value.
*
* Whenever an observer subscribes to a BehaviorSubject, it begins by emitting
* the item most recently emitted by the source Observable (or a seed/default
* value if none has yet been emitted) and then continues to emit any other
* items emitted later by the source Observable(s).
*/
export class BehaviorSubject<T> extends Subject<T> {
constructor(private value: T) {
super();
}
/**
* @returns The current value of the BehaviorSubject. Most of the time this
* shouldn't be used directly, but there are situations were it can come in
* handy. Usually a BehaviorSubject is used so you immediately receive the
* latest/current value when subscribing.
*/
public getValue() {
if (this.thrownError !== undefined) {
throw this.thrownError;
}
return this.value;
}
public next(value: T) {
if (!this.isStopped) {
this.value = value;
}
return super.next(value);
}
protected registerObserver(observer: SubscriptionObserver<T>) {
if (!this.isStopped) {
observer.next(this.value);
}
return super.registerObserver(observer);
}
}

View file

@ -1,31 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
export class EmptyError extends Error {
public code = 'K$_EMPTY_ERROR';
constructor(producer: string) {
super(`EmptyError: ${producer} requires source stream to emit at least one value.`);
// We're forching this to `any` as `captureStackTrace` is not a standard
// property, but a v8 specific one. There are node typings that we might
// want to use, see https://github.com/DefinitelyTyped/DefinitelyTyped/blob/master/types/node/index.d.ts#L47
(Error as any).captureStackTrace(this, EmptyError);
}
}

View file

@ -1,20 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
export { EmptyError } from './empty_error';

View file

@ -1,7 +0,0 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP
exports[`errors if callback is called with more than two args 1`] = `
Array [
[Error: Node callback called with too many args],
]
`;

View file

@ -1,82 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { collect } from '../../lib/collect';
import { $bindNodeCallback } from '../bind_node_callback';
type NodeCallback = (err: any, val?: string) => void;
test('callback with error', async () => {
const error = new Error('fail');
const read = (cb: NodeCallback) => cb(error);
const read$ = $bindNodeCallback(read);
const res = collect(read$());
expect(await res).toEqual([error]);
});
test('callback with value', async () => {
const read = (cb: NodeCallback) => cb(undefined, 'test');
const read$ = $bindNodeCallback(read);
const res = collect(read$());
expect(await res).toEqual(['test', 'C']);
});
test('does not treat `null` as error', async () => {
const read = (cb: NodeCallback) => cb(null, 'test');
const read$ = $bindNodeCallback(read);
const res = collect(read$());
expect(await res).toEqual(['test', 'C']);
});
test('multiple args', async () => {
const read = (arg1: string, arg2: number, cb: NodeCallback) => cb(undefined, `${arg1}/${arg2}`);
const read$ = $bindNodeCallback(read);
const res = collect(read$('foo', 123));
expect(await res).toEqual(['foo/123', 'C']);
});
test('function throws instead of calling callback', async () => {
const error = new Error('fail');
const read = (cb: NodeCallback) => {
throw error;
};
const read$ = $bindNodeCallback(read);
const res = collect(read$());
expect(await res).toEqual([error]);
});
test('errors if callback is called with more than two args', async () => {
const read = (cb: (...args: any[]) => any) => cb(undefined, 'arg1', 'arg2');
const read$ = $bindNodeCallback(read);
const res = collect(read$());
expect(await res).toMatchSnapshot();
});

View file

@ -1,63 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { $combineLatest, $of } from '../../factories';
import { collect } from '../../lib/collect';
import { Subject } from '../../subject';
const tickMs = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));
test('emits once for each combination of items', async () => {
const foo$ = new Subject();
const bar$ = new Subject();
const observable = $combineLatest(foo$, bar$);
const res = collect(observable);
await tickMs(10);
bar$.next('a');
await tickMs(5);
foo$.next(1);
await tickMs(5);
bar$.next('b');
await tickMs(5);
foo$.next(2);
bar$.next('c');
await tickMs(10);
foo$.next(3);
bar$.complete();
foo$.complete();
expect(await res).toEqual([[1, 'a'], [1, 'b'], [2, 'b'], [2, 'c'], [3, 'c'], 'C']);
});
test('only emits if every stream emits at least once', async () => {
const empty$ = $of();
const three$ = $of(1, 2, 3);
const observable = $combineLatest(empty$, three$);
const res = collect(observable);
expect(await res).toEqual(['C']);
});

View file

@ -1,73 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { $concat } from '../';
import { collect } from '../../lib/collect';
import { Subject } from '../../subject';
test('continous on next observable when previous completes', async () => {
const a = new Subject();
const b = new Subject();
const observable = $concat(a, b);
const res = collect(observable);
a.next('a1');
b.next('b1');
a.next('a2');
a.complete();
b.next('b2');
b.complete();
expect(await res).toEqual(['a1', 'a2', 'b2', 'C']);
});
test('errors when any observable errors', async () => {
const a = new Subject();
const b = new Subject();
const observable = $concat(a, b);
const res = collect(observable);
const error = new Error('fail');
a.next('a1');
a.error(error);
expect(await res).toEqual(['a1', error]);
});
test('handles early unsubscribe', () => {
const a = new Subject();
const b = new Subject();
const next = jest.fn();
const complete = jest.fn();
const sub = $concat(a, b).subscribe({ next, complete });
a.next('a1');
sub.unsubscribe();
a.next('a2');
a.complete();
b.next('b1');
b.complete();
expect(next).toHaveBeenCalledTimes(1);
expect(next).toHaveBeenCalledWith('a1');
expect(complete).toHaveBeenCalledTimes(0);
});

View file

@ -1,58 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { $from } from '../../factories';
test('handles array', () => {
const res: number[] = [];
const complete = jest.fn();
$from([1, 2, 3]).subscribe({
next(x) {
res.push(x);
},
complete,
});
expect(complete).toHaveBeenCalledTimes(1);
expect(res).toEqual([1, 2, 3]);
});
test('handles iterable', () => {
const fooIterable: Iterable<number> = {
*[Symbol.iterator]() {
yield 1;
yield 2;
yield 3;
},
};
const res: number[] = [];
const complete = jest.fn();
$from(fooIterable).subscribe({
next(x) {
res.push(x);
},
complete,
});
expect(complete).toHaveBeenCalledTimes(1);
expect(res).toEqual([1, 2, 3]);
});

View file

@ -1,89 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { $from } from '../';
import { collect } from '../../lib/collect';
import { Subject } from '../../subject';
import { $fromCallback } from '../from_callback';
test('returns raw value', async () => {
const observable = $fromCallback(() => 'foo');
const res = collect(observable);
expect(await res).toEqual(['foo', 'C']);
});
test('if undefined is returned, completes immediatley', async () => {
const observable = $fromCallback(() => undefined);
const res = collect(observable);
expect(await res).toEqual(['C']);
});
test('if null is returned, forwards it', async () => {
const observable = $fromCallback(() => null);
const res = collect(observable);
expect(await res).toEqual([null, 'C']);
});
test('returns observable that completes immediately', async () => {
const observable = $fromCallback(() => $from([1, 2, 3]));
const res = collect(observable);
expect(await res).toEqual([1, 2, 3, 'C']);
});
test('returns observable that completes later', () => {
const subject = new Subject();
const next = jest.fn();
const error = jest.fn();
const complete = jest.fn();
$fromCallback(() => subject).subscribe(next, error, complete);
expect(next).not.toHaveBeenCalled();
expect(error).not.toHaveBeenCalled();
expect(complete).not.toHaveBeenCalled();
subject.next('foo');
expect(next).toHaveBeenCalledTimes(1);
expect(error).not.toHaveBeenCalled();
expect(complete).not.toHaveBeenCalled();
subject.complete();
expect(error).not.toHaveBeenCalled();
expect(complete).toHaveBeenCalledTimes(1);
});
test('handles early unsubscribe', () => {
const subject = new Subject();
const next = () => {
// noop
};
const sub = $fromCallback(() => subject).subscribe(next);
subject.next('foo');
expect((subject as any).observers.size).toEqual(1);
sub.unsubscribe();
expect((subject as any).observers.size).toEqual(0);
});

View file

@ -1,100 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Observable } from '../observable';
export function $bindNodeCallback<R>(
callbackFunc: (callback: (err: any, result: R) => any) => any
): () => Observable<R>;
export function $bindNodeCallback<T, R>(
callbackFunc: (v1: T, callback: (err: any, result: R) => any) => any
): (v1: T) => Observable<R>;
export function $bindNodeCallback<T, T2, R>(
callbackFunc: (v1: T, v2: T2, callback: (err: any, result: R) => any) => any
): (v1: T, v2: T2) => Observable<R>;
export function $bindNodeCallback<T, T2, T3, R>(
callbackFunc: (v1: T, v2: T2, v3: T3, callback: (err: any, result: R) => any) => any
): (v1: T, v2: T2, v3: T3) => Observable<R>;
export function $bindNodeCallback<T, T2, T3, T4, R>(
callbackFunc: (v1: T, v2: T2, v3: T3, v4: T4, callback: (err: any, result: R) => any) => any
): (v1: T, v2: T2, v3: T3, v4: T4) => Observable<R>;
export function $bindNodeCallback<T, T2, T3, T4, T5, R>(
callbackFunc: (
v1: T,
v2: T2,
v3: T3,
v4: T4,
v5: T5,
callback: (err: any, result: R) => any
) => any
): (v1: T, v2: T2, v3: T3, v4: T4, v5: T5) => Observable<R>;
export function $bindNodeCallback<T, T2, T3, T4, T5, T6, R>(
callbackFunc: (
v1: T,
v2: T2,
v3: T3,
v4: T4,
v5: T5,
v6: T6,
callback: (err: any, result: R) => any
) => any
): (v1: T, v2: T2, v3: T3, v4: T4, v5: T5, v6: T6) => Observable<R>;
/**
* Converts a Node.js-style callback API to a function that returns an
* Observable.
*
* Does NOT handle functions whose callbacks have
* more than two parameters. Only the first value after the
* error argument will be returned.
*
* Example: Read a file from the filesystem and get the data as an Observable:
*
* import fs from 'fs';
* var readFileAsObservable = $bindNodeCallback(fs.readFile);
* var result = readFileAsObservable('./roadNames.txt', 'utf8');
* result.subscribe(
* x => console.log(x),
* e => console.error(e)
* );
*/
export function $bindNodeCallback<T>(callbackFunc: (...args: any[]) => any) {
return function(this: any, ...args: any[]): Observable<T> {
const context = this;
return new Observable(observer => {
function handlerFn(err?: Error, val?: T, ...rest: any[]) {
if (err != null) {
observer.error(err);
} else if (rest.length > 0) {
// If we've received more than two arguments, the function doesn't
// follow the common Node.js callback style. We could return an array
// if that happened, but as most code follow the pattern we don't
// special case it for now.
observer.error(new Error('Node callback called with too many args'));
} else {
observer.next(val!);
observer.complete();
}
}
callbackFunc.apply(context, args.concat([handlerFn]));
});
};
}

View file

@ -1,113 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Observable, ObservableInput } from '../observable';
import { $from } from './from';
const pending = Symbol('awaiting first value');
export function $combineLatest<T, T2>(
v1: ObservableInput<T>,
v2: ObservableInput<T2>
): Observable<[T, T2]>;
export function $combineLatest<T, T2, T3>(
v1: ObservableInput<T>,
v2: ObservableInput<T2>,
v3: ObservableInput<T3>
): Observable<[T, T2, T3]>;
export function $combineLatest<T, T2, T3, T4>(
v1: ObservableInput<T>,
v2: ObservableInput<T2>,
v3: ObservableInput<T3>,
v4: ObservableInput<T4>
): Observable<[T, T2, T3, T4]>;
export function $combineLatest<T, T2, T3, T4, T5>(
v1: ObservableInput<T>,
v2: ObservableInput<T2>,
v3: ObservableInput<T3>,
v4: ObservableInput<T4>,
v5: ObservableInput<T5>
): Observable<[T, T2, T3, T4, T5]>;
export function $combineLatest<T, T2, T3, T4, T5, T6>(
v1: ObservableInput<T>,
v2: ObservableInput<T2>,
v3: ObservableInput<T3>,
v4: ObservableInput<T4>,
v5: ObservableInput<T5>,
v6: ObservableInput<T6>
): Observable<[T, T2, T3, T4, T5, T6]>;
export function $combineLatest<T>(...observables: Array<ObservableInput<T>>): Observable<T[]>;
/**
* Creates an observable that combines the values by subscribing to all
* observables passed and emiting an array with the latest value from each
* observable once after each observable has emitted at least once, and again
* any time an observable emits after that.
*
* @param {Observable...}
* @return {Observable}
*/
export function $combineLatest<T>(...observables: Array<ObservableInput<T>>): Observable<T[]> {
return new Observable(observer => {
// create an array that will receive values as observables
// update and initialize it with `pending` symbols so that
// we know when observables emit for the first time
const values: Array<symbol | T> = observables.map(() => pending);
let needFirstCount = values.length;
let activeCount = values.length;
const subs = observables.map((observable, i) =>
$from(observable).subscribe({
next(value) {
if (values[i] === pending) {
needFirstCount--;
}
values[i] = value;
if (needFirstCount === 0) {
observer.next(values.slice() as T[]);
}
},
error(error) {
observer.error(error);
values.length = 0;
},
complete() {
activeCount--;
if (activeCount === 0) {
observer.complete();
values.length = 0;
}
},
})
);
return () => {
subs.forEach(sub => {
sub.unsubscribe();
});
values.length = 0;
};
});
}

View file

@ -1,64 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Observable, Subscription } from '../observable';
/**
* Creates an observable that combines all observables passed as arguments into
* a single output observable by subscribing to them in series, i.e. it will
* subscribe to the next observable when the previous completes.
*
* @param {Observable...}
* @return {Observable}
*/
export function $concat<T>(...observables: Array<Observable<T>>) {
return new Observable(observer => {
let subscription: Subscription | undefined;
function subscribe(i: number) {
if (observer.closed) {
return;
}
if (i >= observables.length) {
observer.complete();
}
subscription = observables[i].subscribe({
next(value) {
observer.next(value);
},
error(error) {
observer.error(error);
},
complete() {
subscribe(i + 1);
},
});
}
subscribe(0);
return () => {
if (subscription !== undefined) {
subscription.unsubscribe();
}
};
});
}

View file

@ -1,26 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Observable } from '../observable';
export function $error<E extends Error>(error: E) {
return new Observable(observer => {
observer.error(error);
});
}

View file

@ -1,32 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Observable, ObservableInput } from '../observable';
/**
* Alias for `Observable.from`
*
* If you need to handle:
*
* - promises, use `$fromPromise`
* - functions, use `$fromCallback`
*/
export function $from<T>(x: ObservableInput<T>): Observable<T> {
return Observable.from(x);
}

View file

@ -1,48 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { isObservable } from '../lib/is_observable';
import { Observable } from '../observable';
/**
* Creates an observable that calls the specified function with no arguments
* when it is subscribed. The observerable will behave differently based on the
* return value of the factory:
*
* - return `undefined`: observable will immediately complete
* - returns observable: observerable will mirror the returned value
* - otherwise: observable will emit the value and then complete
*
* @param {Function}
* @returns {Observable}
*/
export function $fromCallback<T>(factory: () => T | Observable<T>): Observable<T> {
return new Observable(observer => {
const result = factory();
if (result === undefined) {
observer.complete();
} else if (isObservable(result)) {
return result.subscribe(observer);
} else {
observer.next(result);
observer.complete();
}
});
}

View file

@ -1,42 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Observable } from '../observable';
/**
* Create an observable that mirrors a promise. If the promise resolves the
* observable will emit the resolved value and then complete. If it rejects then
* the observable will error.
*
* @param {Promise<T>}
* @return {Observable<T>}
*/
export function $fromPromise<T>(promise: Promise<T>): Observable<T> {
return new Observable(observer => {
promise.then(
value => {
observer.next(value);
observer.complete();
},
error => {
observer.error(error);
}
);
});
}

View file

@ -1,27 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
export { $from } from './from';
export { $combineLatest } from './combine_latest';
export { $concat } from './concat';
export { $fromCallback } from './from_callback';
export { $bindNodeCallback } from './bind_node_callback';
export { $fromPromise } from './from_promise';
export { $of } from './of';
export { $error } from './error';

View file

@ -1,27 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Observable } from '../observable';
/**
* Alias for `Observable.of`
*/
export function $of<T>(...items: T[]): Observable<T> {
return Observable.of(...items);
}

View file

@ -1,27 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
export { k$ } from './k';
export * from './observable';
export { Subject } from './subject';
export { BehaviorSubject } from './behavior_subject';
export * from './operators';
export * from './factories';

View file

@ -1,26 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Observable } from './observable';
export type UnaryFunction<T, R> = (source: T) => R;
export type OperatorFunction<T, R> = UnaryFunction<Observable<T>, Observable<R>>;
export type MonoTypeOperatorFunction<T> = OperatorFunction<T, T>;

View file

@ -1,90 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { $from } from './factories';
import { UnaryFunction } from './interfaces';
import { pipeFromArray } from './lib';
import { Observable, ObservableInput } from './observable';
export function k$<T, R>(source: ObservableInput<T>) {
function kOperations<A>(op1: UnaryFunction<Observable<T>, A>): A;
function kOperations<A, B>(op1: UnaryFunction<Observable<T>, A>, op2: UnaryFunction<A, B>): B;
function kOperations<A, B, C>(
op1: UnaryFunction<Observable<T>, A>,
op2: UnaryFunction<A, B>,
op3: UnaryFunction<B, C>
): C;
function kOperations<A, B, C, D>(
op1: UnaryFunction<Observable<T>, A>,
op2: UnaryFunction<A, B>,
op3: UnaryFunction<B, C>,
op4: UnaryFunction<C, D>
): D;
function kOperations<A, B, C, D, E>(
op1: UnaryFunction<Observable<T>, A>,
op2: UnaryFunction<A, B>,
op3: UnaryFunction<B, C>,
op4: UnaryFunction<C, D>,
op5: UnaryFunction<D, E>
): E;
function kOperations<A, B, C, D, E, F>(
op1: UnaryFunction<Observable<T>, A>,
op2: UnaryFunction<A, B>,
op3: UnaryFunction<B, C>,
op4: UnaryFunction<C, D>,
op5: UnaryFunction<D, E>,
op6: UnaryFunction<E, F>
): F;
function kOperations<A, B, C, D, E, F, G>(
op1: UnaryFunction<Observable<T>, A>,
op2: UnaryFunction<A, B>,
op3: UnaryFunction<B, C>,
op4: UnaryFunction<C, D>,
op5: UnaryFunction<D, E>,
op6: UnaryFunction<E, F>,
op7: UnaryFunction<F, G>
): G;
function kOperations<A, B, C, D, E, F, G, H>(
op1: UnaryFunction<Observable<T>, A>,
op2: UnaryFunction<A, B>,
op3: UnaryFunction<B, C>,
op4: UnaryFunction<C, D>,
op5: UnaryFunction<D, E>,
op6: UnaryFunction<E, F>,
op7: UnaryFunction<F, G>,
op8: UnaryFunction<G, H>
): H;
function kOperations<A, B, C, D, E, F, G, H, I>(
op1: UnaryFunction<Observable<T>, A>,
op2: UnaryFunction<A, B>,
op3: UnaryFunction<B, C>,
op4: UnaryFunction<C, D>,
op5: UnaryFunction<D, E>,
op6: UnaryFunction<E, F>,
op7: UnaryFunction<F, G>,
op8: UnaryFunction<G, H>,
op9: UnaryFunction<H, I>
): I;
function kOperations(...operations: Array<UnaryFunction<Observable<T>, R>>) {
return pipeFromArray(operations)($from(source));
}
return kOperations;
}

View file

@ -1,45 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Observable } from '../observable';
/**
* Test helper that collects all actions, and returns an array with all
* `next`-ed values, plus any `error` received or a `C` if `complete` is
* triggered.
*/
export function collect<T>(source: Observable<T>) {
return new Promise((resolve, reject) => {
const values: any[] = [];
source.subscribe({
next(x) {
values.push(x);
},
error(err) {
values.push(err);
resolve(values);
},
complete() {
values.push('C');
resolve(values);
},
});
});
}

View file

@ -1,20 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
export { pipe, pipeFromArray } from './pipe';

View file

@ -1,24 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Observable } from '../observable';
export function isObservable<T>(x: any): x is Observable<T> {
return x !== null && typeof x === 'object' && x[Symbol.observable] !== undefined;
}

View file

@ -1,106 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { UnaryFunction } from '../interfaces';
export function pipe<T>(): UnaryFunction<T, T>;
export function pipe<T, A>(op1: UnaryFunction<T, A>): UnaryFunction<T, A>;
export function pipe<T, A, B>(
op1: UnaryFunction<T, A>,
op2: UnaryFunction<A, B>
): UnaryFunction<T, B>;
export function pipe<T, A, B, C>(
op1: UnaryFunction<T, A>,
op2: UnaryFunction<A, B>,
op3: UnaryFunction<B, C>
): UnaryFunction<T, C>;
export function pipe<T, A, B, C, D>(
op1: UnaryFunction<T, A>,
op2: UnaryFunction<A, B>,
op3: UnaryFunction<B, C>,
op4: UnaryFunction<C, D>
): UnaryFunction<T, D>;
export function pipe<T, A, B, C, D, E>(
op1: UnaryFunction<T, A>,
op2: UnaryFunction<A, B>,
op3: UnaryFunction<B, C>,
op4: UnaryFunction<C, D>,
op5: UnaryFunction<D, E>
): UnaryFunction<T, E>;
export function pipe<T, A, B, C, D, E, F>(
op1: UnaryFunction<T, A>,
op2: UnaryFunction<A, B>,
op3: UnaryFunction<B, C>,
op4: UnaryFunction<C, D>,
op5: UnaryFunction<D, E>,
op6: UnaryFunction<E, F>
): UnaryFunction<T, F>;
export function pipe<T, A, B, C, D, E, F, G>(
op1: UnaryFunction<T, A>,
op2: UnaryFunction<A, B>,
op3: UnaryFunction<B, C>,
op4: UnaryFunction<C, D>,
op5: UnaryFunction<D, E>,
op6: UnaryFunction<E, F>,
op7: UnaryFunction<F, G>
): UnaryFunction<T, G>;
export function pipe<T, A, B, C, D, E, F, G, H>(
op1: UnaryFunction<T, A>,
op2: UnaryFunction<A, B>,
op3: UnaryFunction<B, C>,
op4: UnaryFunction<C, D>,
op5: UnaryFunction<D, E>,
op6: UnaryFunction<E, F>,
op7: UnaryFunction<F, G>,
op8: UnaryFunction<G, H>
): UnaryFunction<T, H>;
export function pipe<T, A, B, C, D, E, F, G, H, I>(
op1: UnaryFunction<T, A>,
op2: UnaryFunction<A, B>,
op3: UnaryFunction<B, C>,
op4: UnaryFunction<C, D>,
op5: UnaryFunction<D, E>,
op6: UnaryFunction<E, F>,
op7: UnaryFunction<F, G>,
op8: UnaryFunction<G, H>,
op9: UnaryFunction<H, I>
): UnaryFunction<T, I>;
export function pipe<T, R>(...fns: Array<UnaryFunction<T, R>>): UnaryFunction<T, R> {
return pipeFromArray(fns);
}
const noop: () => any = () => {
// noop
};
/* @internal */
export function pipeFromArray<T, R>(fns: Array<UnaryFunction<T, R>>): UnaryFunction<T, R> {
if (fns.length === 0) {
return noop as UnaryFunction<T, R>;
}
if (fns.length === 1) {
return fns[0];
}
return function piped(input: T): R {
return fns.reduce((prev: any, fn) => fn(prev), input);
};
}

View file

@ -1,28 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
export {
Observable,
ObservableInput,
Subscription,
Subscribable,
SubscriptionObserver,
Observer,
PartialObserver,
} from '../kbn_internal_native_observable';

View file

@ -1,7 +0,0 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP
exports[`returns error if completing without receiving any value 1`] = `
Array [
[Error: EmptyError: first() requires source stream to emit at least one value.],
]
`;

View file

@ -1,9 +0,0 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP
exports[`returns error if completing without receiving any value 1`] = `
Array [
Array [
[Error: EmptyError: last() requires source stream to emit at least one value.],
],
]
`;

View file

@ -1,9 +0,0 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP
exports[`rejects if error received 1`] = `
Array [
Array [
[Error: fail],
],
]
`;

View file

@ -1,39 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { filter } from '../';
import { $from } from '../../factories';
import { k$ } from '../../k';
import { collect } from '../../lib/collect';
const number$ = $from([1, 2, 3]);
test('returns the filtered values', async () => {
const filter$ = k$(number$)(filter(n => n > 1));
const res = collect(filter$);
expect(await res).toEqual([2, 3, 'C']);
});
test('sends the index as arg 2', async () => {
const filter$ = k$(number$)(filter((n, i) => i > 1));
const res = collect(filter$);
expect(await res).toEqual([3, 'C']);
});

View file

@ -1,59 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { first } from '../';
import { k$ } from '../../k';
import { collect } from '../../lib/collect';
import { Subject } from '../../subject';
test('returns the first value, then completes', async () => {
const values$ = new Subject();
const observable = k$(values$)(first());
const res = collect(observable);
values$.next('foo');
values$.next('bar');
expect(await res).toEqual(['foo', 'C']);
});
test('handles source completing after receiving value', async () => {
const values$ = new Subject();
const observable = k$(values$)(first());
const res = collect(observable);
values$.next('foo');
values$.next('bar');
values$.complete();
expect(await res).toEqual(['foo', 'C']);
});
test('returns error if completing without receiving any value', async () => {
const values$ = new Subject();
const observable = k$(values$)(first());
const res = collect(observable);
values$.complete();
expect(await res).toMatchSnapshot();
});

View file

@ -1,60 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { last } from '../';
import { k$ } from '../../k';
import { Subject } from '../../subject';
test('returns the last value', async () => {
const values$ = new Subject();
const next = jest.fn();
const error = jest.fn();
const complete = jest.fn();
k$(values$)(last()).subscribe(next, error, complete);
values$.next('foo');
expect(next).not.toHaveBeenCalled();
values$.next('bar');
expect(next).not.toHaveBeenCalled();
values$.complete();
expect(next).toHaveBeenCalledTimes(1);
expect(next).toHaveBeenCalledWith('bar');
expect(error).not.toHaveBeenCalled();
expect(complete).toHaveBeenCalledTimes(1);
});
test('returns error if completing without receiving any value', async () => {
const values$ = new Subject();
const error = jest.fn();
k$(values$)(last()).subscribe({
error,
});
values$.complete();
expect(error).toHaveBeenCalledTimes(1);
expect(error.mock.calls).toMatchSnapshot();
});

View file

@ -1,38 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { map, toArray, toPromise } from '../';
import { $from } from '../../factories';
import { k$ } from '../../k';
import { Observable } from '../../observable';
const number$ = $from([1, 2, 3]);
const collect = <T>(source: Observable<T>) => k$(source)(toArray(), toPromise());
test('returns the modified value', async () => {
const numbers = await collect(k$(number$)(map(n => n * 1000)));
expect(numbers).toEqual([1000, 2000, 3000]);
});
test('sends the index as arg 2', async () => {
const numbers = await collect(k$(number$)(map((n, i) => i)));
expect(numbers).toEqual([0, 1, 2]);
});

View file

@ -1,145 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { map, mergeMap } from '../';
import { $error, $of } from '../../factories';
import { k$ } from '../../k';
import { collect } from '../../lib/collect';
import { Observable } from '../../observable';
import { Subject } from '../../subject';
const tickMs = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));
test('should mergeMap many outer values to many inner values', async () => {
const inner$ = new Subject();
const outer$ = Observable.from([1, 2, 3, 4]);
const project = (value: number) => k$(inner$)(map(x => `${value}-${x}`));
const observable = k$(outer$)(mergeMap(project));
const res = collect(observable);
await tickMs(10);
inner$.next('a');
await tickMs(10);
inner$.next('b');
await tickMs(10);
inner$.next('c');
inner$.complete();
expect(await res).toEqual([
'1-a',
'2-a',
'3-a',
'4-a',
'1-b',
'2-b',
'3-b',
'4-b',
'1-c',
'2-c',
'3-c',
'4-c',
'C',
]);
});
test('should mergeMap many outer values to many inner values, early complete', async () => {
const outer$ = new Subject<number>();
const inner$ = new Subject();
const project = (value: number) => k$(inner$)(map(x => `${value}-${x}`));
const observable = k$(outer$)(mergeMap(project));
const res = collect(observable);
outer$.next(1);
outer$.next(2);
outer$.complete();
// This shouldn't end up in the results because `outer$` has completed.
outer$.next(3);
await tickMs(5);
inner$.next('a');
await tickMs(5);
inner$.next('b');
await tickMs(5);
inner$.next('c');
inner$.complete();
expect(await res).toEqual(['1-a', '2-a', '1-b', '2-b', '1-c', '2-c', 'C']);
});
test('should mergeMap many outer to many inner, and inner throws', async () => {
const source = Observable.from([1, 2, 3, 4]);
const error = new Error('fail');
const project = (value: number, index: number) => (index > 1 ? $error(error) : $of(value));
const observable = k$(source)(mergeMap(project));
const res = collect(observable);
expect(await res).toEqual([1, 2, error]);
});
test('should mergeMap many outer to many inner, and outer throws', async () => {
const outer$ = new Subject<number>();
const inner$ = new Subject();
const project = (value: number) => k$(inner$)(map(x => `${value}-${x}`));
const observable = k$(outer$)(mergeMap(project));
const res = collect(observable);
outer$.next(1);
outer$.next(2);
const error = new Error('outer fails');
await tickMs(5);
inner$.next('a');
await tickMs(5);
inner$.next('b');
outer$.error(error);
// This shouldn't end up in the results because `outer$` has failed
outer$.next(3);
await tickMs(5);
inner$.next('c');
expect(await res).toEqual(['1-a', '2-a', '1-b', '2-b', error]);
});
test('should mergeMap many outer to an array for each value', async () => {
const source = Observable.from([1, 2, 3]);
const observable = k$(source)(mergeMap(() => $of('a', 'b', 'c')));
const res = collect(observable);
expect(await res).toEqual(['a', 'b', 'c', 'a', 'b', 'c', 'a', 'b', 'c', 'C']);
});

View file

@ -1,71 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { reduce } from '../';
import { k$ } from '../../k';
import { collect } from '../../lib/collect';
import { Subject } from '../../subject';
test('completes when source completes', async () => {
const subject = new Subject<string>();
const observable = k$(subject)(
reduce((acc, val) => {
return acc + val;
}, 'foo')
);
const res = collect(observable);
subject.next('bar');
subject.next('baz');
subject.complete();
expect(await res).toEqual(['foobarbaz', 'C']);
});
test('injects index', async () => {
const subject = new Subject<string>();
const observable = k$(subject)(
reduce((acc, val, index) => {
return acc + index;
}, 'foo')
);
const res = collect(observable);
subject.next('bar');
subject.next('baz');
subject.complete();
expect(await res).toEqual(['foo01', 'C']);
});
test('completes with initial value if no values received', async () => {
const subject = new Subject<string>();
const observable = k$(subject)(
reduce((acc, val, index) => {
return acc + val;
}, 'foo')
);
const res = collect(observable);
subject.complete();
expect(await res).toEqual(['foo', 'C']);
});

View file

@ -1,72 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { scan } from '../';
import { k$ } from '../../k';
import { collect } from '../../lib/collect';
import { Subject } from '../../subject';
test('completes when source completes', async () => {
const subject = new Subject<string>();
const observable = k$(subject)(
scan((acc, val) => {
return acc + val;
}, 'foo')
);
const res = collect(observable);
subject.next('bar');
subject.next('baz');
subject.complete();
expect(await res).toEqual(['foobar', 'foobarbaz', 'C']);
});
test('injects index', async () => {
const subject = new Subject<string>();
const observable = k$(subject)(
scan((acc, val, index) => {
return acc + index;
}, 'foo')
);
const res = collect(observable);
subject.next('bar');
subject.next('baz');
subject.complete();
expect(await res).toEqual(['foo0', 'foo01', 'C']);
});
test('completes if no values received', async () => {
const subject = new Subject<string>();
const observable = k$(subject)(
scan((acc, val, index) => {
return acc + val;
}, 'foo')
);
const res = collect(observable);
subject.complete();
expect(await res).toEqual(['C']);
});

View file

@ -1,208 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { skipRepeats } from '../';
import { $of } from '../../factories';
import { k$ } from '../../k';
import { collect } from '../../lib/collect';
import { Observable } from '../../observable';
import { Subject } from '../../subject';
test('should distinguish between values', async () => {
const values$ = new Subject<string>();
const observable = k$(values$)(skipRepeats());
const res = collect(observable);
values$.next('a');
values$.next('a');
values$.next('a');
values$.next('b');
values$.next('b');
values$.next('a');
values$.next('a');
values$.complete();
expect(await res).toEqual(['a', 'b', 'a', 'C']);
});
test('should distinguish between values and does not complete', () => {
const values$ = new Subject<string>();
const actual: any[] = [];
k$(values$)(skipRepeats()).subscribe({
next(v) {
actual.push(v);
},
});
values$.next('a');
values$.next('a');
values$.next('a');
values$.next('b');
values$.next('b');
values$.next('a');
values$.next('a');
expect(actual).toEqual(['a', 'b', 'a']);
});
test('should complete if source is empty', done => {
const values$ = $of();
k$(values$)(skipRepeats()).subscribe({
complete: done,
});
});
test('should emit if source emits single element only', () => {
const values$ = new Subject<string>();
const actual: any[] = [];
k$(values$)(skipRepeats()).subscribe({
next(x) {
actual.push(x);
},
});
values$.next('a');
expect(actual).toEqual(['a']);
});
test('should emit if source is scalar', () => {
const values$ = $of('a');
const actual: any[] = [];
k$(values$)(skipRepeats()).subscribe({
next(v) {
actual.push(v);
},
});
expect(actual).toEqual(['a']);
});
test('should raise error if source raises error', async () => {
const values$ = new Subject<string>();
const observable = k$(values$)(skipRepeats());
const res = collect(observable);
values$.next('a');
values$.next('a');
const thrownError = new Error('nope');
values$.error(thrownError);
expect(await res).toEqual(['a', thrownError]);
});
test('should raise error if source throws', () => {
const thrownError = new Error('fail');
const obs = new Observable(observer => {
observer.error(thrownError);
});
const error = jest.fn();
k$(obs)(skipRepeats()).subscribe({
error,
});
expect(error).toHaveBeenCalledWith(thrownError);
});
test('should allow unsubscribing early and explicitly', () => {
const values$ = new Subject<string>();
const actual: any[] = [];
const sub = k$(values$)(skipRepeats()).subscribe({
next(v) {
actual.push(v);
},
});
values$.next('a');
values$.next('a');
values$.next('b');
sub.unsubscribe();
values$.next('c');
values$.next('d');
expect(actual).toEqual(['a', 'b']);
});
test('should emit once if comparator returns true always regardless of source emits', () => {
const values$ = new Subject<string>();
const actual: any[] = [];
k$(values$)(skipRepeats(() => true)).subscribe({
next(v) {
actual.push(v);
},
});
values$.next('a');
values$.next('a');
values$.next('b');
values$.next('c');
expect(actual).toEqual(['a']);
});
test('should emit all if comparator returns false always regardless of source emits', () => {
const values$ = new Subject<string>();
const actual: any[] = [];
k$(values$)(skipRepeats(() => false)).subscribe({
next(v) {
actual.push(v);
},
});
values$.next('a');
values$.next('a');
values$.next('a');
values$.next('a');
expect(actual).toEqual(['a', 'a', 'a', 'a']);
});
test('should distinguish values by comparator', () => {
const values$ = new Subject<number>();
const comparator = (x: number, y: number) => y % 2 === 0;
const actual: any[] = [];
k$(values$)(skipRepeats(comparator)).subscribe({
next(v) {
actual.push(v);
},
});
values$.next(1);
values$.next(2);
values$.next(3);
values$.next(4);
expect(actual).toEqual([1, 3]);
});

View file

@ -1,302 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { switchMap } from '../';
import { $of } from '../../factories';
import { k$ } from '../../k';
import { collect } from '../../lib/collect';
import { Observable } from '../../observable';
import { Subject } from '../../subject';
const number$ = $of(1, 2, 3);
test('returns the modified value', async () => {
const expected = ['a1', 'b1', 'c1', 'a2', 'b2', 'c2', 'a3', 'b3', 'c3', 'C'];
const observable = k$(number$)(switchMap(x => $of('a' + x, 'b' + x, 'c' + x)));
const res = collect(observable);
expect(await res).toEqual(expected);
});
test('injects index to map', async () => {
const observable = k$(number$)(switchMap((x, i) => $of(i)));
const res = collect(observable);
expect(await res).toEqual([0, 1, 2, 'C']);
});
test('should unsubscribe inner observable when source observable emits new value', async () => {
const unsubbed: string[] = [];
const subject = new Subject<string>();
k$(subject)(
switchMap(
x =>
new Observable(observer => {
return () => {
unsubbed.push(x);
};
})
)
).subscribe();
subject.next('a');
expect(unsubbed).toEqual([]);
subject.next('b');
expect(unsubbed).toEqual(['a']);
subject.next('c');
expect(unsubbed).toEqual(['a', 'b']);
subject.complete();
expect(unsubbed).toEqual(['a', 'b', 'c']);
});
test('should unsubscribe inner observable when source observable errors', async () => {
const unsubbed: string[] = [];
const subject = new Subject<string>();
k$(subject)(
switchMap(
x =>
new Observable(observer => {
return () => {
unsubbed.push(x);
};
})
)
).subscribe();
subject.next('a');
subject.error(new Error('fail'));
expect(unsubbed).toEqual(['a']);
});
test('should unsubscribe inner observables if inner observer completes', async () => {
const unsubbed: string[] = [];
const subject = new Subject<string>();
k$(subject)(
switchMap(
x =>
new Observable(observer => {
observer.complete();
return () => {
unsubbed.push(x);
};
})
)
).subscribe();
subject.next('a');
expect(unsubbed).toEqual(['a']);
subject.next('b');
expect(unsubbed).toEqual(['a', 'b']);
subject.complete();
expect(unsubbed).toEqual(['a', 'b']);
});
test('should unsubscribe inner observables if inner observer errors', async () => {
const unsubbed: string[] = [];
const subject = new Subject<string>();
const error = jest.fn();
const thrownError = new Error('fail');
k$(subject)(
switchMap(
x =>
new Observable(observer => {
observer.error(thrownError);
return () => {
unsubbed.push(x);
};
})
)
).subscribe({
error,
});
subject.next('a');
expect(unsubbed).toEqual(['a']);
expect(error).toHaveBeenCalledTimes(1);
expect(error).toHaveBeenCalledWith(thrownError);
});
test('should switch inner observables', () => {
const outer$ = new Subject<'x' | 'y'>();
const inner$ = {
x: new Subject(),
y: new Subject(),
};
const actual: any[] = [];
k$(outer$)(switchMap(x => inner$[x])).subscribe({
next(val) {
actual.push(val);
},
});
outer$.next('x');
inner$.x.next('foo');
inner$.x.next('bar');
outer$.next('y');
inner$.x.next('baz');
inner$.y.next('quux');
outer$.complete();
expect(actual).toEqual(['foo', 'bar', 'quux']);
});
test('should switch inner empty and empty', () => {
const outer$ = new Subject<'x' | 'y'>();
const inner$ = {
x: new Subject(),
y: new Subject(),
};
const next = jest.fn();
k$(outer$)(switchMap(x => inner$[x])).subscribe(next);
outer$.next('x');
inner$.x.complete();
outer$.next('y');
inner$.y.complete();
outer$.complete();
expect(next).not.toHaveBeenCalled();
});
test('should switch inner never and throw', async () => {
const error = new Error('sad');
const outer$ = new Subject<'x' | 'y'>();
const inner$ = {
x: new Subject(),
y: new Subject(),
};
inner$.y.error(error);
const observable = k$(outer$)(switchMap(x => inner$[x]));
const res = collect(observable);
outer$.next('x');
outer$.next('y');
outer$.complete();
expect(await res).toEqual([error]);
});
test('should handle outer throw', async () => {
const error = new Error('foo');
const outer$ = new Observable<string>(observer => {
throw error;
});
const observable = k$(outer$)(switchMap(x => $of(x)));
const res = collect(observable);
expect(await res).toEqual([error]);
});
test('should handle outer error', async () => {
const outer$ = new Subject<'x'>();
const inner$ = {
x: new Subject(),
};
const observable = k$(outer$)(switchMap(x => inner$[x]));
const res = collect(observable);
outer$.next('x');
inner$.x.next('a');
inner$.x.next('b');
inner$.x.next('c');
const error = new Error('foo');
outer$.error(error);
inner$.x.next('d');
inner$.x.next('e');
expect(await res).toEqual(['a', 'b', 'c', error]);
});
test('should raise error when projection throws', async () => {
const outer$ = new Subject<string>();
const error = new Error('foo');
const observable = k$(outer$)(
switchMap(x => {
throw error;
})
);
const res = collect(observable);
outer$.next('x');
expect(await res).toEqual([error]);
});
test('should switch inner cold observables, outer is unsubscribed early', () => {
const outer$ = new Subject<'x' | 'y'>();
const inner$ = {
x: new Subject(),
y: new Subject(),
};
const actual: any[] = [];
const sub = k$(outer$)(switchMap(x => inner$[x])).subscribe({
next(val) {
actual.push(val);
},
});
outer$.next('x');
inner$.x.next('foo');
inner$.x.next('bar');
outer$.next('y');
inner$.y.next('baz');
inner$.y.next('quux');
sub.unsubscribe();
inner$.x.next('post x');
inner$.x.complete();
inner$.y.next('post y');
inner$.y.complete();
expect(actual).toEqual(['foo', 'bar', 'baz', 'quux']);
});

View file

@ -1,85 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { toPromise } from '../';
import { k$ } from '../../k';
import { Subject } from '../../subject';
// Promises are always async, so we add a simple helper that we can `await` to
// make sure they have completed.
const tick = () => Promise.resolve();
test('returns the last value', async () => {
const values$ = new Subject();
const resolved = jest.fn();
const rejected = jest.fn();
k$(values$)(toPromise()).then(resolved, rejected);
values$.next('foo');
await tick();
expect(resolved).not.toHaveBeenCalled();
expect(rejected).not.toHaveBeenCalled();
values$.next('bar');
await tick();
expect(resolved).not.toHaveBeenCalled();
expect(rejected).not.toHaveBeenCalled();
values$.complete();
await tick();
expect(resolved).toHaveBeenCalledTimes(1);
expect(resolved).toHaveBeenCalledWith('bar');
expect(rejected).not.toHaveBeenCalled();
});
test('resolves even if no values received', async () => {
const values$ = new Subject();
const resolved = jest.fn();
const rejected = jest.fn();
k$(values$)(toPromise()).then(resolved, rejected);
values$.complete();
await tick();
expect(rejected).not.toHaveBeenCalled();
expect(resolved).toHaveBeenCalledTimes(1);
});
test('rejects if error received', async () => {
const values$ = new Subject();
const resolved = jest.fn();
const rejected = jest.fn();
k$(values$)(toPromise()).then(resolved, rejected);
values$.error(new Error('fail'));
await tick();
expect(resolved).not.toHaveBeenCalled();
expect(rejected).toHaveBeenCalledTimes(1);
expect(rejected.mock.calls).toMatchSnapshot();
});

View file

@ -1,64 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { MonoTypeOperatorFunction } from '../interfaces';
import { Observable } from '../observable';
/**
* Filter items emitted by the source Observable by only emitting those that
* satisfy a specified predicate.
*
* @param predicate A function that evaluates each value emitted by the source
* Observable. If it returns `true`, the value is emitted, if `false` the value
* is not passed to the output Observable. The `index` parameter is the number
* `i` for the i-th source emission that has happened since the subscription,
* starting from the number `0`.
* @return An Observable of values from the source that were allowed by the
* `predicate` function.
*/
export function filter<T>(
predicate: (value: T, index: number) => boolean
): MonoTypeOperatorFunction<T> {
return function filterOperation(source) {
return new Observable(observer => {
let i = 0;
return source.subscribe({
next(value) {
let result = false;
try {
result = predicate(value, i++);
} catch (e) {
observer.error(e);
return;
}
if (result) {
observer.next(value);
}
},
error(error) {
observer.error(error);
},
complete() {
observer.complete();
},
});
});
};
}

View file

@ -1,51 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { EmptyError } from '../errors';
import { MonoTypeOperatorFunction } from '../interfaces';
import { Observable } from '../observable';
/**
* Emits the first value emitted by the source Observable, then immediately
* completes.
*
* @throws {EmptyError} Delivers an EmptyError to the Observer's `error`
* callback if the Observable completes before any `next` notification was sent.
*
* @returns An Observable of the first item received.
*/
export function first<T>(): MonoTypeOperatorFunction<T> {
return function firstOperation(source) {
return new Observable(observer => {
return source.subscribe({
next(value) {
observer.next(value);
observer.complete();
},
error(error) {
observer.error(error);
},
complete() {
// The only time we end up here, is if we never received any values.
observer.error(new EmptyError('first()'));
},
});
});
};
}

View file

@ -1,62 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { $fromCallback } from '../factories';
import { MonoTypeOperatorFunction } from '../interfaces';
import { Observable } from '../observable';
/**
* Modifies a stream so that when the source completes without emitting any
* values a new observable is created via `factory()` (see `$fromCallback`) that
* will be mirrored to completion.
*
* @param factory
* @return
*/
export function ifEmpty<T>(factory: () => T): MonoTypeOperatorFunction<T> {
return function ifEmptyOperation(source) {
return new Observable(observer => {
let hasReceivedValue = false;
const subs = [
source.subscribe({
next(value) {
hasReceivedValue = true;
observer.next(value);
},
error(error) {
observer.error(error);
},
complete() {
if (hasReceivedValue) {
observer.complete();
} else {
subs.push($fromCallback(factory).subscribe(observer));
}
},
}),
];
return () => {
subs.forEach(sub => sub.unsubscribe());
subs.length = 0;
};
});
};
}

View file

@ -1,31 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
export { ifEmpty } from './if_empty';
export { last } from './last';
export { first } from './first';
export { map } from './map';
export { filter } from './filter';
export { reduce } from './reduce';
export { scan } from './scan';
export { toArray } from './to_array';
export { switchMap } from './switch_map';
export { mergeMap } from './merge_map';
export { skipRepeats } from './skip_repeats';
export { toPromise } from './to_promise';

View file

@ -1,58 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { EmptyError } from '../errors';
import { MonoTypeOperatorFunction } from '../interfaces';
import { Observable } from '../observable';
/**
* Emits the last value emitted by the source Observable, then immediately
* completes.
*
* @throws {EmptyError} Delivers an EmptyError to the Observer's `error`
* callback if the Observable completes before any `next` notification was sent.
*
* @returns An Observable of the last item received.
*/
export function last<T>(): MonoTypeOperatorFunction<T> {
return function lastOperation(source) {
return new Observable(observer => {
let hasReceivedValue = false;
let latest: T;
return source.subscribe({
next(value) {
hasReceivedValue = true;
latest = value;
},
error(error) {
observer.error(error);
},
complete() {
if (hasReceivedValue) {
observer.next(latest);
observer.complete();
} else {
observer.error(new EmptyError('last()'));
}
},
});
});
};
}

View file

@ -1,58 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { OperatorFunction } from '../interfaces';
import { Observable } from '../observable';
/**
* Modifies each value from the source by passing it to `fn(item, i)` and
* emitting the return value of that function instead.
*
* @param fn The function to apply to each `value` emitted by the source
* Observable. The `index` parameter is the number `i` for the i-th emission
* that has happened since the subscription, starting from the number `0`.
* @return An Observable that emits the values from the source Observable
* transformed by the given `fn` function.
*/
export function map<T, R>(fn: (value: T, index: number) => R): OperatorFunction<T, R> {
return function mapOperation(source) {
return new Observable(observer => {
let i = 0;
return source.subscribe({
next(value) {
let result: R;
try {
result = fn(value, i++);
} catch (e) {
observer.error(e);
return;
}
observer.next(result);
},
error(error) {
observer.error(error);
},
complete() {
observer.complete();
},
});
});
};
}

View file

@ -1,118 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { $from } from '../factories';
import { OperatorFunction } from '../interfaces';
import { Observable, ObservableInput } from '../observable';
/**
* Projects each source value to an Observable which is merged in the output
* Observable.
*
* Example:
*
* ```js
* const source = Observable.from([1, 2, 3]);
* const observable = k$(source)(
* mergeMap(x => Observable.of('a', x + 1))
* );
* ```
*
* Results in the following items emitted:
* - a
* - 2
* - a
* - 3
* - a
* - 4
*
* As you can see it merges the returned observable and emits every value from
* that observable. You can think of it as being the same as `flatMap` on an
* array, just that you return an Observable instead of an array.
*
* For more complex use-cases where you need the source variable for each value
* in the newly created observable, an often used pattern is using `map` within
* the `mergeMap`. E.g. let's say we want to return both the current value and
* the newly created value:
*
* ```js
* mergeMap(val =>
* k$(someFn(val))(
* map(newVal => ({ val, newVal })
* )
* )
* ```
*
* Here you would go from having an observable of `val`s, to having an
* observable of `{ val, newVal }` objects.
*
* @param project A function that, when applied to an item emitted by the source
* Observable, returns an Observable.
*/
export function mergeMap<T, R>(
project: (value: T, index: number) => ObservableInput<R>
): OperatorFunction<T, R> {
return function mergeMapOperation(source) {
return new Observable(destination => {
let completed = false;
let active = 0;
let i = 0;
source.subscribe({
next(value) {
let result;
try {
result = project(value, i++);
} catch (error) {
destination.error(error);
return;
}
active++;
$from(result).subscribe({
next(innerValue) {
destination.next(innerValue);
},
error(err) {
destination.error(err);
},
complete() {
active--;
if (active === 0 && completed) {
destination.complete();
}
},
});
},
error(err) {
destination.error(err);
},
complete() {
completed = true;
if (active === 0) {
destination.complete();
}
},
});
});
};
}

View file

@ -1,51 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { OperatorFunction } from '../interfaces';
import { pipe } from '../lib';
import { ifEmpty } from './if_empty';
import { last } from './last';
import { scan } from './scan';
/**
* Applies the accumulator function to every value in the source observable and
* emits the return value when the source completes.
*
* It's like {@link scan}, but only emits when the source observable completes,
* not the current accumulation whenever the source emits a value.
*
* If no values are emitted, the `initialValue` will be emitted.
*
* @param accumulator The accumulator function called on each source value.
* @param initialValue The initial accumulation value.
* @return An Observable that emits a single value that is the result of
* accumulating the values emitted by the source Observable.
*/
export function reduce<T, R>(
accumulator: (acc: R, value: T, index: number) => R,
initialValue: R
): OperatorFunction<T, R> {
return function reduceOperation(source) {
return pipe(
scan(accumulator, initialValue),
ifEmpty(() => initialValue),
last()
)(source);
};
}

View file

@ -1,64 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { OperatorFunction } from '../interfaces';
import { Observable } from '../observable';
/**
* Applies the accumulator function to every value in the source stream and
* emits the return value of each invocation.
*
* It's like {@link reduce}, but emits the current accumulation whenever the
* source emits a value instead of emitting only when completed.
*
* @param accumulator The accumulator function called on each source value.
* @param initialValue The initial accumulation value.
* @return An observable of the accumulated values.
*/
export function scan<T, R>(
accumulator: (acc: R, value: T, index: number) => R,
initialValue: R
): OperatorFunction<T, R> {
return function scanOperation(source) {
return new Observable(observer => {
let i = -1;
let acc = initialValue;
return source.subscribe({
next(value) {
i += 1;
try {
acc = accumulator(acc, value, i);
observer.next(acc);
} catch (error) {
observer.error(error);
}
},
error(error) {
observer.error(error);
},
complete() {
observer.complete();
},
});
});
};
}

View file

@ -1,68 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { MonoTypeOperatorFunction } from '../interfaces';
import { Observable } from '../observable';
const isStrictlyEqual = (a: any, b: any) => a === b;
/**
* Returns an Observable that emits all items emitted by the source Observable
* that are not equal to the previous item.
*
* @param [equals] Optional comparison function called to test if an item is
* equal to the previous item in the source. Should return `true` if equal,
* otherwise `false`. By default compares using reference equality, aka `===`.
* @return An Observable that emits items from the source Observable with
* distinct values.
*/
export function skipRepeats<T>(
equals: (x: T, y: T) => boolean = isStrictlyEqual
): MonoTypeOperatorFunction<T> {
return function skipRepeatsOperation(source) {
return new Observable(observer => {
let hasInitialValue = false;
let currentValue: T;
return source.subscribe({
next(value) {
if (!hasInitialValue) {
hasInitialValue = true;
currentValue = value;
observer.next(value);
return;
}
const isEqual = equals(currentValue, value);
if (!isEqual) {
observer.next(value);
currentValue = value;
}
},
error(error) {
observer.error(error);
},
complete() {
observer.complete();
},
});
});
};
}

View file

@ -1,121 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { OperatorFunction } from '../interfaces';
import { Observable, Subscription } from '../observable';
/**
* Projects each source value to an Observable which is merged in the output
* Observable, emitting values only from the most recently projected Observable.
*
* To understand how `switchMap` works, take a look at:
* https://medium.com/@w.dave.w/becoming-more-reactive-with-rxjs-flatmap-and-switchmap-ccd3fb7b67fa
*
* It's kinda like a normal `flatMap`, except it's producing observables and you
* _only_ care about the latest observable it produced. One use-case for
* `switchMap` is if need to control what happens both when you create and when
* you're done with an observable, like in the example below where we want to
* write the pid file when we receive a pid config, and delete it when we
* receive new config values (or when we stop the pid service).
*
* ```js
* switchMap(config => {
* return new Observable(() => {
* const pid = new PidFile(config);
* pid.writeFile();
*
* // Whenever a new observable is returned, `switchMap` will unsubscribe
* // from the previous observable. That means that we can e.g. run teardown
* // logic in the unsubscribe.
* return function unsubscribe() {
* pid.deleteFile();
* };
* });
* });
* ```
*
* Another example could be emitting a value X seconds after receiving it from
* the source observable, but cancelling if another value is received before the
* timeout, e.g.
*
* ```js
* switchMap(value => {
* return new Observable(observer => {
* const id = setTimeout(() => {
* observer.next(value);
* }, 5000);
*
* return function unsubscribe() {
* clearTimeout(id);
* };
* });
* });
* ```
*/
export function switchMap<T, R>(
project: (value: T, index: number) => Observable<R>
): OperatorFunction<T, R> {
return function switchMapOperation(source) {
return new Observable(observer => {
let i = 0;
let innerSubscription: Subscription | undefined;
return source.subscribe({
next(value) {
let result;
try {
result = project(value, i++);
} catch (error) {
observer.error(error);
return;
}
if (innerSubscription !== undefined) {
innerSubscription.unsubscribe();
}
innerSubscription = result.subscribe({
next(innerVal) {
observer.next(innerVal);
},
error(err) {
observer.error(err);
},
});
},
error(err) {
if (innerSubscription !== undefined) {
innerSubscription.unsubscribe();
innerSubscription = undefined;
}
observer.error(err);
},
complete() {
if (innerSubscription !== undefined) {
innerSubscription.unsubscribe();
innerSubscription = undefined;
}
observer.complete();
},
});
});
};
}

View file

@ -1,36 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { OperatorFunction } from '../interfaces';
import { Observable } from '../observable';
import { reduce } from './reduce';
function concat<T>(source: Observable<T>) {
return reduce<T, T[]>((acc, item) => acc.concat([item]), [] as T[])(source);
}
/**
* Modify a stream to produce a single array containing all of the items emitted
* by source.
*/
export function toArray<T>(): OperatorFunction<T, T[]> {
return function toArrayOperation(source) {
return concat(source);
};
}

View file

@ -1,41 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { UnaryFunction } from '../interfaces';
import { Observable } from '../observable';
export function toPromise<T>(): UnaryFunction<Observable<T>, Promise<T>> {
return function toPromiseOperation(source) {
return new Promise((resolve, reject) => {
let lastValue: T;
source.subscribe({
next(value) {
lastValue = value;
},
error(error) {
reject(error);
},
complete() {
resolve(lastValue);
},
});
});
};
}

View file

@ -1,114 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Observable, SubscriptionObserver } from './observable';
/**
* A Subject is a special type of Observable that allows values to be
* multicasted to many Observers. While plain Observables are unicast (each
* subscribed Observer owns an independent execution of the Observable),
* Subjects are multicast.
*
* Every Subject is an Observable. Given a Subject, you can subscribe to it in
* the same way you subscribe to any Observable, and you will start receiving
* values normally. From the perspective of the Observer, it cannot tell whether
* the Observable execution is coming from a plain unicast Observable or a
* Subject.
*
* Internally to the Subject, `subscribe` does not invoke a new execution that
* delivers values. It simply registers the given Observer in a list of
* Observers, similarly to how `addListener` usually works in other libraries
* and languages.
*
* Every Subject is an Observer. It is an object with the methods `next(v)`,
* `error(e)`, and `complete()`. To feed a new value to the Subject, just call
* `next(theValue)`, and it will be multicasted to the Observers registered to
* listen to the Subject.
*
* Learn more about Subjects:
* - http://reactivex.io/documentation/subject.html
* - http://davesexton.com/blog/post/To-Use-Subject-Or-Not-To-Use-Subject.aspx
*/
export class Subject<T> extends Observable<T> {
protected observers: Set<SubscriptionObserver<T>> = new Set();
protected isStopped = false;
protected thrownError?: Error;
constructor() {
super(observer => this.registerObserver(observer));
}
/**
* @param value The value that will be forwarded to every observer subscribed
* to this subject.
*/
public next(value: T) {
for (const observer of this.observers) {
observer.next(value);
}
}
/**
* @param error The error that will be forwarded to every observer subscribed
* to this subject.
*/
public error(error: Error) {
this.thrownError = error;
this.isStopped = true;
for (const observer of this.observers) {
observer.error(error);
}
this.observers.clear();
}
/**
* Completes all the subscribed observers, then clears the list of observers.
*/
public complete() {
this.isStopped = true;
for (const observer of this.observers) {
observer.complete();
}
this.observers.clear();
}
/**
* Returns an observable, so the observer methods are hidden.
*/
public asObservable(): Observable<T> {
return new Observable(observer => this.subscribe(observer));
}
protected registerObserver(observer: SubscriptionObserver<T>) {
if (this.isStopped) {
if (this.thrownError !== undefined) {
observer.error(this.thrownError);
} else {
observer.complete();
}
} else {
this.observers.add(observer);
return () => this.observers.delete(observer);
}
}
}

View file

@ -18,7 +18,8 @@
*/
/* tslint:disable max-classes-per-file */
import { BehaviorSubject, first, k$, toPromise } from '../../../lib/kbn_observable';
import { BehaviorSubject } from 'rxjs';
import { first } from 'rxjs/operators';
import { schema, Type, TypeOf } from '../schema';
import { ConfigService, ObjectToRawConfigAdapter } from '..';
@ -34,7 +35,7 @@ test('returns config at path as observable', async () => {
const configService = new ConfigService(config$, defaultEnv, logger);
const configs = configService.atPath('key', ExampleClassWithStringSchema);
const exampleConfig = await k$(configs)(first(), toPromise());
const exampleConfig = await configs.pipe(first()).toPromise();
expect(exampleConfig.value).toBe('foo');
});
@ -48,7 +49,7 @@ test('throws if config at path does not match schema', async () => {
const configs = configService.atPath('key', ExampleClassWithStringSchema);
try {
await k$(configs)(first(), toPromise());
await configs.pipe(first()).toPromise();
} catch (e) {
expect(e.message).toMatchSnapshot();
}
@ -59,7 +60,7 @@ test("returns undefined if fetching optional config at a path that doesn't exist
const configService = new ConfigService(config$, defaultEnv, logger);
const configs = configService.optionalAtPath('unique-name', ExampleClassWithStringSchema);
const exampleConfig = await k$(configs)(first(), toPromise());
const exampleConfig = await configs.pipe(first()).toPromise();
expect(exampleConfig).toBeUndefined();
});
@ -69,7 +70,7 @@ test('returns observable config at optional path if it exists', async () => {
const configService = new ConfigService(config$, defaultEnv, logger);
const configs = configService.optionalAtPath('value', ExampleClassWithStringSchema);
const exampleConfig: any = await k$(configs)(first(), toPromise());
const exampleConfig: any = await configs.pipe(first()).toPromise();
expect(exampleConfig).toBeDefined();
expect(exampleConfig.value).toBe('bar');
@ -114,7 +115,7 @@ test("throws error if config class does not implement 'schema'", async () => {
const configs = configService.atPath('key', ExampleClass as any);
try {
await k$(configs)(first(), toPromise());
await configs.pipe(first()).toPromise();
} catch (e) {
expect(e).toMatchSnapshot();
}
@ -198,7 +199,7 @@ test('correctly passes context', async () => {
)
);
expect(await k$(configs)(first(), toPromise())).toMatchSnapshot();
expect(await configs.pipe(first()).toPromise()).toMatchSnapshot();
});
test('handles enabled path, but only marks the enabled path as used', async () => {

View file

@ -23,7 +23,7 @@ jest.mock('../read_config', () => ({
getConfigFromFile: mockGetConfigFromFile,
}));
import { first, k$, toPromise } from '../../../lib/kbn_observable';
import { first } from 'rxjs/operators';
import { RawConfigService } from '../raw_config_service';
const configFile = '/config/kibana.yml';
@ -63,7 +63,10 @@ test('returns config at path as observable', async () => {
configService.loadConfig();
const exampleConfig = await k$(configService.getConfig$())(first(), toPromise());
const exampleConfig = await configService
.getConfig$()
.pipe(first())
.toPromise();
expect(exampleConfig.get('key')).toEqual('value');
expect(exampleConfig.getFlattenedPaths()).toEqual(['key']);

View file

@ -18,7 +18,8 @@
*/
import { isEqual } from 'lodash';
import { first, k$, map, Observable, skipRepeats, toPromise } from '../../lib/kbn_observable';
import { Observable } from 'rxjs';
import { distinctUntilChanged, first, map } from 'rxjs/operators';
import { Logger, LoggerFactory } from '../logging';
import { ConfigWithSchema } from './config_with_schema';
@ -65,7 +66,7 @@ export class ConfigService {
path: ConfigPath,
ConfigClass: ConfigWithSchema<Schema, Config>
) {
return k$(this.getDistinctRawConfig(path))(
return this.getDistinctRawConfig(path).pipe(
map(rawConfig => this.createConfig(path, rawConfig, ConfigClass))
);
}
@ -80,7 +81,7 @@ export class ConfigService {
path: ConfigPath,
ConfigClass: ConfigWithSchema<Schema, Config>
) {
return k$(this.getDistinctRawConfig(path))(
return this.getDistinctRawConfig(path).pipe(
map(
rawConfig =>
rawConfig === undefined ? undefined : this.createConfig(path, rawConfig, ConfigClass)
@ -91,7 +92,7 @@ export class ConfigService {
public async isEnabledAtPath(path: ConfigPath) {
const enabledPath = createPluginEnabledPath(path);
const config = await k$(this.config$)(first(), toPromise());
const config = await this.config$.pipe(first()).toPromise();
if (!config.has(enabledPath)) {
return true;
@ -114,7 +115,7 @@ export class ConfigService {
}
public async getUnusedPaths(): Promise<string[]> {
const config = await k$(this.config$)(first(), toPromise());
const config = await this.config$.pipe(first()).toPromise();
const handledPaths = this.handledPaths.map(pathToString);
return config.getFlattenedPaths().filter(path => !isPathHandled(path, handledPaths));
@ -153,7 +154,7 @@ export class ConfigService {
private getDistinctRawConfig(path: ConfigPath) {
this.markAsHandled(path);
return k$(this.config$)(map(config => config.get(path)), skipRepeats(isEqual));
return this.config$.pipe(map(config => config.get(path)), distinctUntilChanged(isEqual));
}
private markAsHandled(path: ConfigPath) {

View file

@ -18,15 +18,9 @@
*/
import { isEqual, isPlainObject } from 'lodash';
import { BehaviorSubject, Observable } from 'rxjs';
import { distinctUntilChanged, filter, map } from 'rxjs/operators';
import typeDetect from 'type-detect';
import {
BehaviorSubject,
filter,
k$,
map,
Observable,
skipRepeats,
} from '../../lib/kbn_observable';
import { ObjectToRawConfigAdapter } from './object_to_raw_config_adapter';
import { RawConfig } from './raw_config';
@ -46,12 +40,12 @@ export class RawConfigService {
* As we have a notion of a _current_ config we rely on a BehaviorSubject so
* every new subscription will immediately receive the current config.
*/
private readonly rawConfigFromFile$: BehaviorSubject<any> = new BehaviorSubject(notRead);
private readonly rawConfigFromFile$ = new BehaviorSubject<any>(notRead);
private readonly config$: Observable<RawConfig>;
constructor(readonly configFile: string) {
this.config$ = k$(this.rawConfigFromFile$)(
this.config$ = this.rawConfigFromFile$.pipe(
filter(rawConfig => rawConfig !== notRead),
map(rawConfig => {
// If the raw config is null, e.g. if empty config file, we default to
@ -68,7 +62,7 @@ export class RawConfigService {
throw new Error(`the raw config must be an object, got [${typeDetect(rawConfig)}]`);
}),
// We only want to update the config if there are changes to it
skipRepeats(isEqual)
distinctUntilChanged(isEqual)
);
}

View file

@ -26,7 +26,7 @@ jest.mock('../http_server', () => ({
}));
import { noop } from 'lodash';
import { BehaviorSubject } from '../../../lib/kbn_observable';
import { BehaviorSubject } from 'rxjs';
import { Env } from '../../config';
import { logger } from '../../logging/__mocks__';

View file

@ -17,7 +17,8 @@
* under the License.
*/
import { first, k$, Observable, Subscription, toPromise } from '../../lib/kbn_observable';
import { Observable, Subscription } from 'rxjs';
import { first } from 'rxjs/operators';
import { CoreService } from '../../types/core_service';
import { Env } from '../config';
@ -52,7 +53,7 @@ export class HttpService implements CoreService {
}
});
const config = await k$(this.config$)(first(), toPromise());
const config = await this.config$.pipe(first()).toPromise();
// If a redirect port is specified, we start an HTTP server at this port and
// redirect all requests to the SSL port.

View file

@ -17,7 +17,7 @@
* under the License.
*/
import { Observable } from '../../lib/kbn_observable';
import { Observable } from 'rxjs';
import { Env } from '../config';
import { LoggerFactory } from '../logging';

View file

@ -17,6 +17,8 @@
* under the License.
*/
import { BehaviorSubject } from 'rxjs';
import { map } from 'rxjs/operators';
/** @internal */
export { LegacyPlatformProxifier } from './legacy_platform_proxifier';
/** @internal */
@ -30,7 +32,6 @@ import {
LegacyKbnServer,
LegacyPlatformProxifier,
} from '.';
import { BehaviorSubject, k$, map } from '../../lib/kbn_observable';
import { Env } from '../config';
import { Root } from '../root';
import { BasePathProxyRoot } from '../root/base_path_proxy_root';
@ -39,7 +40,7 @@ function initEnvironment(rawKbnServer: any) {
const config: LegacyConfig = rawKbnServer.config;
const legacyConfig$ = new BehaviorSubject(config);
const config$ = k$(legacyConfig$)(
const config$ = legacyConfig$.pipe(
map(legacyConfig => new LegacyConfigToRawConfigAdapter(legacyConfig))
);

View file

@ -17,7 +17,7 @@
* under the License.
*/
import { BehaviorSubject } from '../../../lib/kbn_observable';
import { BehaviorSubject } from 'rxjs';
import { MutableLoggerFactory } from '../logger_factory';
import { LoggingConfig } from '../logging_config';
import { LoggingService } from '../logging_service';

View file

@ -17,7 +17,7 @@
* under the License.
*/
import { Observable, Subscription } from '../../lib/kbn_observable';
import { Observable, Subscription } from 'rxjs';
import { MutableLoggerFactory } from './logger_factory';
import { LoggingConfig } from './logging_config';

View file

@ -49,7 +49,7 @@ const mockMutableLoggerFactory = jest.fn(() => logger);
const mockLoggingService = jest.fn(() => loggingService);
import { BehaviorSubject } from '../../../lib/kbn_observable';
import { BehaviorSubject } from 'rxjs';
jest.mock('../../config', () => ({ ConfigService: mockConfigService }));
jest.mock('../../', () => ({ Server: mockServer }));

View file

@ -17,7 +17,7 @@
* under the License.
*/
import { first, k$, toPromise } from '../../lib/kbn_observable';
import { first } from 'rxjs/operators';
import { Root } from '.';
import { DevConfig } from '../dev';
@ -35,8 +35,14 @@ export class BasePathProxyRoot extends Root {
shouldRedirectFromOldBasePath,
}: Pick<BasePathProxyServerOptions, 'blockUntil' | 'shouldRedirectFromOldBasePath'>) {
const [devConfig, httpConfig] = await Promise.all([
k$(this.configService.atPath('dev', DevConfig))(first(), toPromise()),
k$(this.configService.atPath('server', HttpConfig))(first(), toPromise()),
this.configService
.atPath('dev', DevConfig)
.pipe(first())
.toPromise(),
this.configService
.atPath('server', HttpConfig)
.pipe(first())
.toPromise(),
]);
this.basePathProxy = new BasePathProxyServer(this.logger.get('server'), {

View file

@ -17,7 +17,7 @@
* under the License.
*/
import { Observable } from '../../lib/kbn_observable';
import { Observable } from 'rxjs';
import { Server } from '..';
import { ConfigService, Env, RawConfig } from '../config';