Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix rsocket-ws-gateway, serviceCall (remote & local): serviceCall unsubscribe from stream didn't unsubscribe the service #265

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions packages/rsocket-ws-gateway/src/createGatewayProxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ interface Proxy {
type ConnectionOptions = Partial<{
keepAlive: number;
lifetime: number;
}>
}>;

export function createGatewayProxy(
url: string,
Expand Down Expand Up @@ -94,7 +94,7 @@ const connect = (url, options: ConnectionOptions = {}) => {
},
onError: (error: any) => {
// console.log('Err', error);
reject({ message: 'Connection error' });
reject({ message: 'Connection error ' + error.toString() });
},
});
});
Expand Down Expand Up @@ -125,6 +125,10 @@ const requestResponse = (socket, qualifier) => {
const requestStream = (socket, qualifier) => {
return (...args) => {
return new Observable((observer) => {
let canceled = false;
let cancel = () => {
canceled = true;
};
socket
.requestStream({
data: {
Expand All @@ -134,6 +138,11 @@ const requestStream = (socket, qualifier) => {
})
.subscribe({
onSubscribe(subscription) {
if (canceled) {
subscription.cancel();
return;
}
cancel = subscription.cancel;
subscription.request(2147483647);
},
onNext: ({ data }) => {
Expand All @@ -146,6 +155,7 @@ const requestStream = (socket, qualifier) => {
observer.error(e);
},
});
return () => cancel();
});
};
};
21 changes: 14 additions & 7 deletions packages/rsocket-ws-gateway/src/requestStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,21 @@ import { Flowable } from 'rsocket-flowable';
import { RequestHandler } from './api/Gateway';

const flowableHandler: RequestHandler = (serviceCall, data, subscriber) => {
subscriber.onSubscribe();
serviceCall.requestStream(data).subscribe(
(response: any) => {
subscriber.onNext({ data: response });
let sub;
subscriber.onSubscribe({
cancel: () => {
sub && sub.unsubscribe();
},
(error: any) => subscriber.onError(error),
() => subscriber.onComplete()
);
request: () => {
sub = serviceCall.requestStream(data).subscribe(
(response: any) => {
subscriber.onNext({ data: response });
},
(error: any) => subscriber.onError(error),
() => subscriber.onComplete()
);
},
});
};

export const requestStream = ({ data }, serviceCall, handler = flowableHandler) => {
Expand Down
44 changes: 42 additions & 2 deletions packages/rsocket-ws-gateway/tests/gateway.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
import { Gateway as GatewayInterface } from '../src/api/Gateway';
import { Gateway } from '../src/Gateway';
import { createGatewayProxy } from '../src/createGatewayProxy';
import { from, throwError } from 'rxjs';
import { from, throwError, interval } from 'rxjs';
import { finalize, map } from 'rxjs/operators';
import { createMicroservice, ASYNC_MODEL_TYPES } from '@scalecube/browser';

let eDone = false;
let fDone = false;
class ServiceA {
public methodA() {
return Promise.resolve({ id: 1 });
Expand All @@ -31,6 +34,16 @@ class ServiceA {
public methodD() {
return throwError(new Error('methodD error'));
}
public methodE() {
return interval(1000).pipe(finalize(() => (eDone = true)));
}
public methodF() {
return interval(1000).pipe(
finalize(() => {
fDone = true;
})
);
}
}

export const definition = {
Expand All @@ -40,6 +53,8 @@ export const definition = {
methodB: { asyncModel: ASYNC_MODEL_TYPES.REQUEST_RESPONSE },
methodC: { asyncModel: ASYNC_MODEL_TYPES.REQUEST_STREAM },
methodD: { asyncModel: ASYNC_MODEL_TYPES.REQUEST_STREAM },
methodE: { asyncModel: ASYNC_MODEL_TYPES.REQUEST_STREAM },
methodF: { asyncModel: ASYNC_MODEL_TYPES.REQUEST_STREAM },
},
};

Expand Down Expand Up @@ -84,8 +99,33 @@ test('requestStream', (done) => {
);
});

test('requestStream unsubscribe to cancel service call', (done) => {
const s = proxy.methodE().subscribe();
s.unsubscribe();
setTimeout(() => {
expect(eDone).toBe(true);
done();
}, 20);
});
test('requestStream error to cancel service call', (done) => {
proxy
.methodF()
.pipe(
map(() => {
throw Error();
})
)
.subscribe(
() => {},
() => {}
);
setTimeout(() => {
expect(fDone).toBe(true);
done();
}, 2000);
});

test('fail requestStream', (done) => {
const responses = [1, 2];
proxy.methodD().subscribe(
done.fail,
(e) => {
Expand Down
2 changes: 1 addition & 1 deletion packages/rsocket-ws-gateway/tests/start.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ test(`Given microservices with gateway
const gateway = new Gateway({ port: 8050 });
// gateway.start({ serviceCall });
createGatewayProxy('ws://localhost:8050', definition).catch((e) => {
expect(e.message).toBe('Connection error');
expect(e.message).toContain('Connection error');
done();
});
});
Expand Down
26 changes: 14 additions & 12 deletions packages/scalecube-microservice/src/ServiceCall/LocalCall.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,20 @@ export const localCall = ({ localService, asyncModel, message, microserviceConte
switch (asyncModel) {
case ASYNC_MODEL_TYPES.REQUEST_STREAM:
return new Observable((obs: any) => {
check.isFunction(invoke.subscribe)
? invoke.subscribe(
(...data: any) => obs.next(...data),
(err: Error) => obs.error(err),
() => obs.complete()
)
: obs.error(
serviceCallError({
errorMessage: getIncorrectServiceImplementForObservable(microserviceContext.whoAmI, message.qualifier),
microserviceContext,
})
);
if (check.isFunction(invoke.subscribe)) {
const s = invoke.subscribe(
(...data: any) => obs.next(...data),
(err: Error) => obs.error(err),
() => obs.complete()
);
return () => s.unsubscribe();
}
obs.error(
serviceCallError({
errorMessage: getIncorrectServiceImplementForObservable(microserviceContext.whoAmI, message.qualifier),
microserviceContext,
})
);
});
case ASYNC_MODEL_TYPES.REQUEST_RESPONSE:
return new Promise((resolve, reject) => {
Expand Down
11 changes: 10 additions & 1 deletion packages/scalecube-microservice/src/ServiceCall/RemoteCall.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,30 @@ export const remoteCall = (options: RemoteCallOptions) => {

switch (asyncModel) {
case ASYNC_MODEL_TYPES.REQUEST_STREAM:
let canceled = false;
let cancel = () => {
canceled = true;
};
return new Observable((obs: any) => {
getValidEndpoint(options)
.then((endpoint: MicroserviceApi.Endpoint) => {
transportClient
.start({ remoteAddress: endpoint.address, logger })
.then(({ requestStream }: TransportApi.Invoker) => {
requestStream(message).subscribe(
if (canceled) {
return;
}
const sub = requestStream(message).subscribe(
(data: any) => obs.next(data),
(err: Error) => obs.error(err),
() => obs.complete()
);
cancel = () => sub.unsubscribe();
})
.catch((error: Error) => obs.error(error));
})
.catch((error: Error) => obs.error(error));
return () => cancel();
});

case ASYNC_MODEL_TYPES.REQUEST_RESPONSE:
Expand Down