diff --git a/spec/operators/shareReplay-spec.ts b/spec/operators/shareReplay-spec.ts index 3f1914e455..c81f697766 100644 --- a/spec/operators/shareReplay-spec.ts +++ b/spec/operators/shareReplay-spec.ts @@ -1,9 +1,9 @@ import { expect } from 'chai'; import * as sinon from 'sinon'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { shareReplay, mergeMapTo, retry, take } from 'rxjs/operators'; +import { shareReplay, mergeMapTo, retry } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; -import { Observable, interval, Operator, Observer, of } from 'rxjs'; +import { Observable, Operator, Observer, of, from } from 'rxjs'; declare function asDiagram(arg: string): Function; declare const rxTestScheduler: TestScheduler; @@ -243,4 +243,23 @@ describe('shareReplay operator', () => { done(); }); }); + + it('should not skip values on a sync source', () => { + const a = from(['a', 'b', 'c', 'd']); + // We would like for the previous line to read like this: + // + // const a = cold('(abcd|)'); + // + // However, that would synchronously emit multiple values at frame 0, + // but it's not synchronous upon-subscription. + // TODO: revisit once https://github.com/ReactiveX/rxjs/issues/5523 is fixed + + const x = cold( 'x-------x'); + const expected = '(abcd)--d'; + + const shared = a.pipe(shareReplay(1)); + const result = x.pipe(mergeMapTo(shared)); + expectObservable(result).toBe(expected); + }); + }); diff --git a/src/internal/operators/shareReplay.ts b/src/internal/operators/shareReplay.ts index 400ddc8de5..e8976b3c7d 100644 --- a/src/internal/operators/shareReplay.ts +++ b/src/internal/operators/shareReplay.ts @@ -91,9 +91,11 @@ function shareReplayOperator({ return function shareReplayOperation(this: Subscriber, source: Observable) { refCount++; + let innerSub: Subscription; if (!subject || hasError) { hasError = false; subject = new ReplaySubject(bufferSize, windowTime, scheduler); + innerSub = subject.subscribe(this); subscription = source.subscribe({ next(value) { subject.next(value); }, error(err) { @@ -106,9 +108,10 @@ function shareReplayOperator({ subject.complete(); }, }); + } else { + innerSub = subject.subscribe(this); } - const innerSub = subject.subscribe(this); this.add(() => { refCount--; innerSub.unsubscribe();