diff --git a/src/operator/publishReplay.ts b/src/operator/publishReplay.ts index d9098b7a93..715936f961 100644 --- a/src/operator/publishReplay.ts +++ b/src/operator/publishReplay.ts @@ -1,8 +1,7 @@ import { Observable } from '../Observable'; -import { ReplaySubject } from '../ReplaySubject'; import { IScheduler } from '../Scheduler'; -import { multicast } from './multicast'; import { ConnectableObservable } from '../observable/ConnectableObservable'; +import { publishReplay as higherOrder } from '../operators'; /** * @param bufferSize @@ -15,5 +14,5 @@ import { ConnectableObservable } from '../observable/ConnectableObservable'; export function publishReplay(this: Observable, bufferSize: number = Number.POSITIVE_INFINITY, windowTime: number = Number.POSITIVE_INFINITY, scheduler?: IScheduler): ConnectableObservable { - return multicast.call(this, new ReplaySubject(bufferSize, windowTime, scheduler)); + return higherOrder(bufferSize, windowTime, scheduler)(this); } diff --git a/src/operators/index.ts b/src/operators/index.ts index 815274df5e..457aeb5ee0 100644 --- a/src/operators/index.ts +++ b/src/operators/index.ts @@ -55,6 +55,7 @@ export { pluck } from './pluck'; export { publish } from './publish'; export { publishBehavior } from './publishBehavior'; export { publishLast } from './publishLast'; +export { publishReplay } from './publishReplay'; export { race } from './race'; export { reduce } from './reduce'; export { refCount } from './refCount'; diff --git a/src/operators/publishReplay.ts b/src/operators/publishReplay.ts new file mode 100644 index 0000000000..5943db4acb --- /dev/null +++ b/src/operators/publishReplay.ts @@ -0,0 +1,12 @@ +import { Observable } from '../Observable'; +import { ReplaySubject } from '../ReplaySubject'; +import { IScheduler } from '../Scheduler'; +import { multicast } from './multicast'; +import { ConnectableObservable } from '../observable/ConnectableObservable'; +import { UnaryFunction } from '../interfaces'; + +export function publishReplay(bufferSize: number = Number.POSITIVE_INFINITY, + windowTime: number = Number.POSITIVE_INFINITY, + scheduler?: IScheduler): UnaryFunction, ConnectableObservable> { + return (source: Observable) => multicast(new ReplaySubject(bufferSize, windowTime, scheduler))(source) as ConnectableObservable; +}