Skip to content

Commit

Permalink
feat(rxjs): fix angular#830, monkey patch rxjs to make rxjs run in co…
Browse files Browse the repository at this point in the history
…rrect zone
  • Loading branch information
JiaLiPassion committed Jul 19, 2017
1 parent 5a010c6 commit d37e332
Show file tree
Hide file tree
Showing 9 changed files with 265 additions and 3 deletions.
1 change: 1 addition & 0 deletions karma-base.conf.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ module.exports = function (config) {
'node_modules/systemjs/dist/system-polyfills.js',
'node_modules/systemjs/dist/system.src.js',
'node_modules/whatwg-fetch/fetch.js',
{pattern: 'node_modules/rxjs/bundles/Rx.js', included: true, watched: true},
{pattern: 'test/assets/**/*.*', watched: true, served: true, included: false},
{pattern: 'build/**/*.js.map', watched: true, served: true, included: false},
{pattern: 'build/**/*.js', watched: true, served: true, included: false}
Expand Down
3 changes: 2 additions & 1 deletion lib/node/rollup-main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@
import '../zone';
import '../common/promise';
import '../common/to-string';
import './node';
import './node';
import '../rxjs/rxjs';
110 changes: 110 additions & 0 deletions lib/rxjs/rxjs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/**
* @license
* Copyright Google Inc. All Rights Reserved.
*
* Use of this source code is governed by an MIT-style license that can be
* found in the LICENSE file at https://angular.io/license
*/
Zone.__load_patch('rxjs', (global: any, Zone: ZoneType, api: _ZonePrivate) => {
let rxjs;

const subscribeSource = 'rxjs.subscribe';
const nextSource = 'rxjs.Subscriber.next';
const errorSource = 'rxjs.Subscriber.error';
const completeSource = 'rxjs.Subscriber.complete';
const unsubscribeSource = 'rxjs.Subscriber.unsubscribe';

try {
rxjs = require('rxjs');
} catch (error) {
return;
}

const Observable = rxjs.Observable;

rxjs.Observable = function () {
Observable.apply(this, arguments);
this._zone = Zone.current;

const _subscribe = this._subscribe;
this._subscribe = function () {
const currentZone = Zone.current;
const observableZone = this._zone;
let sub;
if (observableZone && observableZone !== currentZone) {
sub = observableZone.run(_subscribe, this, arguments);
if (sub) {
sub._zone = observableZone;
}
} else {
sub = _subscribe.apply(this, arguments);
if (sub) {
sub._zone = currentZone;
}
}
return sub;
};
return this;
};

rxjs.Observable.prototype = Observable.prototype;
const subscribe = Observable.prototype.subscribe;
Observable.prototype.subscribe = function() {
const sub = subscribe.apply(this, arguments);
if (sub) {
sub._zone = Zone.current;
}
return sub;
}

const Subscriber = rxjs.Subscriber;

const next = Subscriber.prototype.next;
const error = Subscriber.prototype.error;
const complete = Subscriber.prototype.complete;
const unsubscribe = Subscriber.prototype.unsubscribe;

Subscriber.prototype.next = function () {
const currentZone = Zone.current;
const observableZone = this._zone;

if (observableZone && observableZone !== currentZone) {
return observableZone.run(next, this, arguments, nextSource);
} else {
return next.apply(this, arguments);
}
}

Subscriber.prototype.error = function () {
const currentZone = Zone.current;
const observableZone = this._zone;

if (observableZone && observableZone !== currentZone) {
return observableZone.run(error, this, arguments, errorSource);
} else {
return error.apply(this, arguments);
}
}

Subscriber.prototype.complete = function () {
const currentZone = Zone.current;
const observableZone = this._zone;

if (observableZone && observableZone !== currentZone) {
return observableZone.run(complete, this, arguments, completeSource);
} else {
return complete.apply(this, arguments);
}
}

Subscriber.prototype.unsubscribe = function () {
const currentZone = Zone.current;
const observableZone = this._zone;

if (observableZone && observableZone !== currentZone) {
return observableZone.run(unsubscribe, this, arguments, unsubscribeSource);
} else {
return unsubscribe.apply(this, arguments);
}
}
});
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
"phantomjs": "^2.1.7",
"promises-aplus-tests": "^2.1.2",
"pump": "^1.0.1",
"rxjs": "^5.4.2",
"selenium-webdriver": "^3.4.0",
"systemjs": "^0.19.37",
"ts-loader": "^0.6.0",
Expand Down
3 changes: 2 additions & 1 deletion test/browser-zone-setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ import '../lib/zone-spec/proxy';
import '../lib/zone-spec/sync-test';
import '../lib/zone-spec/task-tracking';
import '../lib/zone-spec/wtf';
import '../lib/extra/cordova';
import '../lib/extra/cordova';
import '../lib/rxjs/rxjs';
3 changes: 2 additions & 1 deletion test/browser_entry_point.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ import './browser/MediaQuery.spec';
import './browser/Notification.spec';
import './mocha-patch.spec';
import './jasmine-patch.spec';
import './extra/cordova.spec';
import './extra/cordova.spec';
import './rxjs/rxjs.spec';
1 change: 1 addition & 0 deletions test/common_tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ import './zone-spec/sync-test.spec';
import './zone-spec/fake-async-test.spec';
import './zone-spec/proxy.spec';
import './zone-spec/task-tracking.spec';
import './rxjs/rxjs.spec';

Error.stackTraceLimit = Number.POSITIVE_INFINITY;
1 change: 1 addition & 0 deletions test/node_entry_point.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import '../lib/zone-spec/proxy';
import '../lib/zone-spec/sync-test';
import '../lib/zone-spec/task-tracking';
import '../lib/zone-spec/wtf';
import '../lib/rxjs/rxjs';

// Setup test environment
import './test-env-setup-jasmine';
Expand Down
145 changes: 145 additions & 0 deletions test/rxjs/rxjs.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/**
* @license
* Copyright Google Inc. All Rights Reserved.
*
* Use of this source code is governed by an MIT-style license that can be
* found in the LICENSE file at https://angular.io/license
*/
import {Subscriber, Observable} from 'rxjs';

/**
* The point of these tests, is to ensure that all callbacks execute in the Zone which was active
* when the callback was passed into the Rx.
*
* The implications are:
* - Observable callback passed into `Observable` executes in the same Zone as when the
* `new Observable` was invoked.
* - The subscription callbacks passed into `subscribe` execute in the same Zone as when the
* `subscribe` method was invoked.
* - The operator callbacks passe into `map`, etc..., execute in the same Zone as when the
* `operator` (`lift`) method was invoked.
*/
describe('Zone interaction', () => {
it('should run methods in the zone of declaration', () => {
const log: string[] = [];
const constructorZone: Zone = Zone.current.fork({ name: 'Constructor Zone'});
const subscriptionZone: Zone = Zone.current.fork({ name: 'Subscription Zone'});
let subscriber: Subscriber<string> = null;
const observable = constructorZone.run(() => new Observable<string>((_subscriber) => {
subscriber = _subscriber;
log.push('setup');
expect(Zone.current.name).toEqual(constructorZone.name);
return () => {
expect(Zone.current.name).toEqual(constructorZone.name);
log.push('cleanup');
};
})) as Observable<string>;
subscriptionZone.run(() => observable.subscribe(
() => {
expect(Zone.current.name).toEqual(subscriptionZone.name);
log.push('next');
},
() => null,
() => {
(process as any)._rawDebug('complete callback');
expect(Zone.current.name).toEqual(subscriptionZone.name);
log.push('complete');
}
));
subscriber.next('MyValue');
subscriber.complete();

expect(log).toEqual(['setup', 'next', 'complete', 'cleanup']);
log.length = 0;

subscriptionZone.run(() => observable.subscribe(
() => null,
() => {
expect(Zone.current.name).toEqual(subscriptionZone.name);
log.push('error');
},
() => null
));
subscriber.next('MyValue');
subscriber.error('MyError');

expect(log).toEqual(['setup', 'error', 'cleanup']);
});

xit('should run methods in the zone of declaration when nexting synchronously', () => {
const log: string[] = [];
const rootZone: Zone = Zone.current;
const constructorZone: Zone = Zone.current.fork({ name: 'Constructor Zone'});
const subscriptionZone: Zone = Zone.current.fork({ name: 'Subscription Zone'});
const observable = constructorZone.run(() => new Observable<string>((subscriber) => {
// Execute the `next`/`complete` in different zone, and assert that correct zone
// is restored.
rootZone.run(() => {
subscriber.next('MyValue');
subscriber.complete();
});
return () => {
expect(Zone.current.name).toEqual(constructorZone.name);
log.push('cleanup');
};
})) as Observable<string>;

subscriptionZone.run(() => observable.subscribe(
() => {
expect(Zone.current.name).toEqual(subscriptionZone.name);
log.push('next');
},
() => null,
() => {
expect(Zone.current.name).toEqual(subscriptionZone.name);
log.push('complete');
}
));

expect(log).toEqual(['next', 'complete', 'cleanup']);
});

xit('should run operators in the zone of declaration', () => {
const log: string[] = [];
const rootZone: Zone = Zone.current;
const constructorZone: Zone = Zone.current.fork({ name: 'Constructor Zone'});
const operatorZone: Zone = Zone.current.fork({ name: 'Operator Zone'});
const subscriptionZone: Zone = Zone.current.fork({ name: 'Subscription Zone'});
let observable = constructorZone.run(() => new Observable<string>((subscriber) => {
// Execute the `next`/`complete` in different zone, and assert that correct zone
// is restored.
rootZone.run(() => {
subscriber.next('MyValue');
subscriber.complete();
});
return () => {
expect(Zone.current.name).toEqual(constructorZone.name);
log.push('cleanup');
};
})) as Observable<string>;

observable = operatorZone.run(() => observable.map((value) => {
expect(Zone.current.name).toEqual(operatorZone.name);
log.push('map: ' + value);
return value;
})) as Observable<string>;

subscriptionZone.run(() => observable.subscribe(
() => {
expect(Zone.current.name).toEqual(subscriptionZone.name);
log.push('next');
},
(e) => {
expect(Zone.current.name).toEqual(subscriptionZone.name);
log.push('error: ' + e);
},
() => {
expect(Zone.current.name).toEqual(subscriptionZone.name);
log.push('complete');
}
));

expect(log).toEqual(['map: MyValue', 'next', 'complete', 'cleanup']);
});

});

0 comments on commit d37e332

Please sign in to comment.