From a6768b62a4448a38761e89d2f495c7bf131eec2d Mon Sep 17 00:00:00 2001 From: xstefank Date: Tue, 17 Sep 2024 15:24:21 +0200 Subject: [PATCH] Fix gRPC DevUI testing console --- .../resources/dev-ui/qwc-grpc-services.js | 75 ++++++++++++++----- .../runtime/devui/GrpcJsonRPCService.java | 65 ++++++++++++---- 2 files changed, 105 insertions(+), 35 deletions(-) diff --git a/extensions/grpc/deployment/src/main/resources/dev-ui/qwc-grpc-services.js b/extensions/grpc/deployment/src/main/resources/dev-ui/qwc-grpc-services.js index 26caaee30abe7..1675b06f73056 100644 --- a/extensions/grpc/deployment/src/main/resources/dev-ui/qwc-grpc-services.js +++ b/extensions/grpc/deployment/src/main/resources/dev-ui/qwc-grpc-services.js @@ -233,20 +233,22 @@ export class QwcGrpcServices extends observeState(QwcHotReloadElement) { _renderCommandButtons(service, method){ if(this._streamsMap.size >=0){ - if(method.type == 'UNARY'){ - return html` this._clear(service.name, method)} @mouseup=${() => this._default(service.name, method)}>Reset + if(method.type == 'UNARY' || method.type == 'SERVER_STREAMING'){ + return html` this._default(service.name, method)}>Reset this._test(service, method)}>Send`; }else if(this._isRunning(service.name, method)){ - return html`
this._test(service, method)}>Cancel stream -
`; + return html` this._default(service.name, method)}>Reset + this._test(service, method)}>Send + this._disconnect(service, method)}>Disconnect + `; }else { - return html` this._test(service, method)}>Start stream`; + return html` this._test(service, method)}>Send`; } } } _keypress(e, service, method){ - if(method.type == 'UNARY' || !this._isRunning(service.name, method)){ + if(method.type == 'UNARY' || method.type == 'SERVER_STREAMING' || !this._isRunning(service.name, method)){ if ((e.keyCode == 10 || e.keyCode == 13) && e.ctrlKey){ // ctlr-enter this._test(service, method); } @@ -268,46 +270,75 @@ export class QwcGrpcServices extends observeState(QwcHotReloadElement) { } _default(serviceName, method){ + let requestTextArea = this._requestTextArea(serviceName, method); + requestTextArea.content = ''; let pv = JSON.parse(method.prototype); - this._requestTextArea(serviceName, method).populatePrettyJson(JSON.stringify(pv)); + let prettyJson = JSON.stringify(pv, null, 2); + requestTextArea.populatePrettyJson(prettyJson); } _test(service, method){ - let textArea = this._requestTextArea(service.name, method); - let content = textArea.getAttribute('value'); + let requestTextArea = this._requestTextArea(service.name, method); + let content = requestTextArea.getAttribute('value'); + let id = this._id(service.name, method); + let responseTextArea = this._responseTextArea(service.name, method); if(method.type == 'UNARY'){ this.jsonRpc.testService({ + id: id, serviceName: service.name, methodName: method.bareMethodName, methodType: method.type, content: content }).then(jsonRpcResponse => { - const jsonObject = JSON.parse(jsonRpcResponse.result); - const prettyJson = JSON.stringify(jsonObject, null, 2); - this._responseTextArea(service.name, method).populatePrettyJson(prettyJson); + this._responseTextArea(service.name, method).populatePrettyJson(this._prettyJson(jsonRpcResponse.result)); }); }else{ - let id = this._id(service.name, method); if(this._isRunning(service.name, method)){ - this._streamsMap.get(id).cancel(); - this._streamsMap.delete(id); - this._clear(service.name, method); - this._default(service.name, method); + this.jsonRpc.streamService({ + id: id, + serviceName: service.name, + methodName: method.bareMethodName, + isRunning: true, + content: content + }); + // this._streamsMap.get(id).cancel(); + // this._streamsMap.delete(id); + // this._clear(service.name, method); + // this._default(service.name, method); }else{ + // starting a new stream, clear the response area + responseTextArea.content = null; let cancelable = this.jsonRpc.streamService({ + id: id, serviceName: service.name, methodName: method.bareMethodName, - methodType: method.type, + isRunning: false, content: content }).onNext(jsonRpcResponse => { - this._responseTextArea(service.name, method).populatePrettyJson(jsonRpcResponse.result); + if (responseTextArea.content == null) { + responseTextArea.populatePrettyJson(this._prettyJson(jsonRpcResponse.result)); + } else { + responseTextArea.populatePrettyJson(responseTextArea.content + '\n' + this._prettyJson(jsonRpcResponse.result)); + } }); - this._streamsMap.set(id, cancelable); + if (method.type == 'BIDI_STREAMING' || method.type == 'CLIENT_STREAMING') { + this._streamsMap.set(id, cancelable); + } } this._testerButtons = this._renderCommandButtons(service, method); this._forceUpdate(); } } + + _disconnect(service, method){ + let id = this._id(service.name, method); + this.jsonRpc.disconnectService({ + id: id, + }); + this._streamsMap.delete(id); + this._testerButtons = this._renderCommandButtons(service, method); + this._forceUpdate(); + } _forceUpdate(){ if(this._detailsOpenedItem.length > 0){ @@ -332,5 +363,9 @@ export class QwcGrpcServices extends observeState(QwcHotReloadElement) { _responseId(serviceName, method){ return serviceName + '/' + method.bareMethodName + '_response'; } + + _prettyJson(content){ + return JSON.stringify(JSON.parse(content), null, 2); + } } customElements.define('qwc-grpc-services', QwcGrpcServices); \ No newline at end of file diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devui/GrpcJsonRPCService.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devui/GrpcJsonRPCService.java index 78835d4fd51fb..7d367e3d43710 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devui/GrpcJsonRPCService.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devui/GrpcJsonRPCService.java @@ -18,6 +18,7 @@ import com.google.protobuf.util.JsonFormat; import io.grpc.Channel; +import io.grpc.ManagedChannel; import io.grpc.MethodDescriptor; import io.grpc.ServiceDescriptor; import io.grpc.netty.NettyChannelBuilder; @@ -45,6 +46,7 @@ public class GrpcJsonRPCService { private static final Logger LOG = Logger.getLogger(GrpcJsonRPCService.class); private Map grpcServiceClassInfos; + private Map> callsInProgress; @Inject HttpConfiguration httpConfiguration; @@ -72,6 +74,7 @@ public void init() { this.ssl = isTLSConfigured(httpConfiguration.ssl.certificate); } this.grpcServiceClassInfos = getGrpcServiceClassInfos(); + this.callsInProgress = new HashMap<>(); } private boolean isTLSConfigured(CertificateConfig certificate) { @@ -107,25 +110,27 @@ public JsonArray getServices() { return services; } - public Uni testService(String serviceName, String methodName, String methodType, String content) { + public Uni testService(String id, String serviceName, String methodName, String content) { try { - return streamService(serviceName, methodName, methodType, content).toUni(); + return streamService(id, serviceName, methodName, false, content).toUni(); } catch (Throwable t) { return Uni.createFrom().item(error(t.getMessage()).encodePrettily()); } } - public Multi streamService(String serviceName, String methodName, String methodType, String content) + public Multi streamService(String id, String serviceName, String methodName, boolean isRunning, + String content) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InvalidProtocolBufferException { if (content == null) { - return Multi.createFrom().item(error("Invalid messsge").encodePrettily()); + return Multi.createFrom().item(error("Invalid message").encodePrettily()); } BroadcastProcessor streamEvent = BroadcastProcessor.create(); GrpcServiceClassInfo info = this.grpcServiceClassInfos.get(serviceName); - Object grpcStub = createStub(info.grpcServiceClass, host, port); + ManagedChannel channel = getChannel(host, port); + Object grpcStub = createStub(info.grpcServiceClass, channel); ServiceDescriptor serviceDescriptor = info.serviceDescriptor; @@ -134,20 +139,50 @@ public Multi streamService(String serviceName, String methodName, String MethodDescriptor.PrototypeMarshaller protoMarshaller = (MethodDescriptor.PrototypeMarshaller) requestMarshaller; Class requestType = protoMarshaller.getMessagePrototype().getClass(); + Message message = createMessage(content, requestType); + + if (isRunning) { + // we are already connected with this gRPC endpoint, just send the message + callsInProgress.get(id).onNext(message); + } else { + // Invoke the stub method and format the response as JSON + StreamObserver responseObserver = new TestObserver<>(streamEvent); + StreamObserver incomingStream; + + final Method stubMethod = getStubMethod(grpcStub, methodDescriptor.getBareMethodName()); + + if (stubMethod.getParameterCount() == 1 && stubMethod.getReturnType() == StreamObserver.class) { + // returned StreamObserver consumes incoming messages + //noinspection unchecked + incomingStream = (StreamObserver) stubMethod.invoke(grpcStub, responseObserver); + callsInProgress.put(id, incomingStream); + // will be streamed continuously + incomingStream.onNext(message); + } else { + // incoming message should be passed as the first parameter of the invocation + stubMethod.invoke(grpcStub, message, responseObserver); + } + } + + channel.shutdown(); + return streamEvent; + } + + private static Message createMessage(String content, Class requestType) + throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InvalidProtocolBufferException { // Create a new builder for the request message, e.g. HelloRequest.newBuilder() Method newBuilderMethod = requestType.getDeclaredMethod("newBuilder"); Message.Builder builder = (Message.Builder) newBuilderMethod.invoke(null); // Use the test data to build the request object JsonFormat.parser().merge(content, builder); - Message message = builder.build(); - - StreamObserver responseObserver = new TestObserver(streamEvent); - - final Method stubMethod = getStubMethod(grpcStub, methodDescriptor.getBareMethodName()); - stubMethod.invoke(grpcStub, message, responseObserver); + return builder.build(); + } - return streamEvent; + public Uni disconnectService(String id) { + callsInProgress.get(id).onCompleted(); + callsInProgress.remove(id); + return Uni.createFrom().voidItem(); } private Map getGrpcServiceClassInfos() { @@ -220,17 +255,17 @@ private MethodDescriptor getMethodDescriptor(ServiceDescriptor serviceDescriptor return null; } - private Object createStub(Class grpcServiceClass, String host, int port) { + private Object createStub(Class grpcServiceClass, Channel channel) { try { Method stubFactoryMethod = grpcServiceClass.getDeclaredMethod("newStub", Channel.class); - return stubFactoryMethod.invoke(null, getChannel(host, port)); + return stubFactoryMethod.invoke(null, channel); } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) { LOG.warnf("Could not create stub for %s - " + e.getMessage(), grpcServiceClass); return null; } } - private Channel getChannel(String host, int port) { + private ManagedChannel getChannel(String host, int port) { return NettyChannelBuilder .forAddress(host, port) .usePlaintext()