diff --git a/spec/operators/repeat-spec.js b/spec/operators/repeat-spec.js new file mode 100644 index 0000000000..2e1cf68267 --- /dev/null +++ b/spec/operators/repeat-spec.js @@ -0,0 +1,15 @@ +/* globals describe, it, expect */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +describe('Observable.prototype.repeat()', function () { + it('should resubscribe count number of times', function (done) { + var expected = [1, 2, 1, 2]; + Observable.of(1,2) + .repeat(2) + .subscribe(function(x){ + expect(x).toBe(expected.shift()); + }, null, done); + }); + +}); \ No newline at end of file diff --git a/src/Observable.ts b/src/Observable.ts index 95ad2c6baf..3f302039db 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -156,8 +156,9 @@ export default class Observable { catch: (selector: (err: any, source: Observable, caught: Observable) => Observable) => Observable; retryWhen: (notifier: (errors: Observable) => Observable) => Observable; + repeat: (count: number) => Observable; groupBy: (keySelector: (value:T) => string, durationSelector?: (group:GroupSubject) => Observable, elementSelector?: (value:T) => R) => Observable; finally: (ensure: () => void, thisArg?: any) => Observable; - } + } \ No newline at end of file diff --git a/src/Rx.ts b/src/Rx.ts index a4d39261a4..574b68ac02 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -147,9 +147,11 @@ observableProto.materialize = materialize; import _catch from './operators/catch'; import retryWhen from './operators/retryWhen'; +import repeat from './operators/repeat'; observableProto.catch = _catch; observableProto.retryWhen = retryWhen; +observableProto.repeat = repeat; import _finally from './operators/finally'; diff --git a/src/operators/repeat.ts b/src/operators/repeat.ts new file mode 100644 index 0000000000..7857fa1cae --- /dev/null +++ b/src/operators/repeat.ts @@ -0,0 +1,42 @@ +import Operator from '../Operator'; +import Observer from '../Observer'; +import Subscriber from '../Subscriber'; +import Observable from '../Observable'; +import Subject from '../Subject'; +import Subscription from '../Subscription'; + +import tryCatch from '../util/tryCatch'; +import {errorObject} from '../util/errorObject'; + +export default function repeat(count: number): Observable { + return this.lift(new RepeatOperator(count, this)); +} + +export class RepeatOperator extends Operator { + constructor(protected count: number, protected original:Observable) { + super(); + } + + call(observer: Observer): Observer { + return new RepeatSubscriber(observer, this.count, this.original); + } +} + +export class RepeatSubscriber extends Subscriber { + private repeated: number = 0; + constructor(destination: Observer, public count: number, public original: Observable) { + super(destination); + } + + _complete(){ + if (this.count === (this.repeated+=1)){ + this.destination.complete(); + }else{ + this.resubscribe(); + } + } + + resubscribe() { + this.original.subscribe(this); + } +}