Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(afs): Allow multiple subscribers by using share, closes #1191 #1192

Merged
merged 7 commits into from
Oct 5, 2017
Prev Previous commit
Next Next commit
fix(afs): mutiple subscribes/unsubscribes could still get confused
  • Loading branch information
jamesdaniels committed Oct 5, 2017
commit 28aef60bdaf27c17c710d0c28f81338e758be514
26 changes: 17 additions & 9 deletions src/firestore/collection/changes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { DocumentChangeAction, Action } from '../interfaces';
*/
export function docChanges(query: firebase.firestore.Query): Observable<DocumentChangeAction[]> {
return fromCollectionRef(query)
.filter(action => !!action)
.map(action =>
action.payload.docChanges
.map(change => ({ type: change.type, payload: change })));
Expand All @@ -27,7 +28,7 @@ export function docChanges(query: firebase.firestore.Query): Observable<Document
*/
export function sortedChanges(query: firebase.firestore.Query, events: firebase.firestore.DocumentChangeType[]): Observable<DocumentChangeAction[]> {
return fromCollectionRef(query)
.map(changes => changes.payload.docChanges)
.map(changes => changes && changes.payload.docChanges)
.scan((current, changes) => combineChanges(current, changes, events), [])
.map(changes => changes.map(c => ({ type: c.type, payload: c })))
.filter(changes => changes.length > 0)
Expand All @@ -41,14 +42,21 @@ export function sortedChanges(query: firebase.firestore.Query, events: firebase.
* @param changes
* @param events
*/
export function combineChanges(current: firebase.firestore.DocumentChange[], changes: firebase.firestore.DocumentChange[], events: firebase.firestore.DocumentChangeType[]) {
changes.forEach(change => {
// skip unwanted change types
if(events.indexOf(change.type) > -1) {
current = combineChange(current, change);
}
});
return current;
export function combineChanges(current: firebase.firestore.DocumentChange[], changes: firebase.firestore.DocumentChange[] | undefined, events: firebase.firestore.DocumentChangeType[]) {
if (changes) {
changes.forEach(change => {
// skip unwanted change types
if(events.indexOf(change.type) > -1) {
current = combineChange(current, change);
}
});
return current;
} else {
// in the case of undefined, empty current
// if you do odd things with the subscribes/unsubscrbes you can mess things
// up and get double or tripled results
return [];
}
}

/**
Expand Down
35 changes: 27 additions & 8 deletions src/firestore/collection/collection.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,28 +74,45 @@ describe('AngularFirestoreCollection', () => {

});

it('should handle multiple subscriptions', async (done: any) => {
it('should handle multiple subscriptions (hot)', async (done: any) => {
const ITEMS = 4;
const { randomCollectionName, ref, stocks, names } = await collectionHarness(afs, ITEMS);
const changes = stocks.valueChanges();
const sub = changes.subscribe(() => {}).add(
changes.subscribe(data => {
changes.take(1).subscribe(data => {
expect(data.length).toEqual(ITEMS);
sub.unsubscribe();
})
).add(done);
).add(() => {
deleteThemAll(names, ref).then(done).catch(done.fail);
});
});

it('should handle multiple subscriptions (warm)', async (done: any) => {
const ITEMS = 4;
const { randomCollectionName, ref, stocks, names } = await collectionHarness(afs, ITEMS);
const changes = stocks.valueChanges();
changes.take(1).subscribe(() => {}).add(() => {
const sub = changes.take(1).subscribe(data => {
expect(data.length).toEqual(ITEMS);
}).add(() => {
deleteThemAll(names, ref).then(done).catch(done.fail);
});
});
});

it('should handle multiple subscriptions + cold observer', async (done: any) => {
it('should handle multiple subscriptions (cold)', async (done: any) => {
const ITEMS = 4;
const { randomCollectionName, ref, stocks, names } = await collectionHarness(afs, ITEMS);
const changes = stocks.valueChanges();
const sub = changes.take(1).subscribe(() => {
const sub = changes.subscribe(() => {
sub.unsubscribe();
}).add(() => {
changes.take(1).subscribe(data => {
expect(data.length).toEqual(ITEMS);
}).add(done);
}).add(() => {
deleteThemAll(names, ref).then(done).catch(done.fail);
});
});
});

Expand Down Expand Up @@ -132,11 +149,13 @@ describe('AngularFirestoreCollection', () => {
const { randomCollectionName, ref, stocks, names } = await collectionHarness(afs, ITEMS);
const changes = stocks.snapshotChanges();
const sub = changes.subscribe(() => {}).add(
changes.subscribe(data => {
changes.take(1).subscribe(data => {
expect(data.length).toEqual(ITEMS);
sub.unsubscribe();
})
).add(done);
).add(() => {
deleteThemAll(names, ref).then(done).catch(done.fail);
});
});

it('should update order on queries', async (done) => {
Expand Down
4 changes: 3 additions & 1 deletion src/firestore/observable/fromRef.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import 'rxjs/add/operator/share';

function _fromRef<T, R>(ref: Reference<T>): Observable<R> {
const ref$ = new Observable(subscriber => {
subscriber.next(undefined); // fire an undefined to let subcribers know this is a new Observable
const unsubscribe = ref.onSnapshot(subscriber);
return { unsubscribe };
});
Expand All @@ -23,9 +24,10 @@ export function fromRef<R>(ref: firebase.firestore.DocumentReference | firebase.

export function fromDocRef(ref: firebase.firestore.DocumentReference): Observable<Action<firebase.firestore.DocumentSnapshot>>{
return fromRef<firebase.firestore.DocumentSnapshot>(ref)
.filter(payload => !!payload)
.map(payload => ({ payload, type: 'value' }));
}

export function fromCollectionRef(ref: firebase.firestore.Query): Observable<Action<firebase.firestore.QuerySnapshot>> {
return fromRef<firebase.firestore.QuerySnapshot>(ref).map(payload => ({ payload, type: 'query' }))
return fromRef<firebase.firestore.QuerySnapshot>(ref).map(payload => payload && ({ payload, type: 'query' }))
}