mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 17:59:23 -04:00
[ui/utils] share sync subscribe logic (#23341)
In `ui/public/config/config.js` we have subscription logic that delivers values from an observable synchronously and also ensuring that values are received within a digest cycle. I need to do the same in #23217 but want to keep as few checks for `$rootScope.$$phase` as possible, so I broke the subscription logic into a shared util. In order to make the utility a little more helpful it will also trigger fatal errors if an observable errors without having an error handler, or if `observer.next()`, `observer.error()`, or `observer.complete()` throw, which would normally be swallowed by RxJS.
This commit is contained in:
parent
aa85072ca6
commit
c0e13fdb2b
3 changed files with 234 additions and 12 deletions
|
@ -21,6 +21,7 @@ import angular from 'angular';
|
|||
import chrome from '../chrome';
|
||||
import { isPlainObject } from 'lodash';
|
||||
import { uiModules } from '../modules';
|
||||
import { subscribeWithScope } from '../utils/subscribe_with_scope';
|
||||
|
||||
const module = uiModules.get('kibana/config');
|
||||
|
||||
|
@ -59,20 +60,11 @@ module.service(`config`, function ($rootScope, Promise) {
|
|||
//* angular specific methods *
|
||||
//////////////////////////////
|
||||
|
||||
const subscription = uiSettings.getUpdate$().subscribe(({ key, newValue, oldValue }) => {
|
||||
const emit = () => {
|
||||
const subscription = subscribeWithScope($rootScope, uiSettings.getUpdate$(), {
|
||||
next: ({ key, newValue, oldValue }) => {
|
||||
$rootScope.$broadcast('change:config', newValue, oldValue, key, this);
|
||||
$rootScope.$broadcast(`change:config.${key}`, newValue, oldValue, key, this);
|
||||
};
|
||||
|
||||
// this is terrible, but necessary to emulate the same API
|
||||
// that the `config` service had before where changes were
|
||||
// emitted to scopes synchronously. All methods that don't
|
||||
// require knowing if we are currently in a digest cycle are
|
||||
// async and would deliver events too late for several usecases
|
||||
//
|
||||
// If you copy this code elsewhere you better have a good reason :)
|
||||
$rootScope.$$phase ? emit() : $rootScope.$apply(emit);
|
||||
}
|
||||
});
|
||||
$rootScope.$on('$destroy', () => subscription.unsubscribe());
|
||||
|
||||
|
|
154
src/ui/public/utils/subscribe_with_scope.test.ts
Normal file
154
src/ui/public/utils/subscribe_with_scope.test.ts
Normal file
|
@ -0,0 +1,154 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch B.V. under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch B.V. licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
const mockFatalError = jest.fn();
|
||||
jest.mock('ui/notify/fatal_error', () => ({
|
||||
fatalError: mockFatalError,
|
||||
}));
|
||||
|
||||
import * as Rx from 'rxjs';
|
||||
import { subscribeWithScope } from './subscribe_with_scope';
|
||||
|
||||
let $rootScope: Scope;
|
||||
|
||||
class Scope {
|
||||
public $$phase?: string;
|
||||
public $root = $rootScope;
|
||||
public $apply = jest.fn((fn: () => void) => fn());
|
||||
}
|
||||
|
||||
$rootScope = new Scope();
|
||||
|
||||
afterEach(() => {
|
||||
jest.clearAllMocks();
|
||||
});
|
||||
|
||||
it('subscribes to the passed observable, returns subscription', () => {
|
||||
const $scope = new Scope();
|
||||
|
||||
const unsubSpy = jest.fn();
|
||||
const subSpy = jest.fn(() => unsubSpy);
|
||||
const observable = new Rx.Observable(subSpy);
|
||||
|
||||
const subscription = subscribeWithScope($scope as any, observable);
|
||||
expect(subSpy).toHaveBeenCalledTimes(1);
|
||||
expect(unsubSpy).not.toHaveBeenCalled();
|
||||
|
||||
subscription.unsubscribe();
|
||||
|
||||
expect(subSpy).toHaveBeenCalledTimes(1);
|
||||
expect(unsubSpy).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('calls observer.next() if already in a digest cycle, wraps in $scope.$apply if not', () => {
|
||||
const subject = new Rx.Subject();
|
||||
const nextSpy = jest.fn();
|
||||
const $scope = new Scope();
|
||||
|
||||
subscribeWithScope($scope as any, subject, { next: nextSpy });
|
||||
|
||||
subject.next();
|
||||
expect($scope.$apply).toHaveBeenCalledTimes(1);
|
||||
expect(nextSpy).toHaveBeenCalledTimes(1);
|
||||
|
||||
jest.clearAllMocks();
|
||||
|
||||
$rootScope.$$phase = '$digest';
|
||||
subject.next();
|
||||
expect($scope.$apply).not.toHaveBeenCalled();
|
||||
expect(nextSpy).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('reports fatalError if observer.next() throws', () => {
|
||||
const $scope = new Scope();
|
||||
subscribeWithScope($scope as any, Rx.of(undefined), {
|
||||
next() {
|
||||
throw new Error('foo bar');
|
||||
},
|
||||
});
|
||||
|
||||
expect(mockFatalError.mock.calls).toMatchInlineSnapshot(`
|
||||
Array [
|
||||
Array [
|
||||
[Error: foo bar],
|
||||
],
|
||||
]
|
||||
`);
|
||||
});
|
||||
|
||||
it('reports fatal error if observer.error is not defined and observable errors', () => {
|
||||
const $scope = new Scope();
|
||||
const error = new Error('foo');
|
||||
error.stack = `${error.message}\n---stack trace ---`;
|
||||
subscribeWithScope($scope as any, Rx.throwError(error));
|
||||
|
||||
expect(mockFatalError.mock.calls).toMatchInlineSnapshot(`
|
||||
Array [
|
||||
Array [
|
||||
[Error: Uncaught error in subscribeWithScope(): foo
|
||||
---stack trace ---],
|
||||
],
|
||||
]
|
||||
`);
|
||||
});
|
||||
|
||||
it('reports fatal error if observer.error throws', () => {
|
||||
const $scope = new Scope();
|
||||
subscribeWithScope($scope as any, Rx.throwError(new Error('foo')), {
|
||||
error: () => {
|
||||
throw new Error('foo');
|
||||
},
|
||||
});
|
||||
|
||||
expect(mockFatalError.mock.calls).toMatchInlineSnapshot(`
|
||||
Array [
|
||||
Array [
|
||||
[Error: foo],
|
||||
],
|
||||
]
|
||||
`);
|
||||
});
|
||||
|
||||
it('does not report fatal error if observer.error handles the error', () => {
|
||||
const $scope = new Scope();
|
||||
subscribeWithScope($scope as any, Rx.throwError(new Error('foo')), {
|
||||
error: () => {
|
||||
// noop, swallow error
|
||||
},
|
||||
});
|
||||
|
||||
expect(mockFatalError.mock.calls).toEqual([]);
|
||||
});
|
||||
|
||||
it('reports fatal error if observer.complete throws', () => {
|
||||
const $scope = new Scope();
|
||||
subscribeWithScope($scope as any, Rx.EMPTY, {
|
||||
complete: () => {
|
||||
throw new Error('foo');
|
||||
},
|
||||
});
|
||||
|
||||
expect(mockFatalError.mock.calls).toMatchInlineSnapshot(`
|
||||
Array [
|
||||
Array [
|
||||
[Error: foo],
|
||||
],
|
||||
]
|
||||
`);
|
||||
});
|
76
src/ui/public/utils/subscribe_with_scope.ts
Normal file
76
src/ui/public/utils/subscribe_with_scope.ts
Normal file
|
@ -0,0 +1,76 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch B.V. under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch B.V. licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
import { IScope } from 'angular';
|
||||
import * as Rx from 'rxjs';
|
||||
import { fatalError } from 'ui/notify/fatal_error';
|
||||
|
||||
function callInDigest<T extends any[]>($scope: IScope, fn: (...args: T) => void, ...args: T) {
|
||||
try {
|
||||
// this is terrible, but necessary to synchronously deliver subscription values
|
||||
// to angular scopes. This is required by some APIs, like the `config` service,
|
||||
// and beneficial for root level directives where additional digest cycles make
|
||||
// kibana sluggish to load.
|
||||
//
|
||||
// If you copy this code elsewhere you better have a good reason :)
|
||||
if ($scope.$root.$$phase) {
|
||||
fn(...args);
|
||||
} else {
|
||||
$scope.$apply(() => fn(...args));
|
||||
}
|
||||
} catch (error) {
|
||||
fatalError(error);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Subscribe to an observable at a $scope, ensuring that the digest cycle
|
||||
* is run for subscriber hooks and routing errors to fatalError if not handled.
|
||||
*/
|
||||
export function subscribeWithScope<T>(
|
||||
$scope: IScope,
|
||||
observable: Rx.Observable<T>,
|
||||
observer?: Rx.PartialObserver<T>
|
||||
) {
|
||||
return observable.subscribe({
|
||||
next(value) {
|
||||
if (observer && observer.next) {
|
||||
callInDigest($scope, observer.next, value);
|
||||
}
|
||||
},
|
||||
error(error) {
|
||||
callInDigest($scope, () => {
|
||||
if (observer && observer.error) {
|
||||
observer.error(error);
|
||||
} else {
|
||||
throw new Error(
|
||||
`Uncaught error in subscribeWithScope(): ${
|
||||
error ? error.stack || error.message : error
|
||||
}`
|
||||
);
|
||||
}
|
||||
});
|
||||
},
|
||||
complete() {
|
||||
if (observer && observer.complete) {
|
||||
callInDigest($scope, observer.complete);
|
||||
}
|
||||
},
|
||||
});
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue