Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(afs): Allow multiple subscribers by using share, closes #1191 #1192

Merged
merged 7 commits into from
Oct 5, 2017
4 changes: 3 additions & 1 deletion src/firestore/collection/changes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { Observable } from 'rxjs/Observable';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/filter';
import 'rxjs/add/operator/scan';
import 'rxjs/add/operator/shareReplay';

import { DocumentChangeAction, Action } from '../interfaces';

Expand All @@ -28,7 +29,8 @@ export function sortedChanges(query: firebase.firestore.Query, events: firebase.
return fromCollectionRef(query)
.map(changes => changes.payload.docChanges)
.scan((current, changes) => combineChanges(current, changes, events), [])
.map(changes => changes.map(c => ({ type: c.type, payload: c })));
.map(changes => changes.map(c => ({ type: c.type, payload: c })))
.shareReplay(1);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need .share() since .shareReplay() caches the data.

}

/**
Expand Down
57 changes: 57 additions & 0 deletions src/firestore/collection/collection.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,49 @@ describe('AngularFirestoreCollection', () => {

});

it('should handle multiple subscriptions (hot)', async (done: any) => {
const ITEMS = 4;
const { randomCollectionName, ref, stocks, names } = await collectionHarness(afs, ITEMS);
const changes = stocks.valueChanges();
const sub = changes.subscribe(() => {}).add(
changes.take(1).subscribe(data => {
expect(data.length).toEqual(ITEMS);
sub.unsubscribe();
})
).add(() => {
deleteThemAll(names, ref).then(done).catch(done.fail);
});
});

// We need a clean way of handling now that we are no longer filtering empty
xit('should handle multiple subscriptions (warm)', async (done: any) => {
const ITEMS = 4;
const { randomCollectionName, ref, stocks, names } = await collectionHarness(afs, ITEMS);
const changes = stocks.valueChanges();
changes.take(1).subscribe(() => {}).add(() => {
const sub = changes.take(1).subscribe(data => {
expect(data.length).toEqual(ITEMS);
}).add(() => {
deleteThemAll(names, ref).then(done).catch(done.fail);
});
});
});

it('should handle multiple subscriptions (cold)', async (done: any) => {
const ITEMS = 4;
const { randomCollectionName, ref, stocks, names } = await collectionHarness(afs, ITEMS);
const changes = stocks.valueChanges();
const sub = changes.subscribe(() => {
sub.unsubscribe();
}).add(() => {
changes.take(1).subscribe(data => {
expect(data.length).toEqual(ITEMS);
}).add(() => {
deleteThemAll(names, ref).then(done).catch(done.fail);
});
});
});

it('should handle dynamic queries that return empty sets', async (done) => {
const ITEMS = 10;
let count = 0;
Expand Down Expand Up @@ -129,6 +172,20 @@ describe('AngularFirestoreCollection', () => {
});
});

it('should handle multiple subscriptions', async (done: any) => {
const ITEMS = 4;
const { randomCollectionName, ref, stocks, names } = await collectionHarness(afs, ITEMS);
const changes = stocks.snapshotChanges();
const sub = changes.subscribe(() => {}).add(
changes.take(1).subscribe(data => {
expect(data.length).toEqual(ITEMS);
sub.unsubscribe();
})
).add(() => {
deleteThemAll(names, ref).then(done).catch(done.fail);
});
});

it('should update order on queries', async (done) => {
const ITEMS = 10;
let count = 0;
Expand Down
4 changes: 3 additions & 1 deletion src/firestore/observable/fromRef.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import { Subscription } from 'rxjs/Subscription';
import { observeOn } from 'rxjs/operator/observeOn';
import { ZoneScheduler } from 'angularfire2';
import { Action, Reference } from '../interfaces';

import 'rxjs/add/operator/map';
import 'rxjs/add/operator/share';

function _fromRef<T, R>(ref: Reference<T>): Observable<R> {
const ref$ = new Observable(subscriber => {
Expand All @@ -16,7 +18,7 @@ function _fromRef<T, R>(ref: Reference<T>): Observable<R> {
}

export function fromRef<R>(ref: firebase.firestore.DocumentReference | firebase.firestore.Query) {
return _fromRef<typeof ref, R>(ref);
return _fromRef<typeof ref, R>(ref).share();
}

export function fromDocRef(ref: firebase.firestore.DocumentReference): Observable<Action<firebase.firestore.DocumentSnapshot>>{
Expand Down
2 changes: 2 additions & 0 deletions tools/build.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const GLOBALS = {
'rxjs/add/observable/fromPromise': 'Rx.Observable.prototype',
'rxjs/add/operator/delay': 'Rx.Observable',
'rxjs/add/operator/debounce': 'Rx.Observable',
'rxjs/add/operator/shareReplay': 'Rx.Observable',
'rxjs/add/operator/share': 'Rx.Observable',
'rxjs/observable/fromEvent': 'Rx.Observable',
'rxjs/observable/from': 'Rx.Observable',
'rxjs/operator': 'Rx.Observable.prototype',
Expand Down