diff --git a/.changeset/sour-meals-compete.md b/.changeset/sour-meals-compete.md
index 81ea0f264eb..923523babd9 100644
--- a/.changeset/sour-meals-compete.md
+++ b/.changeset/sour-meals-compete.md
@@ -2,4 +2,4 @@
"@apollo/client": patch
---
-Improve consistency when unsubscribing Concast with rollback when new observer subscribes to otherwise-active Concast right after last observer unsubscribes
+Delay Concast subscription teardown slightly in `useSubscription` to prevent unexpected Concast closure when one `useSubscription` hook tears down its in-flight Concast subscription immediately followed by another `useSubscription` hook reusing and subscribing to that same Concast
diff --git a/src/react/hooks/__tests__/useSubscription.test.tsx b/src/react/hooks/__tests__/useSubscription.test.tsx
index af802470c4d..ca84c5f35a9 100644
--- a/src/react/hooks/__tests__/useSubscription.test.tsx
+++ b/src/react/hooks/__tests__/useSubscription.test.tsx
@@ -996,4 +996,73 @@ describe('useSubscription Hook', () => {
);
});
});
+
+ it('should handle simple subscription after old in-flight teardown immediately \
+followed by new in-flight setup', async () => {
+ const subscription = gql`
+ subscription {
+ car {
+ make
+ }
+ }
+ `;
+
+ const results = ['Audi', 'BMW'].map(make => ({
+ result: { data: { car: { make } } },
+ }));
+
+ const link = new MockSubscriptionLink();
+ const client = new ApolloClient({
+ link,
+ cache: new Cache({ addTypename: false })
+ });
+
+ const { result, unmount, rerender } = renderHook(
+ ({ coin }) => {
+ const heads = useSubscription(subscription, {
+ variables: {},
+ skip: coin === 'tails',
+ context: { coin: 'heads' }
+ });
+ const tails = useSubscription(subscription, {
+ variables: {},
+ skip: coin === 'heads',
+ context: { coin: 'tails' }
+ });
+ return { heads, tails };
+ },
+ {
+ initialProps: {
+ coin: 'heads'
+ },
+ wrapper: ({ children }) => (
+
+ {children}
+
+ )
+ },
+ );
+
+ rerender({ coin: 'tails' });
+
+ await new Promise(resolve => setTimeout(() => resolve('wait'), 20));
+
+ link.simulateResult(results[0]);
+
+ await waitFor(() => {
+ expect(result.current.tails.data).toEqual(results[0].result.data);
+ }, { interval: 1 });
+ expect(result.current.heads.data).toBeUndefined();
+
+ rerender({ coin: 'heads' });
+
+ link.simulateResult(results[1]);
+
+ await waitFor(() => {
+ expect(result.current.heads.data).toEqual(results[1].result.data);
+ }, { interval: 1 });
+ expect(result.current.tails.data).toBeUndefined();
+
+ unmount();
+ });
});
diff --git a/src/react/hooks/useSubscription.ts b/src/react/hooks/useSubscription.ts
index bce2aa555b6..903ab691c9d 100644
--- a/src/react/hooks/useSubscription.ts
+++ b/src/react/hooks/useSubscription.ts
@@ -160,7 +160,18 @@ export function useSubscription {
+ // immediately stop receiving subscription values, but do not unsubscribe
+ // until after a short delay in case another useSubscription hook is
+ // reusing the same underlying observable and is about to subscribe
+ const ignoredSubscription = observable.subscribe({
+ next() {},
+ error() {},
+ complete() {},
+ });
subscription.unsubscribe();
+ setTimeout(() => {
+ ignoredSubscription.unsubscribe();
+ });
};
}, [observable]);
diff --git a/src/utilities/observables/Concast.ts b/src/utilities/observables/Concast.ts
index 36126864fbd..deff3ce1730 100644
--- a/src/utilities/observables/Concast.ts
+++ b/src/utilities/observables/Concast.ts
@@ -152,40 +152,15 @@ export class Concast extends Observable {
// easy way to observe the final state of the Concast.
private resolve: (result?: T | PromiseLike) => void;
private reject: (reason: any) => void;
- private internalPromise = new Promise((resolve, reject) => {
+ public readonly promise = new Promise((resolve, reject) => {
this.resolve = resolve;
this.reject = reject;
});
- public get promise() {
- return this.internalPromise;
- }
// Name and argument of the most recently invoked observer method, used
// to deliver latest results immediately to new observers.
private latest?: ["next", T] | ["error", any];
- // Delay unsubscribing from the underlying subscription slightly,
- // so that immediately subscribing another observer can keep the
- // subscription active.
- private deferredUnsubscribe() {
- const { sub } = this;
- this.sub = null;
- const isError = this.latest?.[0] === 'error';
- if (sub) {
- setTimeout(() => {
- if (this.observers.size > 0 && !sub.closed && !isError) {
- this.sub = sub;
- this.internalPromise = new Promise((resolve, reject) => {
- this.resolve = resolve;
- this.reject = reject;
- });
- } else {
- sub.unsubscribe();
- }
- });
- }
- }
-
// Bound handler functions that can be reused for every internal
// subscription.
private handlers = {
@@ -200,7 +175,11 @@ export class Concast extends Observable {
error: (error: any) => {
const { sub } = this;
if (sub !== null) {
- this.deferredUnsubscribe();
+ // Delay unsubscribing from the underlying subscription slightly,
+ // so that immediately subscribing another observer can keep the
+ // subscription active.
+ if (sub) setTimeout(() => sub.unsubscribe());
+ this.sub = null;
this.latest = ["error", error];
this.reject(error);
this.notify("error", error);
@@ -218,7 +197,8 @@ export class Concast extends Observable {
// eventually have been initialized to a non-empty array.
const value = sources.shift();
if (!value) {
- this.deferredUnsubscribe();
+ if (sub) setTimeout(() => sub.unsubscribe());
+ this.sub = null;
if (this.latest &&
this.latest[0] === "next") {
this.resolve(this.latest[1]);
diff --git a/src/utilities/observables/__tests__/Concast.ts b/src/utilities/observables/__tests__/Concast.ts
index c5ed20be234..c14746a2a3d 100644
--- a/src/utilities/observables/__tests__/Concast.ts
+++ b/src/utilities/observables/__tests__/Concast.ts
@@ -172,63 +172,4 @@ describe("Concast Observable (similar to Behavior Subject in RxJS)", () => {
sub.unsubscribe();
});
});
-
- itAsync("maintains subscription and promise when replacing last observer", (resolve, reject) => {
- const concast = new Concast([
- new Observable(observer => {
- const timerIds = [setTimeout(() => {
- observer.next(1);
- timerIds.push(setTimeout(() => {
- observer.next(2);
- observer.complete();
- }, 100))
- }, 100)];
-
- return () => {
- for (const timerId of timerIds) {
- clearTimeout(timerId);
- }
- };
- })
- ]);
- const observer = {
- next() {
- reject(new Error("should not have called observer.next"));
- },
- error: reject,
- complete() {
- reject(new Error("should not have called observer.complete"));
- },
- };
-
- concast.addObserver(observer);
- concast.removeObserver(observer);
-
- const secondObserver = {
- ...observer,
- next(value: number) {
- if (value === 1) {
- Promise.race([
- new Promise(resolve => setTimeout(() => resolve(true))),
- concast.promise.then(() => false, () => false)
- ]).then(isPending => {
- if (!isPending) {
- reject("should not have resolved or rejected concast.promise by first observer.next");
- }
- });
- }
- },
- complete() {
- resolve();
- }
- };
- concast.addObserver(secondObserver);
-
- return new Promise(resolve => setTimeout(resolve, 1000)).then(() => {
- if (!(concast as unknown as Record)['sub']) {
- reject(new Error("should not have nulled out concast.sub yet"));
- }
- reject(new Error("unexpectedly timed out"));
- });
- });
});