Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.0] Add gen MethodDescriptor valid, avoid strange situations #8651

Merged
merged 5 commits into from
Sep 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions dubbo-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,20 @@
<artifactId>protobuf-java</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.4.9</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.2.21</version>
<scope>test</scope>
</dependency>


</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Stream;
Expand All @@ -38,6 +39,13 @@
*
*/
public class MethodDescriptor {

private static final String GRPC_ASYNC_RETURN_CLASS = "com.google.common.util.concurrent.ListenableFuture";
private static final String TRI_ASYNC_RETURN_CLASS = "java.util.concurrent.CompletableFuture";
private static final String REACTOR_RETURN_CLASS = "reactor.core.publisher.Mono";
private static final String RX_RETURN_CLASS = "io.reactivex.Single";
private static final String GRPC_STREAM_CLASS = "io.grpc.stub.StreamObserver";

private static final Logger logger = LoggerFactory.getLogger(MethodDescriptor.class);
private final Method method;
// private final boolean isCallBack;
Expand All @@ -58,18 +66,22 @@ public MethodDescriptor(Method method) {
this.method = method;
this.methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
// bidirectional-stream: StreamObserver<Request> foo(StreamObserver<Response>)
if (parameterTypes.length == 1 && isStreamType(parameterTypes[0])) {
this.parameterClasses = new Class<?>[]{
(Class<?>) ((ParameterizedType) method.getGenericReturnType()).getActualTypeArguments()[0]};
this.returnClass = (Class<?>) ((ParameterizedType) method.getGenericParameterTypes()[0])
.getActualTypeArguments()[0];
this.rpcType = RpcType.BIDIRECTIONAL_STREAM;
// server-stream: void foo(Request, StreamObserver<Response>)
} else if (parameterTypes.length == 2 && method.getReturnType().equals(Void.TYPE)
&& !isStreamType(parameterTypes[0]) && isStreamType(parameterTypes[1])) {
this.parameterClasses = method.getParameterTypes();
this.returnClass = (Class<?>) ((ParameterizedType)method.getGenericParameterTypes()[1]).getActualTypeArguments()[0];
this.returnClass =
(Class<?>) ((ParameterizedType) method.getGenericParameterTypes()[1]).getActualTypeArguments()[0];
this.rpcType = RpcType.SERVER_STREAM;
} else {
// unary: Response foo(Request)
this.parameterClasses = method.getParameterTypes();
this.returnClass = method.getReturnType();
this.rpcType = RpcType.UNARY;
Expand All @@ -92,7 +104,7 @@ public MethodDescriptor(Method method) {
}

private static boolean isStreamType(Class<?> clz) {
return StreamObserver.class.isAssignableFrom(clz);
return StreamObserver.class.isAssignableFrom(clz) || GRPC_STREAM_CLASS.equalsIgnoreCase(clz.getName());
}

public boolean isStream() {
Expand All @@ -111,30 +123,156 @@ public RpcType getRpcType() {
return rpcType;
}

/**
* Determine if the request and response instance should be wrapped in Protobuf wrapper object
*
* @return true if the request and response object is not generated by protobuf
*/
private boolean needWrap() {
// generic call must be wrapped
if (CommonConstants.$INVOKE.equals(methodName) || CommonConstants.$INVOKE_ASYNC.equals(methodName)) {
return true;
} else if ($ECHO.equals(methodName)) {
}
// echo must be wrapped
if ($ECHO.equals(methodName)) {
return true;
}
boolean returnClassProtobuf = isProtobufClass(returnClass);
// Response foo()
if (parameterClasses.length == 0) {
return !returnClassProtobuf;
}
int protobufParameterCount = 0;
int javaParameterCount = 0;
int streamParameterCount = 0;
boolean secondParameterStream = false;
// count normal and protobuf param
for (int i = 0; i < parameterClasses.length; i++) {
Class<?> parameterClass = parameterClasses[i];
if (isProtobufClass(parameterClass)) {
protobufParameterCount++;
} else {
if (isStreamType(parameterClass)) {
if (i == 1) {
secondParameterStream = true;
}
streamParameterCount++;
} else {
javaParameterCount++;
}
}
}
// more than one stream param
if (streamParameterCount > 1) {
throw new IllegalStateException("method params error: more than one Stream params. method=" + methodName);
}
// protobuf only support one param
if (protobufParameterCount >= 2) {
throw new IllegalStateException("method params error: more than one protobuf params. method=" + methodName);
}
// server stream support one normal param and one stream param
if (streamParameterCount == 1) {
if (javaParameterCount + protobufParameterCount > 1) {
throw new IllegalStateException("method params error: server stream does not support more than one normal param." +
" method=" + methodName);
}
// server stream: void foo(Request, StreamObserver<Response>)
if (!secondParameterStream) {
throw new IllegalStateException("method params error: server stream's second param must be StreamObserver." +
" method=" + methodName);
}
}
if (isStream()) {
if (RpcType.SERVER_STREAM == rpcType) {
if (!secondParameterStream) {
throw new IllegalStateException("method params error:server stream's second param must be StreamObserver." +
" method=" + methodName);
}
}
// param type must be consistent
if (returnClassProtobuf) {
if (javaParameterCount > 0) {
throw new IllegalStateException("method params error: both normal and protobuf param found. method=" + methodName);
}
} else {
if (protobufParameterCount > 0) {
throw new IllegalStateException("method params error method=" + methodName);
}
}
} else {
if ((rpcType != RpcType.SERVER_STREAM && parameterClasses.length != 1) || parameterClasses[0] == null) {
if (streamParameterCount > 0) {
throw new IllegalStateException("method params error: unary method should not contain any StreamObserver." +
" method=" + methodName);
}
if (protobufParameterCount > 0 && returnClassProtobuf) {
return false;
}
// handler reactor or rxjava only consider gen by proto
if (isMono(returnClass) || isRx(returnClass)) {
return false;
}
if (protobufParameterCount <= 0 && !returnClassProtobuf) {
return true;
}
// handle grpc stub only consider gen by proto
if (GRPC_ASYNC_RETURN_CLASS.equalsIgnoreCase(returnClass.getName()) && protobufParameterCount == 1) {
return false;
}
// handle dubbo generated method
if (TRI_ASYNC_RETURN_CLASS.equalsIgnoreCase(returnClass.getName())) {
Class<?> actualReturnClass =
(Class<?>) ((ParameterizedType) method.getGenericReturnType()).getActualTypeArguments()[0];
boolean actualReturnClassProtobuf = isProtobufClass(actualReturnClass);
if (actualReturnClassProtobuf && protobufParameterCount == 1) {
return false;
}
if (!actualReturnClassProtobuf && protobufParameterCount == 0) {
return true;
}
}
// todo remove this in future
boolean ignore = checkNeedIgnore();
if (ignore) {
return protobufParameterCount != 1;
}
throw new IllegalStateException("method params error method=" + methodName);
}
// java param should be wrapped
return javaParameterCount > 0;
}

Class<?> clazz = parameterClasses[0];
while (clazz != Object.class && clazz != null) {
Class<?>[] interfaces = clazz.getInterfaces();
if (interfaces.length > 0) {
for (Class<?> clazzInterface : interfaces) {
if (PROTOBUF_MESSAGE_CLASS_NAME.equalsIgnoreCase(clazzInterface.getName())) {
return false;
}
/**
* fixme will produce error on grpc. but is harmless so ignore now
*/
private boolean checkNeedIgnore() {
if (Iterator.class.isAssignableFrom(returnClass)) {
return true;
}
return false;
}

private boolean isMono(Class<?> clz) {
return REACTOR_RETURN_CLASS.equalsIgnoreCase(clz.getName());
}

private boolean isRx(Class<?> clz) {
return RX_RETURN_CLASS.equalsIgnoreCase(clz.getName());
}


public boolean isProtobufClass(Class<?> clazz) {
while (clazz != Object.class && clazz != null) {
Class<?>[] interfaces = clazz.getInterfaces();
if (interfaces.length > 0) {
for (Class<?> clazzInterface : interfaces) {
if (PROTOBUF_MESSAGE_CLASS_NAME.equalsIgnoreCase(clazzInterface.getName())) {
return true;
}
}
clazz = clazz.getSuperclass();
}
return true;
clazz = clazz.getSuperclass();
}
return false;
}

public boolean matchParams(String params) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,37 @@ public interface DescriptorService {

void noParameterMethod();

/**
* unray return protobuf class
*
* @return protobuf class
*/
HelloReply noParameterAndReturnProtobufMethod();

/**
* unray return java class
*
* @return
*/
String noParameterAndReturnJavaClassMethod();


/**
* bi stream need wrapper
*
* @param streamObserver
* @return
*/
StreamObserver<String> wrapBidirectionalStream(StreamObserver<String> streamObserver);

/**
* no need wrapper bi stream
*
* @param streamObserver
* @return
*/
StreamObserver<HelloReply> bidirectionalStream(StreamObserver<HelloReply> streamObserver);

/**
* only for test.
*
Expand All @@ -34,4 +65,38 @@ public interface DescriptorService {
void sayHelloServerStream(HelloReply request, StreamObserver<HelloReply> reply);

void sayHelloServerStream2(Object request, StreamObserver<Object> reply);

/***********************grpc******************************/

java.util.Iterator<HelloReply> iteratorServerStream(HelloReply request);

reactor.core.publisher.Mono<HelloReply> reactorMethod(HelloReply reactorRequest);

reactor.core.publisher.Mono<HelloReply> reactorMethod2(reactor.core.publisher.Mono<HelloReply> reactorRequest);

io.reactivex.Single<HelloReply> rxJavaMethod(io.reactivex.Single<HelloReply> replySingle);

/**********************test error*****************/
void testMultiProtobufParameters(HelloReply reply1, HelloReply reply2);

String testDiffParametersAndReturn(HelloReply reply1);

HelloReply testDiffParametersAndReturn2(String reply1);

void testErrorServerStream(StreamObserver<HelloReply> reply, HelloReply request);

void testErrorServerStream2(HelloReply request, HelloReply request2, StreamObserver<HelloReply> reply);

void testErrorServerStream3(String aa, StreamObserver<HelloReply> reply);

void testErrorServerStream4(String aa, String bb, StreamObserver<String> reply);

StreamObserver<HelloReply> testErrorBiStream(HelloReply reply, StreamObserver<HelloReply> observer);

StreamObserver<HelloReply> testErrorBiStream2(String reply, StreamObserver<HelloReply> observer);

StreamObserver<String> testErrorBiStream3(StreamObserver<HelloReply> observer);
StreamObserver<String> testErrorBiStream4(StreamObserver<HelloReply> observer,String str);


}
Loading