Skip to content

Commit

Permalink
feat(onErrorResumeNextWith): renamed onErrorResumeNext and exported…
Browse files Browse the repository at this point in the history
… from the top level (#6755)

* feat(onErrorResumeNextWith): renamed `onErrorResumeNext` and exported from top level

The `onErrorResumeNext` operator is deprecated due to a rename. The rename was done so we can move all operator exports to the top level `rxjs` export site. We were not currently exporting that operator.

This commit also refactors to logic to be based on the creation function, which is more widely used

* docs: Update import information for onErrorResumeNext, fix info for partition
  • Loading branch information
benlesh authored Dec 3, 2022
1 parent b97fa05 commit 51e3b2c
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 44 deletions.
3 changes: 3 additions & 0 deletions api_guard/dist/types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,9 @@ export declare function of<A extends readonly unknown[]>(...values: A): Observab
export declare function onErrorResumeNext<A extends readonly unknown[]>(sources: [...ObservableInputTuple<A>]): Observable<A[number]>;
export declare function onErrorResumeNext<A extends readonly unknown[]>(...sources: [...ObservableInputTuple<A>]): Observable<A[number]>;

export declare function onErrorResumeNextWith<T, A extends readonly unknown[]>(sources: [...ObservableInputTuple<A>]): OperatorFunction<T, T | A[number]>;
export declare function onErrorResumeNextWith<T, A extends readonly unknown[]>(...sources: [...ObservableInputTuple<A>]): OperatorFunction<T, T | A[number]>;

export interface Operator<T, R> {
call(subscriber: Subscriber<R>, source: any): TeardownLogic;
}
Expand Down
3 changes: 1 addition & 2 deletions api_guard/dist/types/operators/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,7 @@ export declare function multicast<T, O extends ObservableInput<any>>(subjectFact

export declare function observeOn<T>(scheduler: SchedulerLike, delay?: number): MonoTypeOperatorFunction<T>;

export declare function onErrorResumeNext<T, A extends readonly unknown[]>(sources: [...ObservableInputTuple<A>]): OperatorFunction<T, T | A[number]>;
export declare function onErrorResumeNext<T, A extends readonly unknown[]>(...sources: [...ObservableInputTuple<A>]): OperatorFunction<T, T | A[number]>;
export declare const onErrorResumeNext: typeof onErrorResumeNextWith;

export declare function pairwise<T>(): OperatorFunction<T, [T, T]>;

Expand Down
5 changes: 3 additions & 2 deletions docs_app/content/guide/importing.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,12 @@ ending with `With`). Those are:
| [`combineLatest`](/api/operators/combineLatest) | {@link combineLatest} | {@link combineLatestWith} |
| [`concat`](/api/operators/concat) | {@link concat} | {@link concatWith} |
| [`merge`](/api/operators/merge) | {@link merge} | {@link mergeWith} |
| [`onErrorResumeNext`](/api/operators/onErrorResumeNext) | {@link onErrorResumeNext} | - |
| [`partition`](/api/operators/partition) | {@link partition} | - |
| [`onErrorResumeNext`](/api/operators/onErrorResumeNext) | {@link onErrorResumeNext} | {@link onErrorResumeNextWith} |
| [`race`](/api/operators/race) | {@link race} | {@link raceWith} |
| [`zip`](/api/operators/zip) | {@link zip} | {@link zipWith} |

`partition`, the operator, is a special case, as it is deprecated and you should be using the `partition` creation function exported from `'rxjs'` instead.

For example, the old and deprecated way of using [`merge`](/api/operators/merge) from `'rxjs/operators'`
is:

Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ export { mergeWith } from './internal/operators/mergeWith';
export { min } from './internal/operators/min';
export { multicast } from './internal/operators/multicast';
export { observeOn } from './internal/operators/observeOn';
export { onErrorResumeNextWith } from './internal/operators/onErrorResumeNextWith';
export { pairwise } from './internal/operators/pairwise';
export { pluck } from './internal/operators/pluck';
export { publish } from './internal/operators/publish';
Expand Down
28 changes: 25 additions & 3 deletions src/internal/observable/onErrorResumeNext.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { Observable } from '../Observable';
import { ObservableInputTuple } from '../types';
import { EMPTY } from './empty';
import { onErrorResumeNext as onErrorResumeNextWith } from '../operators/onErrorResumeNext';
import { argsOrArgArray } from '../util/argsOrArgArray';
import { OperatorSubscriber } from '../operators/OperatorSubscriber';
import { noop } from '../util/noop';
import { innerFrom } from './innerFrom';

/* tslint:disable:max-line-length */
export function onErrorResumeNext<A extends readonly unknown[]>(sources: [...ObservableInputTuple<A>]): Observable<A[number]>;
Expand Down Expand Up @@ -75,5 +76,26 @@ export function onErrorResumeNext<A extends readonly unknown[]>(...sources: [...
export function onErrorResumeNext<A extends readonly unknown[]>(
...sources: [[...ObservableInputTuple<A>]] | [...ObservableInputTuple<A>]
): Observable<A[number]> {
return onErrorResumeNextWith(argsOrArgArray(sources))(EMPTY);
const nextSources: ObservableInputTuple<A> = argsOrArgArray(sources) as any;

return new Observable((subscriber) => {
let sourceIndex = 0;
const subscribeNext = () => {
if (sourceIndex < nextSources.length) {
let nextSource: Observable<A[number]>;
try {
nextSource = innerFrom(nextSources[sourceIndex++]);
} catch (err) {
subscribeNext();
return;
}
const innerSubscriber = new OperatorSubscriber(subscriber, undefined, noop, noop);
nextSource.subscribe(innerSubscriber);
innerSubscriber.add(subscribeNext);
} else {
subscriber.complete();
}
};
subscribeNext();
});
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
import { Observable } from '../Observable';
import { ObservableInputTuple, OperatorFunction } from '../types';
import { operate } from '../util/lift';
import { innerFrom } from '../observable/innerFrom';
import { argsOrArgArray } from '../util/argsOrArgArray';
import { createOperatorSubscriber } from './OperatorSubscriber';
import { noop } from '../util/noop';
import { onErrorResumeNext as oERNCreate } from '../observable/onErrorResumeNext';

export function onErrorResumeNext<T, A extends readonly unknown[]>(
export function onErrorResumeNextWith<T, A extends readonly unknown[]>(
sources: [...ObservableInputTuple<A>]
): OperatorFunction<T, T | A[number]>;
export function onErrorResumeNext<T, A extends readonly unknown[]>(
export function onErrorResumeNextWith<T, A extends readonly unknown[]>(
...sources: [...ObservableInputTuple<A>]
): OperatorFunction<T, T | A[number]>;

Expand Down Expand Up @@ -85,7 +81,7 @@ export function onErrorResumeNext<T, A extends readonly unknown[]>(
* Observable, but - if it errors - subscribes to the next passed Observable
* and so on, until it completes or runs out of Observables.
*/
export function onErrorResumeNext<T, A extends readonly unknown[]>(
export function onErrorResumeNextWith<T, A extends readonly unknown[]>(
...sources: [[...ObservableInputTuple<A>]] | [...ObservableInputTuple<A>]
): OperatorFunction<T, T | A[number]> {
// For some reason, TS 4.1 RC gets the inference wrong here and infers the
Expand All @@ -94,32 +90,10 @@ export function onErrorResumeNext<T, A extends readonly unknown[]>(
// asserted explicitly.
const nextSources = argsOrArgArray(sources) as unknown as ObservableInputTuple<A>;

return operate((source, subscriber) => {
const remaining = [source, ...nextSources];
const subscribeNext = () => {
if (!subscriber.closed) {
if (remaining.length > 0) {
let nextSource: Observable<A[number]>;
try {
nextSource = innerFrom(remaining.shift()!);
} catch (err) {
subscribeNext();
return;
}

// Here we have to use one of our Subscribers, or it does not wire up
// The `closed` property of upstream Subscribers synchronously, that
// would result in situation were we could not stop a synchronous firehose
// with something like `take(3)`.
const innerSub = createOperatorSubscriber(subscriber, undefined, noop, noop);
nextSource.subscribe(innerSub);
innerSub.add(subscribeNext);
} else {
subscriber.complete();
}
}
};

subscribeNext();
});
return (source) => oERNCreate(source, ...nextSources);
}

/**
* @deprecated Renamed. Use {@link onErrorResumeNextWith} instead. Will be removed in v8.
*/
export const onErrorResumeNext = onErrorResumeNextWith;
2 changes: 1 addition & 1 deletion src/operators/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ export { mergeWith } from '../internal/operators/mergeWith';
export { min } from '../internal/operators/min';
export { multicast } from '../internal/operators/multicast';
export { observeOn } from '../internal/operators/observeOn';
export { onErrorResumeNext } from '../internal/operators/onErrorResumeNext';
export { onErrorResumeNext } from '../internal/operators/onErrorResumeNextWith';
export { pairwise } from '../internal/operators/pairwise';
export { partition } from '../internal/operators/partition';
export { pluck } from '../internal/operators/pluck';
Expand Down

0 comments on commit 51e3b2c

Please sign in to comment.