Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix gRPC DevUI testing console #43337

Merged
merged 1 commit into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -233,20 +233,22 @@ export class QwcGrpcServices extends observeState(QwcHotReloadElement) {

_renderCommandButtons(service, method){
if(this._streamsMap.size >=0){
if(method.type == 'UNARY'){
return html`<vaadin-button theme="secondary error" @mousedown=${() => this._clear(service.name, method)} @mouseup=${() => this._default(service.name, method)}>Reset</vaadin-button>
if(method.type == 'UNARY' || method.type == 'SERVER_STREAMING'){
return html`<vaadin-button theme="secondary error" @click=${() => this._default(service.name, method)}>Reset</vaadin-button>
<vaadin-button theme="secondary success" @click=${() => this._test(service, method)}>Send</vaadin-button>`;
}else if(this._isRunning(service.name, method)){
return html`<div class="streamButtons"><vaadin-button theme="secondary error" @click=${() => this._test(service, method)}>Cancel stream</vaadin-button>
<vaadin-progress-bar class="progress-short" indeterminate></vaadin-progress-bar></div>`;
return html`<vaadin-button theme="secondary error" @click=${() => this._default(service.name, method)}>Reset</vaadin-button>
<vaadin-button theme="secondary success" @click=${() => this._test(service, method)}>Send</vaadin-button>
<vaadin-button theme="secondary error" @click=${() => this._disconnect(service, method)}>Disconnect</vaadin-button>
<vaadin-progress-bar class="progress-short" indeterminate></vaadin-progress-bar>`;
}else {
return html`<vaadin-button theme="secondary success" @click=${() => this._test(service, method)}>Start stream</vaadin-button>`;
return html`<vaadin-button theme="secondary success" @click=${() => this._test(service, method)}>Send</vaadin-button>`;
}
}
}

_keypress(e, service, method){
if(method.type == 'UNARY' || !this._isRunning(service.name, method)){
if(method.type == 'UNARY' || method.type == 'SERVER_STREAMING' || !this._isRunning(service.name, method)){
if ((e.keyCode == 10 || e.keyCode == 13) && e.ctrlKey){ // ctlr-enter
this._test(service, method);
}
Expand All @@ -268,46 +270,75 @@ export class QwcGrpcServices extends observeState(QwcHotReloadElement) {
}

_default(serviceName, method){
let requestTextArea = this._requestTextArea(serviceName, method);
requestTextArea.content = '';
let pv = JSON.parse(method.prototype);
this._requestTextArea(serviceName, method).populatePrettyJson(JSON.stringify(pv));
let prettyJson = JSON.stringify(pv, null, 2);
requestTextArea.populatePrettyJson(prettyJson);
}

_test(service, method){
let textArea = this._requestTextArea(service.name, method);
let content = textArea.getAttribute('value');
let requestTextArea = this._requestTextArea(service.name, method);
let content = requestTextArea.getAttribute('value');
let id = this._id(service.name, method);
let responseTextArea = this._responseTextArea(service.name, method);
if(method.type == 'UNARY'){
this.jsonRpc.testService({
id: id,
serviceName: service.name,
methodName: method.bareMethodName,
methodType: method.type,
content: content
}).then(jsonRpcResponse => {
const jsonObject = JSON.parse(jsonRpcResponse.result);
const prettyJson = JSON.stringify(jsonObject, null, 2);
this._responseTextArea(service.name, method).populatePrettyJson(prettyJson);
this._responseTextArea(service.name, method).populatePrettyJson(this._prettyJson(jsonRpcResponse.result));
});
}else{
let id = this._id(service.name, method);
if(this._isRunning(service.name, method)){
this._streamsMap.get(id).cancel();
this._streamsMap.delete(id);
this._clear(service.name, method);
this._default(service.name, method);
this.jsonRpc.streamService({
id: id,
serviceName: service.name,
methodName: method.bareMethodName,
isRunning: true,
content: content
});
// this._streamsMap.get(id).cancel();
// this._streamsMap.delete(id);
// this._clear(service.name, method);
// this._default(service.name, method);
}else{
// starting a new stream, clear the response area
responseTextArea.content = null;
let cancelable = this.jsonRpc.streamService({
id: id,
serviceName: service.name,
methodName: method.bareMethodName,
methodType: method.type,
isRunning: false,
content: content
}).onNext(jsonRpcResponse => {
this._responseTextArea(service.name, method).populatePrettyJson(jsonRpcResponse.result);
if (responseTextArea.content == null) {
responseTextArea.populatePrettyJson(this._prettyJson(jsonRpcResponse.result));
} else {
responseTextArea.populatePrettyJson(responseTextArea.content + '\n' + this._prettyJson(jsonRpcResponse.result));
}
});
this._streamsMap.set(id, cancelable);
if (method.type == 'BIDI_STREAMING' || method.type == 'CLIENT_STREAMING') {
this._streamsMap.set(id, cancelable);
}
}
this._testerButtons = this._renderCommandButtons(service, method);
this._forceUpdate();
}
}

_disconnect(service, method){
let id = this._id(service.name, method);
this.jsonRpc.disconnectService({
id: id,
});
this._streamsMap.delete(id);
this._testerButtons = this._renderCommandButtons(service, method);
this._forceUpdate();
}

_forceUpdate(){
if(this._detailsOpenedItem.length > 0){
Expand All @@ -332,5 +363,9 @@ export class QwcGrpcServices extends observeState(QwcHotReloadElement) {
_responseId(serviceName, method){
return serviceName + '/' + method.bareMethodName + '_response';
}

_prettyJson(content){
return JSON.stringify(JSON.parse(content), null, 2);
}
}
customElements.define('qwc-grpc-services', QwcGrpcServices);
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.protobuf.util.JsonFormat;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.MethodDescriptor;
import io.grpc.ServiceDescriptor;
import io.grpc.netty.NettyChannelBuilder;
Expand Down Expand Up @@ -45,6 +46,7 @@ public class GrpcJsonRPCService {
private static final Logger LOG = Logger.getLogger(GrpcJsonRPCService.class);

private Map<String, GrpcServiceClassInfo> grpcServiceClassInfos;
private Map<String, StreamObserver<Message>> callsInProgress;

@Inject
HttpConfiguration httpConfiguration;
Expand Down Expand Up @@ -72,6 +74,7 @@ public void init() {
this.ssl = isTLSConfigured(httpConfiguration.ssl.certificate);
}
this.grpcServiceClassInfos = getGrpcServiceClassInfos();
this.callsInProgress = new HashMap<>();
}

private boolean isTLSConfigured(CertificateConfig certificate) {
Expand Down Expand Up @@ -107,25 +110,27 @@ public JsonArray getServices() {
return services;
}

public Uni<String> testService(String serviceName, String methodName, String methodType, String content) {
public Uni<String> testService(String id, String serviceName, String methodName, String content) {
try {
return streamService(serviceName, methodName, methodType, content).toUni();
return streamService(id, serviceName, methodName, false, content).toUni();
} catch (Throwable t) {
return Uni.createFrom().item(error(t.getMessage()).encodePrettily());
}
}

public Multi<String> streamService(String serviceName, String methodName, String methodType, String content)
public Multi<String> streamService(String id, String serviceName, String methodName, boolean isRunning,
String content)
throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InvalidProtocolBufferException {
if (content == null) {
return Multi.createFrom().item(error("Invalid messsge").encodePrettily());
return Multi.createFrom().item(error("Invalid message").encodePrettily());
}

BroadcastProcessor<String> streamEvent = BroadcastProcessor.create();

GrpcServiceClassInfo info = this.grpcServiceClassInfos.get(serviceName);

Object grpcStub = createStub(info.grpcServiceClass, host, port);
ManagedChannel channel = getChannel(host, port);
Object grpcStub = createStub(info.grpcServiceClass, channel);

ServiceDescriptor serviceDescriptor = info.serviceDescriptor;

Expand All @@ -134,20 +139,50 @@ public Multi<String> streamService(String serviceName, String methodName, String
MethodDescriptor.PrototypeMarshaller<?> protoMarshaller = (MethodDescriptor.PrototypeMarshaller<?>) requestMarshaller;
Class<?> requestType = protoMarshaller.getMessagePrototype().getClass();

Message message = createMessage(content, requestType);

if (isRunning) {
// we are already connected with this gRPC endpoint, just send the message
callsInProgress.get(id).onNext(message);
} else {
// Invoke the stub method and format the response as JSON
StreamObserver<?> responseObserver = new TestObserver<>(streamEvent);
StreamObserver<Message> incomingStream;

final Method stubMethod = getStubMethod(grpcStub, methodDescriptor.getBareMethodName());

if (stubMethod.getParameterCount() == 1 && stubMethod.getReturnType() == StreamObserver.class) {
// returned StreamObserver consumes incoming messages
//noinspection unchecked
incomingStream = (StreamObserver<Message>) stubMethod.invoke(grpcStub, responseObserver);
callsInProgress.put(id, incomingStream);
// will be streamed continuously
incomingStream.onNext(message);
} else {
// incoming message should be passed as the first parameter of the invocation
stubMethod.invoke(grpcStub, message, responseObserver);
}
}

channel.shutdown();
return streamEvent;
}

private static Message createMessage(String content, Class<?> requestType)
throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InvalidProtocolBufferException {
// Create a new builder for the request message, e.g. HelloRequest.newBuilder()
Method newBuilderMethod = requestType.getDeclaredMethod("newBuilder");
Message.Builder builder = (Message.Builder) newBuilderMethod.invoke(null);

// Use the test data to build the request object
JsonFormat.parser().merge(content, builder);
Message message = builder.build();

StreamObserver<?> responseObserver = new TestObserver<Object>(streamEvent);

final Method stubMethod = getStubMethod(grpcStub, methodDescriptor.getBareMethodName());
stubMethod.invoke(grpcStub, message, responseObserver);
return builder.build();
}

return streamEvent;
public Uni<Void> disconnectService(String id) {
callsInProgress.get(id).onCompleted();
callsInProgress.remove(id);
return Uni.createFrom().voidItem();
}

private Map<String, GrpcJsonRPCService.GrpcServiceClassInfo> getGrpcServiceClassInfos() {
Expand Down Expand Up @@ -220,17 +255,17 @@ private MethodDescriptor getMethodDescriptor(ServiceDescriptor serviceDescriptor
return null;
}

private Object createStub(Class<?> grpcServiceClass, String host, int port) {
private Object createStub(Class<?> grpcServiceClass, Channel channel) {
try {
Method stubFactoryMethod = grpcServiceClass.getDeclaredMethod("newStub", Channel.class);
return stubFactoryMethod.invoke(null, getChannel(host, port));
return stubFactoryMethod.invoke(null, channel);
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
LOG.warnf("Could not create stub for %s - " + e.getMessage(), grpcServiceClass);
return null;
}
}

private Channel getChannel(String host, int port) {
private ManagedChannel getChannel(String host, int port) {
return NettyChannelBuilder
.forAddress(host, port)
.usePlaintext()
Expand Down
Loading