From 2e3c8fa46c8fe74e3dba74c38fa18f46f4230386 Mon Sep 17 00:00:00 2001 From: IdanILT Date: Wed, 30 Sep 2020 19:08:54 +0300 Subject: [PATCH 1/3] bugfix rsocket-ws-gateway, serviceCall (remote & local): serviceCall unsubscribe from stream didn't unsubscribe the service --- .../src/createGatewayProxy.ts | 14 +++++- .../rsocket-ws-gateway/src/requestStream.ts | 10 ++++- .../rsocket-ws-gateway/tests/gateway.spec.ts | 44 ++++++++++++++++++- .../src/ServiceCall/LocalCall.ts | 26 ++++++----- .../src/ServiceCall/RemoteCall.ts | 11 ++++- 5 files changed, 86 insertions(+), 19 deletions(-) diff --git a/packages/rsocket-ws-gateway/src/createGatewayProxy.ts b/packages/rsocket-ws-gateway/src/createGatewayProxy.ts index cdee15c8..57194f15 100644 --- a/packages/rsocket-ws-gateway/src/createGatewayProxy.ts +++ b/packages/rsocket-ws-gateway/src/createGatewayProxy.ts @@ -11,7 +11,7 @@ interface Proxy { type ConnectionOptions = Partial<{ keepAlive: number; lifetime: number; -}> +}>; export function createGatewayProxy( url: string, @@ -94,7 +94,7 @@ const connect = (url, options: ConnectionOptions = {}) => { }, onError: (error: any) => { // console.log('Err', error); - reject({ message: 'Connection error' }); + reject({ message: 'Connection error ' + error.toString() }); }, }); }); @@ -125,6 +125,10 @@ const requestResponse = (socket, qualifier) => { const requestStream = (socket, qualifier) => { return (...args) => { return new Observable((observer) => { + let canceled = false; + let cancel = () => { + canceled = true; + }; socket .requestStream({ data: { @@ -134,6 +138,11 @@ const requestStream = (socket, qualifier) => { }) .subscribe({ onSubscribe(subscription) { + if (canceled) { + subscription.cancel(); + return; + } + cancel = subscription.cancel; subscription.request(2147483647); }, onNext: ({ data }) => { @@ -146,6 +155,7 @@ const requestStream = (socket, qualifier) => { observer.error(e); }, }); + return () => cancel(); }); }; }; diff --git a/packages/rsocket-ws-gateway/src/requestStream.ts b/packages/rsocket-ws-gateway/src/requestStream.ts index ba44a9a4..f5bb9a1d 100644 --- a/packages/rsocket-ws-gateway/src/requestStream.ts +++ b/packages/rsocket-ws-gateway/src/requestStream.ts @@ -2,14 +2,20 @@ import { Flowable } from 'rsocket-flowable'; import { RequestHandler } from './api/Gateway'; const flowableHandler: RequestHandler = (serviceCall, data, subscriber) => { - subscriber.onSubscribe(); - serviceCall.requestStream(data).subscribe( + const sub = serviceCall.requestStream(data).subscribe( (response: any) => { subscriber.onNext({ data: response }); }, (error: any) => subscriber.onError(error), () => subscriber.onComplete() ); + + subscriber.onSubscribe({ + cancel: () => { + sub.unsubscribe(); + }, + request: () => {}, + }); }; export const requestStream = ({ data }, serviceCall, handler = flowableHandler) => { diff --git a/packages/rsocket-ws-gateway/tests/gateway.spec.ts b/packages/rsocket-ws-gateway/tests/gateway.spec.ts index 3a10bd89..2dd6de6d 100644 --- a/packages/rsocket-ws-gateway/tests/gateway.spec.ts +++ b/packages/rsocket-ws-gateway/tests/gateway.spec.ts @@ -15,9 +15,12 @@ import { Gateway as GatewayInterface } from '../src/api/Gateway'; import { Gateway } from '../src/Gateway'; import { createGatewayProxy } from '../src/createGatewayProxy'; -import { from, throwError } from 'rxjs'; +import { from, throwError, interval } from 'rxjs'; +import { finalize, map } from 'rxjs/operators'; import { createMicroservice, ASYNC_MODEL_TYPES } from '@scalecube/browser'; +let eDone = false; +let fDone = false; class ServiceA { public methodA() { return Promise.resolve({ id: 1 }); @@ -31,6 +34,16 @@ class ServiceA { public methodD() { return throwError(new Error('methodD error')); } + public methodE() { + return interval(1000).pipe(finalize(() => (eDone = true))); + } + public methodF() { + return interval(1000).pipe( + finalize(() => { + fDone = true; + }) + ); + } } export const definition = { @@ -40,6 +53,8 @@ export const definition = { methodB: { asyncModel: ASYNC_MODEL_TYPES.REQUEST_RESPONSE }, methodC: { asyncModel: ASYNC_MODEL_TYPES.REQUEST_STREAM }, methodD: { asyncModel: ASYNC_MODEL_TYPES.REQUEST_STREAM }, + methodE: { asyncModel: ASYNC_MODEL_TYPES.REQUEST_STREAM }, + methodF: { asyncModel: ASYNC_MODEL_TYPES.REQUEST_STREAM }, }, }; @@ -84,8 +99,33 @@ test('requestStream', (done) => { ); }); +test('requestStream unsubscribe to cancel service call', (done) => { + const s = proxy.methodE().subscribe(); + s.unsubscribe(); + setTimeout(() => { + expect(eDone).toBe(true); + done(); + }, 20); +}); +test('requestStream error to cancel service call', (done) => { + proxy + .methodF() + .pipe( + map(() => { + throw Error(); + }) + ) + .subscribe( + () => {}, + () => {} + ); + setTimeout(() => { + expect(fDone).toBe(true); + done(); + }, 2000); +}); + test('fail requestStream', (done) => { - const responses = [1, 2]; proxy.methodD().subscribe( done.fail, (e) => { diff --git a/packages/scalecube-microservice/src/ServiceCall/LocalCall.ts b/packages/scalecube-microservice/src/ServiceCall/LocalCall.ts index 6ba91a01..20d26f0b 100644 --- a/packages/scalecube-microservice/src/ServiceCall/LocalCall.ts +++ b/packages/scalecube-microservice/src/ServiceCall/LocalCall.ts @@ -62,18 +62,20 @@ export const localCall = ({ localService, asyncModel, message, microserviceConte switch (asyncModel) { case ASYNC_MODEL_TYPES.REQUEST_STREAM: return new Observable((obs: any) => { - check.isFunction(invoke.subscribe) - ? invoke.subscribe( - (...data: any) => obs.next(...data), - (err: Error) => obs.error(err), - () => obs.complete() - ) - : obs.error( - serviceCallError({ - errorMessage: getIncorrectServiceImplementForObservable(microserviceContext.whoAmI, message.qualifier), - microserviceContext, - }) - ); + if (check.isFunction(invoke.subscribe)) { + const s = invoke.subscribe( + (...data: any) => obs.next(...data), + (err: Error) => obs.error(err), + () => obs.complete() + ); + return () => s.unsubscribe(); + } + obs.error( + serviceCallError({ + errorMessage: getIncorrectServiceImplementForObservable(microserviceContext.whoAmI, message.qualifier), + microserviceContext, + }) + ); }); case ASYNC_MODEL_TYPES.REQUEST_RESPONSE: return new Promise((resolve, reject) => { diff --git a/packages/scalecube-microservice/src/ServiceCall/RemoteCall.ts b/packages/scalecube-microservice/src/ServiceCall/RemoteCall.ts index 107973d1..a010432b 100644 --- a/packages/scalecube-microservice/src/ServiceCall/RemoteCall.ts +++ b/packages/scalecube-microservice/src/ServiceCall/RemoteCall.ts @@ -16,21 +16,30 @@ export const remoteCall = (options: RemoteCallOptions) => { switch (asyncModel) { case ASYNC_MODEL_TYPES.REQUEST_STREAM: + let canceled = false; + let cancel = () => { + canceled = true; + }; return new Observable((obs: any) => { getValidEndpoint(options) .then((endpoint: MicroserviceApi.Endpoint) => { transportClient .start({ remoteAddress: endpoint.address, logger }) .then(({ requestStream }: TransportApi.Invoker) => { - requestStream(message).subscribe( + if (canceled) { + return; + } + const sub = requestStream(message).subscribe( (data: any) => obs.next(data), (err: Error) => obs.error(err), () => obs.complete() ); + cancel = () => sub.unsubscribe(); }) .catch((error: Error) => obs.error(error)); }) .catch((error: Error) => obs.error(error)); + return () => cancel(); }); case ASYNC_MODEL_TYPES.REQUEST_RESPONSE: From 25d4e24b95b892df86a50a670178b209c78b18e1 Mon Sep 17 00:00:00 2001 From: IdanILT Date: Thu, 1 Oct 2020 03:00:03 +0300 Subject: [PATCH 2/3] fix next before onsubscribe --- .../rsocket-ws-gateway/src/requestStream.ts | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/packages/rsocket-ws-gateway/src/requestStream.ts b/packages/rsocket-ws-gateway/src/requestStream.ts index f5bb9a1d..e7c3ece4 100644 --- a/packages/rsocket-ws-gateway/src/requestStream.ts +++ b/packages/rsocket-ws-gateway/src/requestStream.ts @@ -2,19 +2,20 @@ import { Flowable } from 'rsocket-flowable'; import { RequestHandler } from './api/Gateway'; const flowableHandler: RequestHandler = (serviceCall, data, subscriber) => { - const sub = serviceCall.requestStream(data).subscribe( - (response: any) => { - subscriber.onNext({ data: response }); - }, - (error: any) => subscriber.onError(error), - () => subscriber.onComplete() - ); - + let sub; subscriber.onSubscribe({ cancel: () => { - sub.unsubscribe(); + sub && sub.unsubscribe(); + }, + request: () => { + sub = serviceCall.requestStream(data).subscribe( + (response: any) => { + subscriber.onNext({ data: response }); + }, + (error: any) => subscriber.onError(error), + () => subscriber.onComplete() + ); }, - request: () => {}, }); }; From e5bb9db971d1167b819aaa0aea575b43b18e0e53 Mon Sep 17 00:00:00 2001 From: IdanILT Date: Thu, 1 Oct 2020 03:15:48 +0300 Subject: [PATCH 3/3] fix test connection error --- packages/rsocket-ws-gateway/tests/start.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/rsocket-ws-gateway/tests/start.spec.ts b/packages/rsocket-ws-gateway/tests/start.spec.ts index 1eac74f0..02dbeba6 100644 --- a/packages/rsocket-ws-gateway/tests/start.spec.ts +++ b/packages/rsocket-ws-gateway/tests/start.spec.ts @@ -20,7 +20,7 @@ test(`Given microservices with gateway const gateway = new Gateway({ port: 8050 }); // gateway.start({ serviceCall }); createGatewayProxy('ws://localhost:8050', definition).catch((e) => { - expect(e.message).toBe('Connection error'); + expect(e.message).toContain('Connection error'); done(); }); });