Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(afs): Allow multiple subscribers by using share, closes #1191 #1192

Merged
merged 7 commits into from
Oct 5, 2017
8 changes: 6 additions & 2 deletions src/firestore/collection/changes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,12 @@ export function combineChanges(current: firebase.firestore.DocumentChange[], cha
*/
export function combineChange(combined: firebase.firestore.DocumentChange[], change: firebase.firestore.DocumentChange): firebase.firestore.DocumentChange[] {
switch(change.type) {
case 'added':
combined.splice(change.newIndex, 0, change);
case 'added':
if (combined[change.newIndex] && combined[change.newIndex].doc.id == change.doc.id) {
// Not sure why the duplicates are getting fired
} else {
combined.splice(change.newIndex, 0, change);
}
break;
case 'modified':
// When an item changes position we first remove it
Expand Down
81 changes: 81 additions & 0 deletions src/firestore/collection/collection.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,33 @@ describe('AngularFirestoreCollection', () => {

});

it('should handle multiple subscriptions (hot)', async (done: any) => {
const ITEMS = 4;
const { randomCollectionName, ref, stocks, names } = await collectionHarness(afs, ITEMS);
const changes = stocks.valueChanges();
const sub = changes.subscribe(() => {}).add(
changes.take(1).subscribe(data => {
expect(data.length).toEqual(ITEMS);
sub.unsubscribe();
})
).add(() => {
deleteThemAll(names, ref).then(done).catch(done.fail);
});
});

it('should handle multiple subscriptions (warm)', async (done: any) => {
const ITEMS = 4;
const { randomCollectionName, ref, stocks, names } = await collectionHarness(afs, ITEMS);
const changes = stocks.valueChanges();
changes.take(1).subscribe(() => {}).add(() => {
const sub = changes.take(1).subscribe(data => {
expect(data.length).toEqual(ITEMS);
}).add(() => {
deleteThemAll(names, ref).then(done).catch(done.fail);
});
});
});

it('should handle dynamic queries that return empty sets', async (done) => {
const ITEMS = 10;
let count = 0;
Expand Down Expand Up @@ -129,6 +156,33 @@ describe('AngularFirestoreCollection', () => {
});
});

it('should handle multiple subscriptions (hot)', async (done: any) => {
const ITEMS = 4;
const { randomCollectionName, ref, stocks, names } = await collectionHarness(afs, ITEMS);
const changes = stocks.snapshotChanges();
const sub = changes.subscribe(() => {}).add(
changes.take(1).subscribe(data => {
expect(data.length).toEqual(ITEMS);
sub.unsubscribe();
})
).add(() => {
deleteThemAll(names, ref).then(done).catch(done.fail);
});
});

it('should handle multiple subscriptions (warm)', async (done: any) => {
const ITEMS = 4;
const { randomCollectionName, ref, stocks, names } = await collectionHarness(afs, ITEMS);
const changes = stocks.snapshotChanges();
changes.take(1).subscribe(() => {}).add(() => {
const sub = changes.take(1).subscribe(data => {
expect(data.length).toEqual(ITEMS);
}).add(() => {
deleteThemAll(names, ref).then(done).catch(done.fail);
});
});
});

it('should update order on queries', async (done) => {
const ITEMS = 10;
let count = 0;
Expand Down Expand Up @@ -279,6 +333,33 @@ describe('AngularFirestoreCollection', () => {
}
});
});

it('should handle multiple subscriptions (hot)', async (done: any) => {
const ITEMS = 4;
const { randomCollectionName, ref, stocks, names } = await collectionHarness(afs, ITEMS);
const changes = stocks.stateChanges();
const sub = changes.subscribe(() => {}).add(
changes.take(1).subscribe(data => {
expect(data.length).toEqual(ITEMS);
sub.unsubscribe();
})
).add(() => {
deleteThemAll(names, ref).then(done).catch(done.fail);
});
});

it('should handle multiple subscriptions (warm)', async (done: any) => {
const ITEMS = 4;
const { randomCollectionName, ref, stocks, names } = await collectionHarness(afs, ITEMS);
const changes = stocks.stateChanges();
changes.take(1).subscribe(() => {}).add(() => {
const sub = changes.take(1).subscribe(data => {
expect(data.length).toEqual(ITEMS);
}).add(() => {
deleteThemAll(names, ref).then(done).catch(done.fail);
});
});
});

it('should be able to filter stateChanges() types - modified', async (done) => {
const ITEMS = 10;
Expand Down
6 changes: 3 additions & 3 deletions src/firestore/collection/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ export class AngularFirestoreCollection<T> {
}

/**
* Create a stream of synchronized shanges. This method keeps the local array in sorted
* Create a stream of synchronized changes. This method keeps the local array in sorted
* query order.
* @param events
*/
Expand All @@ -96,8 +96,8 @@ export class AngularFirestoreCollection<T> {
* Listen to all documents in the collection and its possible query as an Observable.
*/
valueChanges(events?: firebase.firestore.DocumentChangeType[]): Observable<T[]> {
return this.snapshotChanges()
.map(actions => actions.map(a => a.payload.doc.data()) as T[]);
return fromCollectionRef(this.query)
.map(actions => actions.payload.docs.map(a => a.data()) as T[]);
}

/**
Expand Down
4 changes: 3 additions & 1 deletion src/firestore/observable/fromRef.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import { Subscription } from 'rxjs/Subscription';
import { observeOn } from 'rxjs/operator/observeOn';
import { ZoneScheduler } from 'angularfire2';
import { Action, Reference } from '../interfaces';

import 'rxjs/add/operator/map';
import 'rxjs/add/operator/share';

function _fromRef<T, R>(ref: Reference<T>): Observable<R> {
const ref$ = new Observable(subscriber => {
Expand All @@ -16,7 +18,7 @@ function _fromRef<T, R>(ref: Reference<T>): Observable<R> {
}

export function fromRef<R>(ref: firebase.firestore.DocumentReference | firebase.firestore.Query) {
return _fromRef<typeof ref, R>(ref);
return _fromRef<typeof ref, R>(ref).share();
}

export function fromDocRef(ref: firebase.firestore.DocumentReference): Observable<Action<firebase.firestore.DocumentSnapshot>>{
Expand Down
1 change: 1 addition & 0 deletions tools/build.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ const GLOBALS = {
'rxjs/add/observable/fromPromise': 'Rx.Observable.prototype',
'rxjs/add/operator/delay': 'Rx.Observable',
'rxjs/add/operator/debounce': 'Rx.Observable',
'rxjs/add/operator/share': 'Rx.Observable',
'rxjs/observable/fromEvent': 'Rx.Observable',
'rxjs/observable/from': 'Rx.Observable',
'rxjs/operator': 'Rx.Observable.prototype',
Expand Down