Skip to content

Commit

Permalink
feat(scheduled): Adds schduled creation method
Browse files Browse the repository at this point in the history
The `scheduled` creation method is meant to create an observable with a scheduler. The goal of this PR is to set us up to remove scheduling from very common code paths like `from` and `of`, amongst others.

This deprecates versions of `from`, `concat`, `merge`, `startWith` and `endWith` that allow passing a scheduler.

This PR also simplifies a few common code paths.
  • Loading branch information
benlesh committed Feb 26, 2019
1 parent 8f7d7fb commit 372c2c2
Show file tree
Hide file tree
Showing 22 changed files with 436 additions and 193 deletions.
48 changes: 48 additions & 0 deletions spec/helpers/observableMatcher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import * as _ from 'lodash';

function stringify(x: any): string {
return JSON.stringify(x, function (key: string, value: any) {
if (Array.isArray(value)) {
return '[' + value
.map(function (i) {
return '\n\t' + stringify(i);
}) + '\n]';
}
return value;
})
.replace(/\\"/g, '"')
.replace(/\\t/g, '\t')
.replace(/\\n/g, '\n');
}

function deleteErrorNotificationStack(marble: any) {
const { notification } = marble;
if (notification) {
const { kind, error } = notification;
if (kind === 'E' && error instanceof Error) {
notification.error = { name: error.name, message: error.message };
}
}
return marble;
}

export function observableMatcher(actual: any, expected: any) {
if (Array.isArray(actual) && Array.isArray(expected)) {
actual = actual.map(deleteErrorNotificationStack);
expected = expected.map(deleteErrorNotificationStack);
const passed = _.isEqual(actual, expected);
if (passed) {
return;
}

let message = '\nExpected \n';
actual.forEach((x: any) => message += `\t${stringify(x)}\n`);

message += '\t\nto deep equal \n';
expected.forEach((x: any) => message += `\t${stringify(x)}\n`);

chai.assert(passed, message);
} else {
chai.assert.deepEqual(actual, expected);
}
}
13 changes: 6 additions & 7 deletions spec/helpers/test-helper.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
declare const global: any;

import * as Rx from 'rxjs/Rx';
import { ObservableInput } from 'rxjs';
import { ObservableInput, of, asyncScheduler, Observable } from 'rxjs';
import { iterator } from 'rxjs/symbol/iterator';
import { root } from 'rxjs/util/root';
import $$symbolObservable from 'symbol-observable';

export function lowerCaseO<T>(...args: Array<any>): Rx.Observable<T> {
export function lowerCaseO<T>(...args: Array<any>): Observable<T> {
const o = {
subscribe(observer: any) {
args.forEach(v => observer.next(v));
Expand All @@ -24,9 +23,9 @@ export function lowerCaseO<T>(...args: Array<any>): Rx.Observable<T> {
return <any>o;
}

export const createObservableInputs = <T>(value: T) => Rx.Observable.of<ObservableInput<T>>(
Rx.Observable.of<T>(value),
Rx.Observable.of<T>(value, Rx.Scheduler.async),
export const createObservableInputs = <T>(value: T) => of<ObservableInput<T>>(
of(value),
of(value, asyncScheduler),
[value],
Promise.resolve(value),
<any>({
Expand All @@ -42,7 +41,7 @@ export const createObservableInputs = <T>(value: T) => Rx.Observable.of<Observab
};
}
}),
<any>({ [$$symbolObservable]: () => Rx.Observable.of(value) })
<any>({ [$$symbolObservable]: () => of(value) })
);

global.__root__ = root;
17 changes: 0 additions & 17 deletions spec/observables/ScalarObservable-spec.ts

This file was deleted.

9 changes: 2 additions & 7 deletions spec/observables/of-spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { expect } from 'chai';
import { empty, of, Observable } from 'rxjs';
import { of, Observable } from 'rxjs';
import { expectObservable } from '../helpers/marble-testing';
import { TestScheduler } from 'rxjs/testing';
import { concatMap, delay, concatAll } from 'rxjs/operators';
Expand Down Expand Up @@ -33,18 +33,13 @@ describe('of', () => {
});
});

it('should return an empty observable if passed no values', () => {
const obs = of();
expect(obs).to.equal(empty());
});

it('should emit one value', (done: MochaDone) => {
let calls = 0;

of(42).subscribe((x: number) => {
expect(++calls).to.equal(1);
expect(x).to.equal(42);
}, (err: any) => {
}, (err: any) => {
done(new Error('should not be called'));
}, () => {
done();
Expand Down
63 changes: 63 additions & 0 deletions spec/scheduled/scheduled-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { scheduled, of } from 'rxjs';
import { TestScheduler } from 'rxjs/testing';
import { lowerCaseO } from '../helpers/test-helper';
import { observableMatcher } from '../helpers/observableMatcher';
import { expect } from 'chai';

describe('scheduled', () => {
let testScheduler: TestScheduler;

beforeEach(() => {
testScheduler = new TestScheduler(observableMatcher);
});

it('should schedule a sync observable', () => {
const input = of('a', 'b', 'c');
testScheduler.run(({ expectObservable }) => {
expectObservable(scheduled(input, testScheduler)).toBe('(abc|)');
});
});

it('should schedule an array', () => {
const input = ['a', 'b', 'c'];
testScheduler.run(({ expectObservable }) => {
expectObservable(scheduled(input, testScheduler)).toBe('(abc|)');
});
});

it('should schedule an iterable', () => {
const input = 'abc'; // strings are iterables
testScheduler.run(({ expectObservable }) => {
expectObservable(scheduled(input, testScheduler)).toBe('(abc|)');
});
});

it('should schedule an observable-like', () => {
const input = lowerCaseO('a', 'b', 'c'); // strings are iterables
testScheduler.run(({ expectObservable }) => {
expectObservable(scheduled(input, testScheduler)).toBe('(abc|)');
});
});

it('should schedule a promise', done => {
const results: any[] = [];
const input = Promise.resolve('x'); // strings are iterables
scheduled(input, testScheduler).subscribe({
next(value) { results.push(value); },
complete() { results.push('done'); },
});

expect(results).to.deep.equal([]);

// Promises force async, so we can't schedule synchronously, no matter what.
testScheduler.flush();
expect(results).to.deep.equal([]);

Promise.resolve().then(() => {
// NOW it should work, as the other promise should have resolved.
testScheduler.flush();
expect(results).to.deep.equal(['x', 'done']);
done();
});
});
});
11 changes: 0 additions & 11 deletions spec/util/subscribeToResult-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,6 @@ import { iterator, subscribeToResult, OuterSubscriber } from 'rxjs/internal-comp
import $$symbolObservable from 'symbol-observable';

describe('subscribeToResult', () => {
it('should synchronously complete when subscribe to scalarObservable', () => {
const result = Rx.Observable.of(42);
let expected: number;
const subscriber = new OuterSubscriber<number, number>((x) => expected = x);

const subscription = subscribeToResult(subscriber, result);

expect(expected).to.be.equal(42);
expect(subscription).to.not.exist;
});

it('should subscribe to observables that are an instanceof Rx.Observable', (done) => {
const expected = [1, 2, 3];
const result = Rx.Observable.range(1, 3);
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ export { throwError } from './internal/observable/throwError';
export { timer } from './internal/observable/timer';
export { using } from './internal/observable/using';
export { zip } from './internal/observable/zip';
export { scheduled } from './internal/scheduled/scheduled';

/* Constants */
export { EMPTY } from './internal/observable/empty';
Expand Down
29 changes: 23 additions & 6 deletions src/internal/observable/concat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,30 @@ import { from } from './from';
import { concatAll } from '../operators/concatAll';

/* tslint:disable:max-line-length */
export function concat<O1 extends ObservableInput<any>>(v1: O1, scheduler?: SchedulerLike): Observable<ObservedValueOf<O1>>;
export function concat<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>>(v1: O1, v2: O2, scheduler?: SchedulerLike): Observable<ObservedValueOf<O1> | ObservedValueOf<O2>>;
export function concat<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>>(v1: O1, v2: O2, v3: O3, scheduler?: SchedulerLike): Observable<ObservedValueOf<O1> | ObservedValueOf<O2> | ObservedValueOf<O3>>;
export function concat<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>>(v1: O1, v2: O2, v3: O3, v4: O4, scheduler?: SchedulerLike): Observable<ObservedValueOf<O1> | ObservedValueOf<O2> | ObservedValueOf<O3> | ObservedValueOf<O4>>;
export function concat<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>>(v1: O1, v2: O2, v3: O3, v4: O4, v5: O5, scheduler?: SchedulerLike): Observable<ObservedValueOf<O1> | ObservedValueOf<O2> | ObservedValueOf<O3> | ObservedValueOf<O4> | ObservedValueOf<O5>>;
export function concat<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>, O6 extends ObservableInput<any>>(v1: O1, v2: O2, v3: O3, v4: O4, v5: O5, v6: O6, scheduler?: SchedulerLike): Observable<ObservedValueOf<O1> | ObservedValueOf<O2> | ObservedValueOf<O3> | ObservedValueOf<O4> | ObservedValueOf<O5> | ObservedValueOf<O6>>;
/** @deprecated Use {@link scheduled} and {@link concatAll} (e.g. `scheduled([o1, o2, o3], scheduler).pipe(concatAll())`) */
export function concat<O1 extends ObservableInput<any>>(v1: O1, scheduler: SchedulerLike): Observable<ObservedValueOf<O1>>;
/** @deprecated Use {@link scheduled} and {@link concatAll} (e.g. `scheduled([o1, o2, o3], scheduler).pipe(concatAll())`) */
export function concat<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>>(v1: O1, v2: O2, scheduler: SchedulerLike): Observable<ObservedValueOf<O1> | ObservedValueOf<O2>>;
/** @deprecated Use {@link scheduled} and {@link concatAll} (e.g. `scheduled([o1, o2, o3], scheduler).pipe(concatAll())`) */
export function concat<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>>(v1: O1, v2: O2, v3: O3, scheduler: SchedulerLike): Observable<ObservedValueOf<O1> | ObservedValueOf<O2> | ObservedValueOf<O3>>;
/** @deprecated Use {@link scheduled} and {@link concatAll} (e.g. `scheduled([o1, o2, o3], scheduler).pipe(concatAll())`) */
export function concat<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>>(v1: O1, v2: O2, v3: O3, v4: O4, scheduler: SchedulerLike): Observable<ObservedValueOf<O1> | ObservedValueOf<O2> | ObservedValueOf<O3> | ObservedValueOf<O4>>;
/** @deprecated Use {@link scheduled} and {@link concatAll} (e.g. `scheduled([o1, o2, o3], scheduler).pipe(concatAll())`) */
export function concat<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>>(v1: O1, v2: O2, v3: O3, v4: O4, v5: O5, scheduler: SchedulerLike): Observable<ObservedValueOf<O1> | ObservedValueOf<O2> | ObservedValueOf<O3> | ObservedValueOf<O4> | ObservedValueOf<O5>>;
/** @deprecated Use {@link scheduled} and {@link concatAll} (e.g. `scheduled([o1, o2, o3], scheduler).pipe(concatAll())`) */
export function concat<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>, O6 extends ObservableInput<any>>(v1: O1, v2: O2, v3: O3, v4: O4, v5: O5, v6: O6, scheduler: SchedulerLike): Observable<ObservedValueOf<O1> | ObservedValueOf<O2> | ObservedValueOf<O3> | ObservedValueOf<O4> | ObservedValueOf<O5> | ObservedValueOf<O6>>;

export function concat<O1 extends ObservableInput<any>>(v1: O1): Observable<ObservedValueOf<O1>>;
export function concat<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>>(v1: O1, v2: O2): Observable<ObservedValueOf<O1> | ObservedValueOf<O2>>;
export function concat<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>>(v1: O1, v2: O2, v3: O3): Observable<ObservedValueOf<O1> | ObservedValueOf<O2> | ObservedValueOf<O3>>;
export function concat<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>>(v1: O1, v2: O2, v3: O3, v4: O4): Observable<ObservedValueOf<O1> | ObservedValueOf<O2> | ObservedValueOf<O3> | ObservedValueOf<O4>>;
export function concat<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>>(v1: O1, v2: O2, v3: O3, v4: O4, v5: O5): Observable<ObservedValueOf<O1> | ObservedValueOf<O2> | ObservedValueOf<O3> | ObservedValueOf<O4> | ObservedValueOf<O5>>;
export function concat<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>, O6 extends ObservableInput<any>>(v1: O1, v2: O2, v3: O3, v4: O4, v5: O5, v6: O6): Observable<ObservedValueOf<O1> | ObservedValueOf<O2> | ObservedValueOf<O3> | ObservedValueOf<O4> | ObservedValueOf<O5> | ObservedValueOf<O6>>;
export function concat<O extends ObservableInput<any>>(...observables: O[]): Observable<ObservedValueOf<O>>;
/** @deprecated Use {@link scheduled} and {@link concatAll} (e.g. `scheduled([o1, o2, o3], scheduler).pipe(concatAll())`) */
export function concat<O extends ObservableInput<any>>(...observables: (O | SchedulerLike)[]): Observable<ObservedValueOf<O>>;
export function concat<R>(...observables: ObservableInput<any>[]): Observable<R>;
/** @deprecated Use {@link scheduled} and {@link concatAll} (e.g. `scheduled([o1, o2, o3], scheduler).pipe(concatAll())`) */
export function concat<R>(...observables: (ObservableInput<any> | SchedulerLike)[]): Observable<R>;
/* tslint:enable:max-line-length */
/**
Expand Down
11 changes: 4 additions & 7 deletions src/internal/observable/empty.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,16 @@ export const EMPTY = new Observable<never>(subscriber => subscriber.complete());
* @see {@link of}
* @see {@link throwError}
*
* @param {SchedulerLike} [scheduler] A {@link SchedulerLike} to use for scheduling
* @param scheduler A {@link SchedulerLike} to use for scheduling
* the emission of the complete notification.
* @return {Observable} An "empty" Observable: emits only the complete
* @return An "empty" Observable: emits only the complete
* notification.
* @static true
* @name empty
* @owner Observable
* @deprecated Deprecated in favor of using {@link index/EMPTY} constant.
* @deprecated Deprecated in favor of using {@link EMPTY} constant, or {@link scheduled} (e.g. `scheduled([], scheduler)`)
*/
export function empty(scheduler?: SchedulerLike) {
return scheduler ? emptyScheduled(scheduler) : EMPTY;
}

export function emptyScheduled(scheduler: SchedulerLike) {
function emptyScheduled(scheduler: SchedulerLike) {
return new Observable<never>(subscriber => scheduler.schedule(() => subscriber.complete()));
}
30 changes: 6 additions & 24 deletions src/internal/observable/from.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,11 @@
import { Observable } from '../Observable';
import { isPromise } from '../util/isPromise';
import { isArrayLike } from '../util/isArrayLike';
import { isInteropObservable } from '../util/isInteropObservable';
import { isIterable } from '../util/isIterable';
import { fromArray } from './fromArray';
import { fromPromise } from './fromPromise';
import { fromIterable } from './fromIterable';
import { fromObservable } from './fromObservable';
import { subscribeTo } from '../util/subscribeTo';
import { ObservableInput, SchedulerLike, ObservedValueOf } from '../types';
import { scheduled } from 'rxjs/internal/scheduled/scheduled';

export function from<O extends ObservableInput<any>>(input: O, scheduler?: SchedulerLike): Observable<ObservedValueOf<O>>;
export function from<O extends ObservableInput<any>>(input: O): Observable<ObservedValueOf<O>>;
/** @deprecated use {@link scheduled} instead. */
export function from<O extends ObservableInput<any>>(input: O, scheduler: SchedulerLike): Observable<ObservedValueOf<O>>;

/**
* Creates an Observable from an Array, an array-like object, a Promise, an iterable object, or an Observable-like object.
Expand Down Expand Up @@ -93,26 +88,13 @@ export function from<O extends ObservableInput<any>>(input: O, scheduler?: Sched
* @name from
* @owner Observable
*/

export function from<T>(input: ObservableInput<T>, scheduler?: SchedulerLike): Observable<T> {
if (!scheduler) {
if (input instanceof Observable) {
return input;
}
return new Observable<T>(subscribeTo(input));
} else {
return scheduled(input, scheduler);
}

if (input != null) {
if (isInteropObservable(input)) {
return fromObservable(input, scheduler);
} else if (isPromise(input)) {
return fromPromise(input, scheduler);
} else if (isArrayLike(input)) {
return fromArray(input, scheduler);
} else if (isIterable(input) || typeof input === 'string') {
return fromIterable(input, scheduler);
}
}

throw new TypeError((input !== null && typeof input || input) + ' is not observable');
}
Loading

0 comments on commit 372c2c2

Please sign in to comment.