Skip to content

Commit

Permalink
Shift scope of Concast unsubscription fix from Concast to useSubscrip…
Browse files Browse the repository at this point in the history
…tion
  • Loading branch information
Jonathan committed Apr 20, 2023
1 parent ac7baea commit 8f7156b
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 88 deletions.
2 changes: 1 addition & 1 deletion .changeset/sour-meals-compete.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
"@apollo/client": patch
---

Improve consistency when unsubscribing Concast with rollback when new observer subscribes to otherwise-active Concast right after last observer unsubscribes
Delay Concast subscription teardown slightly in `useSubscription` to prevent unexpected Concast closure when one `useSubscription` hook tears down its in-flight Concast subscription immediately followed by another `useSubscription` hook reusing and subscribing to that same Concast
69 changes: 69 additions & 0 deletions src/react/hooks/__tests__/useSubscription.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -996,4 +996,73 @@ describe('useSubscription Hook', () => {
);
});
});

it('should handle simple subscription after old in-flight teardown immediately \
followed by new in-flight setup', async () => {
const subscription = gql`
subscription {
car {
make
}
}
`;

const results = ['Audi', 'BMW'].map(make => ({
result: { data: { car: { make } } },
}));

const link = new MockSubscriptionLink();
const client = new ApolloClient({
link,
cache: new Cache({ addTypename: false })
});

const { result, unmount, rerender } = renderHook(
({ coin }) => {
const heads = useSubscription(subscription, {
variables: {},
skip: coin === 'tails',
context: { coin: 'heads' }
});
const tails = useSubscription(subscription, {
variables: {},
skip: coin === 'heads',
context: { coin: 'tails' }
});
return { heads, tails };
},
{
initialProps: {
coin: 'heads'
},
wrapper: ({ children }) => (
<ApolloProvider client={client}>
{children}
</ApolloProvider>
)
},
);

rerender({ coin: 'tails' });

await new Promise(resolve => setTimeout(() => resolve('wait'), 20));

link.simulateResult(results[0]);

await waitFor(() => {
expect(result.current.tails.data).toEqual(results[0].result.data);
}, { interval: 1 });
expect(result.current.heads.data).toBeUndefined();

rerender({ coin: 'heads' });

link.simulateResult(results[1]);

await waitFor(() => {
expect(result.current.heads.data).toEqual(results[1].result.data);
}, { interval: 1 });
expect(result.current.tails.data).toBeUndefined();

unmount();
});
});
11 changes: 11 additions & 0 deletions src/react/hooks/useSubscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,18 @@ export function useSubscription<TData = any, TVariables extends OperationVariabl
});

return () => {
// immediately stop receiving subscription values, but do not unsubscribe
// until after a short delay in case another useSubscription hook is
// reusing the same underlying observable and is about to subscribe
const ignoredSubscription = observable.subscribe({
next() {},
error() {},
complete() {},
});
subscription.unsubscribe();
setTimeout(() => {
ignoredSubscription.unsubscribe();
});
};
}, [observable]);

Expand Down
36 changes: 8 additions & 28 deletions src/utilities/observables/Concast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,40 +152,15 @@ export class Concast<T> extends Observable<T> {
// easy way to observe the final state of the Concast.
private resolve: (result?: T | PromiseLike<T>) => void;
private reject: (reason: any) => void;
private internalPromise = new Promise<T>((resolve, reject) => {
public readonly promise = new Promise<T>((resolve, reject) => {
this.resolve = resolve;
this.reject = reject;
});
public get promise() {
return this.internalPromise;
}

// Name and argument of the most recently invoked observer method, used
// to deliver latest results immediately to new observers.
private latest?: ["next", T] | ["error", any];

// Delay unsubscribing from the underlying subscription slightly,
// so that immediately subscribing another observer can keep the
// subscription active.
private deferredUnsubscribe() {
const { sub } = this;
this.sub = null;
const isError = this.latest?.[0] === 'error';
if (sub) {
setTimeout(() => {
if (this.observers.size > 0 && !sub.closed && !isError) {
this.sub = sub;
this.internalPromise = new Promise<T>((resolve, reject) => {
this.resolve = resolve;
this.reject = reject;
});
} else {
sub.unsubscribe();
}
});
}
}

// Bound handler functions that can be reused for every internal
// subscription.
private handlers = {
Expand All @@ -200,7 +175,11 @@ export class Concast<T> extends Observable<T> {
error: (error: any) => {
const { sub } = this;
if (sub !== null) {
this.deferredUnsubscribe();
// Delay unsubscribing from the underlying subscription slightly,
// so that immediately subscribing another observer can keep the
// subscription active.
if (sub) setTimeout(() => sub.unsubscribe());
this.sub = null;
this.latest = ["error", error];
this.reject(error);
this.notify("error", error);
Expand All @@ -218,7 +197,8 @@ export class Concast<T> extends Observable<T> {
// eventually have been initialized to a non-empty array.
const value = sources.shift();
if (!value) {
this.deferredUnsubscribe();
if (sub) setTimeout(() => sub.unsubscribe());
this.sub = null;
if (this.latest &&
this.latest[0] === "next") {
this.resolve(this.latest[1]);
Expand Down
59 changes: 0 additions & 59 deletions src/utilities/observables/__tests__/Concast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,63 +172,4 @@ describe("Concast Observable (similar to Behavior Subject in RxJS)", () => {
sub.unsubscribe();
});
});

itAsync("maintains subscription and promise when replacing last observer", (resolve, reject) => {
const concast = new Concast<number>([
new Observable<number>(observer => {
const timerIds = [setTimeout(() => {
observer.next(1);
timerIds.push(setTimeout(() => {
observer.next(2);
observer.complete();
}, 100))
}, 100)];

return () => {
for (const timerId of timerIds) {
clearTimeout(timerId);
}
};
})
]);
const observer = {
next() {
reject(new Error("should not have called observer.next"));
},
error: reject,
complete() {
reject(new Error("should not have called observer.complete"));
},
};

concast.addObserver(observer);
concast.removeObserver(observer);

const secondObserver = {
...observer,
next(value: number) {
if (value === 1) {
Promise.race([
new Promise(resolve => setTimeout(() => resolve(true))),
concast.promise.then(() => false, () => false)
]).then(isPending => {
if (!isPending) {
reject("should not have resolved or rejected concast.promise by first observer.next");
}
});
}
},
complete() {
resolve();
}
};
concast.addObserver(secondObserver);

return new Promise(resolve => setTimeout(resolve, 1000)).then(() => {
if (!(concast as unknown as Record<string, unknown>)['sub']) {
reject(new Error("should not have nulled out concast.sub yet"));
}
reject(new Error("unexpectedly timed out"));
});
});
});

0 comments on commit 8f7156b

Please sign in to comment.