-
Notifications
You must be signed in to change notification settings - Fork 3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(catchError): add higher-order lettable version of
catch
NOTE: The name is now `catchError` because `catch` is an invalid name for a function
- Loading branch information
Showing
3 changed files
with
119 additions
and
51 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
import { Operator } from '../Operator'; | ||
import { Subscriber } from '../Subscriber'; | ||
import { Observable, ObservableInput } from '../Observable'; | ||
|
||
import { OuterSubscriber } from '../OuterSubscriber'; | ||
import { subscribeToResult } from '../util/subscribeToResult'; | ||
import { OperatorFunction } from '../interfaces'; | ||
|
||
/** | ||
* Catches errors on the observable to be handled by returning a new observable or throwing an error. | ||
* | ||
* <img src="./img/catch.png" width="100%"> | ||
* | ||
* @example <caption>Continues with a different Observable when there's an error</caption> | ||
* | ||
* Observable.of(1, 2, 3, 4, 5) | ||
* .map(n => { | ||
* if (n == 4) { | ||
* throw 'four!'; | ||
* } | ||
* return n; | ||
* }) | ||
* .catch(err => Observable.of('I', 'II', 'III', 'IV', 'V')) | ||
* .subscribe(x => console.log(x)); | ||
* // 1, 2, 3, I, II, III, IV, V | ||
* | ||
* @example <caption>Retries the caught source Observable again in case of error, similar to retry() operator</caption> | ||
* | ||
* Observable.of(1, 2, 3, 4, 5) | ||
* .map(n => { | ||
* if (n === 4) { | ||
* throw 'four!'; | ||
* } | ||
* return n; | ||
* }) | ||
* .catch((err, caught) => caught) | ||
* .take(30) | ||
* .subscribe(x => console.log(x)); | ||
* // 1, 2, 3, 1, 2, 3, ... | ||
* | ||
* @example <caption>Throws a new error when the source Observable throws an error</caption> | ||
* | ||
* Observable.of(1, 2, 3, 4, 5) | ||
* .map(n => { | ||
* if (n == 4) { | ||
* throw 'four!'; | ||
* } | ||
* return n; | ||
* }) | ||
* .catch(err => { | ||
* throw 'error in source. Details: ' + err; | ||
* }) | ||
* .subscribe( | ||
* x => console.log(x), | ||
* err => console.log(err) | ||
* ); | ||
* // 1, 2, 3, error in source. Details: four! | ||
* | ||
* @param {function} selector a function that takes as arguments `err`, which is the error, and `caught`, which | ||
* is the source observable, in case you'd like to "retry" that observable by returning it again. Whatever observable | ||
* is returned by the `selector` will be used to continue the observable chain. | ||
* @return {Observable} An observable that originates from either the source or the observable returned by the | ||
* catch `selector` function. | ||
* @name catchError | ||
*/ | ||
export function catchError<T, R>(selector: (err: any, caught: Observable<T>) => ObservableInput<R>): OperatorFunction<T, T | R> { | ||
return function catchErrorOperatorFunction(source: Observable<T>): Observable<T | R> { | ||
const operator = new CatchOperator(selector); | ||
const caught = source.lift(operator); | ||
return (operator.caught = caught); | ||
}; | ||
} | ||
|
||
class CatchOperator<T, R> implements Operator<T, T | R> { | ||
caught: Observable<T>; | ||
|
||
constructor(private selector: (err: any, caught: Observable<T>) => ObservableInput<T | R>) { | ||
} | ||
|
||
call(subscriber: Subscriber<R>, source: any): any { | ||
return source.subscribe(new CatchSubscriber(subscriber, this.selector, this.caught)); | ||
} | ||
} | ||
|
||
/** | ||
* We need this JSDoc comment for affecting ESDoc. | ||
* @ignore | ||
* @extends {Ignored} | ||
*/ | ||
class CatchSubscriber<T, R> extends OuterSubscriber<T, T | R> { | ||
constructor(destination: Subscriber<any>, | ||
private selector: (err: any, caught: Observable<T>) => ObservableInput<T | R>, | ||
private caught: Observable<T>) { | ||
super(destination); | ||
} | ||
|
||
// NOTE: overriding `error` instead of `_error` because we don't want | ||
// to have this flag this subscriber as `isStopped`. We can mimic the | ||
// behavior of the RetrySubscriber (from the `retry` operator), where | ||
// we unsubscribe from our source chain, reset our Subscriber flags, | ||
// then subscribe to the selector result. | ||
error(err: any) { | ||
if (!this.isStopped) { | ||
let result: any; | ||
try { | ||
result = this.selector(err, this.caught); | ||
} catch (err2) { | ||
super.error(err2); | ||
return; | ||
} | ||
this._unsubscribeAndRecycle(); | ||
this.add(subscribeToResult(this, result)); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters