Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(publishReplay): add selector function to publishReplay #2885

Merged

Conversation

martinsik
Copy link
Contributor

Description:

This PR adds selector function to the publishReplay operator similarly to publish.

I didn't modify any existing tests. However, this could be probably a breaking change because I added the parameter before scheduler (I'm not sure this is a problem or not).

Related issue (if exists):

#2844

@rxjs-bot
Copy link

rxjs-bot commented Oct 1, 2017

Messages
📖

CJS: 1345.9KB, global: 745.4KB (gzipped: 140.0KB), min: 145.3KB (gzipped: 30.9KB)

Generated by 🚫 dangerJS

@coveralls
Copy link

Coverage Status

Coverage increased (+0.0003%) to 97.477% when pulling 6d1742e on martinsik:2844-add-selector-to-publishreplay into e8d8c08 on ReactiveX:master.

Copy link
Member

@benlesh benlesh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A change to the one test, and a request that more test coverage be added.

@@ -409,4 +409,15 @@ describe('Observable.prototype.publishReplay', () => {

published.connect();
});

it('should mirror a simple source Observable with selector', () => {
const selector = observable => observable.map(v => String.fromCharCode(96 + parseInt(v)));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this one is a little too clever. :)

Can you use something more straightforward? A hash lookup or something?


expectObservable(published).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});
});
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to add tests for:

  • errors thrown in selector directly.
  • when selector returns an errored Observable (--a--b--#)
  • when selector returns empty Observable
  • when selector returns never
  • when selector returns Observable.throw

@benlesh
Copy link
Member

benlesh commented Oct 4, 2017

Also, @martinsik, if you can explain your cool pixel art profile pic, that would be good too. What is it?

Copy link
Member

@benlesh benlesh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Last thing, we'll need to add function signature overrides for both operator/publishReplay and operators/publishReplay... they're slightly different. I've linked examples.

Thanks again!

* @param scheduler
* @return {ConnectableObservable<T>}
* @return {Observable<T> | ConnectableObservable<T>}
* @method publishReplay
* @owner Observable
*/
export function publishReplay<T>(this: Observable<T>, bufferSize: number = Number.POSITIVE_INFINITY,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like below, we'll need to add some function overloads for TypeScript users to keep this change from being breaking. Here's what the publish counterpart to this file is doing...

export function publish<T>(this: Observable<T>): ConnectableObservable<T>;
export function publish<T>(this: Observable<T>, selector: (source: Observable<T>) => Observable<T>): Observable<T>;
export function publish<T, R>(this: Observable<T>, selector: (source: Observable<T>) => Observable<R>): Observable<R>;

@@ -7,6 +7,8 @@ import { UnaryFunction } from '../interfaces';

export function publishReplay<T>(bufferSize: number = Number.POSITIVE_INFINITY,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we'll need some function signature overloads for this so we don't break compatability for people expecting ConnectableObservable<T> to be the only return type...

See what publish is doing here for a better idea:

export function publish<T>(): MonoTypeOperatorFunction<T>;
export function publish<T>(selector: MonoTypeOperatorFunction<T>): MonoTypeOperatorFunction<T>;
export function publish<T, R>(selector: OperatorFunction<T, R>): OperatorFunction<T, R>;

@coveralls
Copy link

Coverage Status

Coverage decreased (-0.03%) to 97.44% when pulling 2a157ef on martinsik:2844-add-selector-to-publishreplay into 0939f61 on ReactiveX:master.

@benlesh
Copy link
Member

benlesh commented Oct 6, 2017

LGTM.... @kwonoj?

@martinsik
Copy link
Contributor Author

martinsik commented Oct 6, 2017

@benlesh Sorry, it's work in progress I just pushed it because It was too late yesterday :). I'm not sure about a few things there.

btw, my avatar is a screenshot from this very old game https://en.wikipedia.org/wiki/Moonstone:_A_Hard_Days_Knight

const published = source.publishReplay(1, Number.POSITIVE_INFINITY, selector);

// The exception is thrown outside Rx chain (not as an error notification).
expect(() => published.subscribe()).to.throw(error);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This exception is thrown outside the chain so I need to check it like this. This situations isn't tested in neither publish nor multicast (if I didn't miss anything) so I'm not sure I'm doing it right.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OH GEEZ! I missed this... So what should happen is you should catch the error and send it down the error path with observer.error(err). There are all sorts of examples of how to handle this around teh library.

Basically any user-supplied function needs to have it's errors caught and sent down the error channel of observation.

Copy link
Contributor Author

@martinsik martinsik Oct 7, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But I would expect multicast to catch this error for me (for example like the map() operator does) because that's where the selector is called: https://github.com/ReactiveX/rxjs/blob/master/src/operators/multicast.ts#L64

selectorOrScheduler?: IScheduler | OperatorFunction<T, R>,
scheduler?: IScheduler): Observable<R> | ConnectableObservable<R> {

return higherOrder(bufferSize, windowTime, selectorOrScheduler as any, scheduler)(this);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't find any other sane way to call higherOrder. TypeScript didn't allow me just using:

higherOrder(bufferSize, windowTime, selectorOrScheduler, scheduler)

.. but I think this should work.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will do, the other thing you can do is duplicate the logic that figures out what was passed from the other method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand, I didn't want to write the same logic twice so I'll leave it as it is.

scheduler?: IScheduler): UnaryFunction<Observable<T>, ConnectableObservable<T>> {
return (source: Observable<T>) => multicast(new ReplaySubject<T>(bufferSize, windowTime, scheduler))(source) as ConnectableObservable<T>;
/* tslint:disable:max-line-length */
export function publishReplay<T>(bufferSize?: number, windowTime?: number, scheduler?: IScheduler): UnaryFunction<Observable<T>, ConnectableObservable<T>>;
Copy link
Contributor Author

@martinsik martinsik Oct 6, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The publish operator defines MonoTypeOperatorFunction return type but I don't think that's correct because this results into Observable<T> while publish should return ConnectableObservable<T>.

https://github.com/ReactiveX/rxjs/blob/master/src/operators/publish.ts#L6-L8

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is fine for now... we might need to address this more generally across the library.

export function publishReplay<T, R>(bufferSize?: number, windowTime?: number, selector?: OperatorFunction<T, R>, scheduler?: IScheduler): OperatorFunction<T, R>;
/* tslint:enable:max-line-length */

export function publishReplay<T, R>(bufferSize?: number,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed all default values here and left it up to ReplaySubject to decide.

Copy link
Member

@benlesh benlesh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add appropriate error handling.

const published = source.publishReplay(1, Number.POSITIVE_INFINITY, selector);

// The exception is thrown outside Rx chain (not as an error notification).
expect(() => published.subscribe()).to.throw(error);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OH GEEZ! I missed this... So what should happen is you should catch the error and send it down the error path with observer.error(err). There are all sorts of examples of how to handle this around teh library.

Basically any user-supplied function needs to have it's errors caught and sent down the error channel of observation.

selectorOrScheduler?: IScheduler | OperatorFunction<T, R>,
scheduler?: IScheduler): Observable<R> | ConnectableObservable<R> {

return higherOrder(bufferSize, windowTime, selectorOrScheduler as any, scheduler)(this);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will do, the other thing you can do is duplicate the logic that figures out what was passed from the other method.

scheduler?: IScheduler): UnaryFunction<Observable<T>, ConnectableObservable<T>> {
return (source: Observable<T>) => multicast(new ReplaySubject<T>(bufferSize, windowTime, scheduler))(source) as ConnectableObservable<T>;
/* tslint:disable:max-line-length */
export function publishReplay<T>(bufferSize?: number, windowTime?: number, scheduler?: IScheduler): UnaryFunction<Observable<T>, ConnectableObservable<T>>;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is fine for now... we might need to address this more generally across the library.

@martinsik
Copy link
Contributor Author

@benlesh I added a comment above that I'd expect multicast to catch the error because that's where the selector is called https://github.com/ReactiveX/rxjs/blob/master/src/operators/multicast.ts#L64.

@benlesh benlesh merged commit e0efd13 into ReactiveX:master Oct 18, 2017
@benlesh
Copy link
Member

benlesh commented Oct 18, 2017

Merged! Thanks @martinsik!

@lock
Copy link

lock bot commented Jun 6, 2018

This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs.

@lock lock bot locked as resolved and limited conversation to collaborators Jun 6, 2018
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants