Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

retry: fixes and tests #547

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 97 additions & 5 deletions spec/operators/retry-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,16 @@ describe('Observable.prototype.retry()', function () {
observer.complete();
})
.map(function (x) {
if ((errors += 1) < retries) {
throw 'bad';
}
return x;
errors += 1;
throw 'bad';
})
.retry(retries - 1)
.subscribe(
function (x) {
expect(x).toBe(42);
},
function (err) {
expect(errors).toBe(1);
expect(errors).toBe(2);
done();
}, function () {
expect('this was called').toBe(false);
Expand Down Expand Up @@ -77,4 +75,98 @@ describe('Observable.prototype.retry()', function () {
expect('this was called').toBe(false);
}, done);
});

it('should handle an empty source', function () {
var source = cold('|');
var expected = '|';

var result = source.retry();

expectObservable(result).toBe(expected);
});

it('should handle a never source', function () {
var source = cold('-');
var expected = '-';

var result = source.retry();

expectObservable(result).toBe(expected);
});

it('should return a never observable given an async just-throw source and no count', function () {
var source = cold('-#'); // important that it's not a sync error
var unsub = ' !';
var expected = '--------------------------------------';

var result = source.retry();

expectObservable(result, unsub).toBe(expected);
});

it('should handle a basic source that emits next then completes', function () {
var source = hot('--1--2--^--3--4--5---|');
var subs = '^ !';
var expected = '---3--4--5---|';

var result = source.retry();

expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should handle a basic source that emits next but does not complete', function () {
var source = hot('--1--2--^--3--4--5---');
var subs = '^ ';
var expected = '---3--4--5---';

var result = source.retry();

expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should handle a basic source that emits next then errors, no count', function () {
var source = cold('--1-2-3-#');
var unsub = ' !';
var subs = ['^ ! ',
' ^ ! ',
' ^ ! ',
' ^ ! ',
' ^ !'];
var expected = '--1-2-3---1-2-3---1-2-3---1-2-3---1-2-';

var result = source.retry();

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should handle a basic source that emits next then errors, count=3', function () {
var source = cold('--1-2-3-#');
var subs = ['^ ! ',
' ^ ! ',
' ^ ! ',
' ^ !'];
var expected = '--1-2-3---1-2-3---1-2-3---1-2-3-#';

var result = source.retry(3);

expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should handle a source which eventually throws, count=3, and result is ' +
'unsubscribed early', function () {
var source = cold('--1-2-3-#');
var unsub = ' ! ';
var subs = ['^ ! ',
' ^ ! '];
var expected = '--1-2-3---1-2-';

var result = source.retry(3);

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});
76 changes: 64 additions & 12 deletions src/operators/retry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,88 @@ import Operator from '../Operator';
import Observer from '../Observer';
import Subscriber from '../Subscriber';
import Observable from '../Observable';
import Subscription from '../Subscription';

export default function retry<T>(count: number = 0): Observable<T> {
return this.lift(new RetryOperator(count, this));
}

class RetryOperator<T, R> implements Operator<T, R> {
constructor(private count: number, protected original: Observable<T>) {
constructor(private count: number,
protected source: Observable<T>) {
}

call(subscriber: Subscriber<T>): Subscriber<T> {
return new RetrySubscriber<T>(subscriber, this.count, this.original);
return new FirstRetrySubscriber<T>(subscriber, this.count, this.source);
}
}

class RetrySubscriber<T> extends Subscriber<T> {
private retries: number = 0;
constructor(destination: Subscriber<T>, private count: number, private original: Observable<T>) {
super(destination);
class FirstRetrySubscriber<T> extends Subscriber<T> {
private lastSubscription: Subscription<T>;

constructor(public destination: Subscriber<T>,
private count: number,
private source: Observable<T>) {
super(null);
this.lastSubscription = this;
}

_next(value: T) {
this.destination.next(value);
}

error(error?) {
if (!this.isUnsubscribed) {
super.unsubscribe();
this.resubscribe();
}
}

_complete() {
super.unsubscribe();
this.destination.complete();
}

unsubscribe() {
const lastSubscription = this.lastSubscription;
if (lastSubscription === this) {
super.unsubscribe();
} else {
lastSubscription.unsubscribe();
}
}

resubscribe(retried: number = 0) {
this.lastSubscription.unsubscribe();
const nextSubscriber = new RetryMoreSubscriber(this, this.count, retried + 1);
this.lastSubscription = this.source.subscribe(nextSubscriber);
}
}

class RetryMoreSubscriber<T> extends Subscriber<T> {
constructor(private parent: FirstRetrySubscriber<T>,
private count: number,
private retried: number = 0) {
super(null);
}

_next(value: T) {
this.parent.destination.next(value);
}

_error(err: any) {
const parent = this.parent;
const retried = this.retried;
const count = this.count;
if (count && count === (this.retries += 1)) {
this.destination.error(err);

if (count && retried === count) {
parent.destination.error(err);
} else {
this.resubscribe();
parent.resubscribe(retried);
}
}

resubscribe() {
this.original.subscribe(this);
_complete() {
this.parent.destination.complete();
}
}
}