Skip to content

Commit

Permalink
bugfix rsocket-ws-gateway, serviceCall (remote & local): serviceCall …
Browse files Browse the repository at this point in the history
…unsubscribe from stream didn't unsubscribe the service (#265)
  • Loading branch information
idanilt authored Oct 2, 2020
1 parent e68d2e4 commit 669510d
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 25 deletions.
14 changes: 12 additions & 2 deletions packages/rsocket-ws-gateway/src/createGatewayProxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ interface Proxy {
type ConnectionOptions = Partial<{
keepAlive: number;
lifetime: number;
}>
}>;

export function createGatewayProxy(
url: string,
Expand Down Expand Up @@ -94,7 +94,7 @@ const connect = (url, options: ConnectionOptions = {}) => {
},
onError: (error: any) => {
// console.log('Err', error);
reject({ message: 'Connection error' });
reject({ message: 'Connection error ' + error.toString() });
},
});
});
Expand Down Expand Up @@ -125,6 +125,10 @@ const requestResponse = (socket, qualifier) => {
const requestStream = (socket, qualifier) => {
return (...args) => {
return new Observable((observer) => {
let canceled = false;
let cancel = () => {
canceled = true;
};
socket
.requestStream({
data: {
Expand All @@ -134,6 +138,11 @@ const requestStream = (socket, qualifier) => {
})
.subscribe({
onSubscribe(subscription) {
if (canceled) {
subscription.cancel();
return;
}
cancel = subscription.cancel;
subscription.request(2147483647);
},
onNext: ({ data }) => {
Expand All @@ -146,6 +155,7 @@ const requestStream = (socket, qualifier) => {
observer.error(e);
},
});
return () => cancel();
});
};
};
21 changes: 14 additions & 7 deletions packages/rsocket-ws-gateway/src/requestStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,21 @@ import { Flowable } from 'rsocket-flowable';
import { RequestHandler } from './api/Gateway';

const flowableHandler: RequestHandler = (serviceCall, data, subscriber) => {
subscriber.onSubscribe();
serviceCall.requestStream(data).subscribe(
(response: any) => {
subscriber.onNext({ data: response });
let sub;
subscriber.onSubscribe({
cancel: () => {
sub && sub.unsubscribe();
},
(error: any) => subscriber.onError(error),
() => subscriber.onComplete()
);
request: () => {
sub = serviceCall.requestStream(data).subscribe(
(response: any) => {
subscriber.onNext({ data: response });
},
(error: any) => subscriber.onError(error),
() => subscriber.onComplete()
);
},
});
};

export const requestStream = ({ data }, serviceCall, handler = flowableHandler) => {
Expand Down
44 changes: 42 additions & 2 deletions packages/rsocket-ws-gateway/tests/gateway.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
import { Gateway as GatewayInterface } from '../src/api/Gateway';
import { Gateway } from '../src/Gateway';
import { createGatewayProxy } from '../src/createGatewayProxy';
import { from, throwError } from 'rxjs';
import { from, throwError, interval } from 'rxjs';
import { finalize, map } from 'rxjs/operators';
import { createMicroservice, ASYNC_MODEL_TYPES } from '@scalecube/browser';

let eDone = false;
let fDone = false;
class ServiceA {
public methodA() {
return Promise.resolve({ id: 1 });
Expand All @@ -31,6 +34,16 @@ class ServiceA {
public methodD() {
return throwError(new Error('methodD error'));
}
public methodE() {
return interval(1000).pipe(finalize(() => (eDone = true)));
}
public methodF() {
return interval(1000).pipe(
finalize(() => {
fDone = true;
})
);
}
}

export const definition = {
Expand All @@ -40,6 +53,8 @@ export const definition = {
methodB: { asyncModel: ASYNC_MODEL_TYPES.REQUEST_RESPONSE },
methodC: { asyncModel: ASYNC_MODEL_TYPES.REQUEST_STREAM },
methodD: { asyncModel: ASYNC_MODEL_TYPES.REQUEST_STREAM },
methodE: { asyncModel: ASYNC_MODEL_TYPES.REQUEST_STREAM },
methodF: { asyncModel: ASYNC_MODEL_TYPES.REQUEST_STREAM },
},
};

Expand Down Expand Up @@ -84,8 +99,33 @@ test('requestStream', (done) => {
);
});

test('requestStream unsubscribe to cancel service call', (done) => {
const s = proxy.methodE().subscribe();
s.unsubscribe();
setTimeout(() => {
expect(eDone).toBe(true);
done();
}, 20);
});
test('requestStream error to cancel service call', (done) => {
proxy
.methodF()
.pipe(
map(() => {
throw Error();
})
)
.subscribe(
() => {},
() => {}
);
setTimeout(() => {
expect(fDone).toBe(true);
done();
}, 2000);
});

test('fail requestStream', (done) => {
const responses = [1, 2];
proxy.methodD().subscribe(
done.fail,
(e) => {
Expand Down
2 changes: 1 addition & 1 deletion packages/rsocket-ws-gateway/tests/start.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ test(`Given microservices with gateway
const gateway = new Gateway({ port: 8050 });
// gateway.start({ serviceCall });
createGatewayProxy('ws://localhost:8050', definition).catch((e) => {
expect(e.message).toBe('Connection error');
expect(e.message).toContain('Connection error');
done();
});
});
Expand Down
26 changes: 14 additions & 12 deletions packages/scalecube-microservice/src/ServiceCall/LocalCall.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,20 @@ export const localCall = ({ localService, asyncModel, message, microserviceConte
switch (asyncModel) {
case ASYNC_MODEL_TYPES.REQUEST_STREAM:
return new Observable((obs: any) => {
check.isFunction(invoke.subscribe)
? invoke.subscribe(
(...data: any) => obs.next(...data),
(err: Error) => obs.error(err),
() => obs.complete()
)
: obs.error(
serviceCallError({
errorMessage: getIncorrectServiceImplementForObservable(microserviceContext.whoAmI, message.qualifier),
microserviceContext,
})
);
if (check.isFunction(invoke.subscribe)) {
const s = invoke.subscribe(
(...data: any) => obs.next(...data),
(err: Error) => obs.error(err),
() => obs.complete()
);
return () => s.unsubscribe();
}
obs.error(
serviceCallError({
errorMessage: getIncorrectServiceImplementForObservable(microserviceContext.whoAmI, message.qualifier),
microserviceContext,
})
);
});
case ASYNC_MODEL_TYPES.REQUEST_RESPONSE:
return new Promise((resolve, reject) => {
Expand Down
11 changes: 10 additions & 1 deletion packages/scalecube-microservice/src/ServiceCall/RemoteCall.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,30 @@ export const remoteCall = (options: RemoteCallOptions) => {

switch (asyncModel) {
case ASYNC_MODEL_TYPES.REQUEST_STREAM:
let canceled = false;
let cancel = () => {
canceled = true;
};
return new Observable((obs: any) => {
getValidEndpoint(options)
.then((endpoint: MicroserviceApi.Endpoint) => {
transportClient
.start({ remoteAddress: endpoint.address, logger })
.then(({ requestStream }: TransportApi.Invoker) => {
requestStream(message).subscribe(
if (canceled) {
return;
}
const sub = requestStream(message).subscribe(
(data: any) => obs.next(data),
(err: Error) => obs.error(err),
() => obs.complete()
);
cancel = () => sub.unsubscribe();
})
.catch((error: Error) => obs.error(error));
})
.catch((error: Error) => obs.error(error));
return () => cancel();
});

case ASYNC_MODEL_TYPES.REQUEST_RESPONSE:
Expand Down

0 comments on commit 669510d

Please sign in to comment.