Skip to content

Commit

Permalink
Fix 3.0 use triple can not return real exception (#8458)
Browse files Browse the repository at this point in the history
* Fix use triple can't return real exception in java (#8363)

* Fix use triple can't return real exception in java (#8363)

- add common exception structure for multi-language

* Remove exception serialization and ignore when decode/encode failed

* Add exceptionUtils license and remove useless pom config

* Remove unused constant

* Serialize exception only in wrapper mode

* Format code

* Ignore google rpc classes

Co-authored-by: guohao <guohaoice@gmail.com>
  • Loading branch information
EarthChen and guohao authored Aug 11, 2021
1 parent b64ea1d commit 6808f8f
Show file tree
Hide file tree
Showing 18 changed files with 743 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,13 @@ public void execute(Runnable runnable) {
} catch (RejectedExecutionException e) {
LOGGER.error("Consumer's thread pool is full", e);
getStreamSubscriber().onError(GrpcStatus.fromCode(GrpcStatus.Code.RESOURCE_EXHAUSTED)
.withDescription("Consumer's thread pool is full").asException());
.withDescription("Consumer's thread pool is full").asException());
} catch (Throwable t) {
LOGGER.error("Consumer submit request to thread pool error ", t);
getStreamSubscriber().onError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
.withCause(t)
.withDescription("Consumer's error")
.asException());
.withCause(t)
.withDescription("Consumer's error")
.asException());
}
}

Expand Down Expand Up @@ -128,11 +128,11 @@ protected Object deserializeResponse(byte[] data) {
}
if (getMethodDescriptor().isNeedWrap()) {
final TripleWrapper.TripleResponseWrapper wrapper = TripleUtil.unpack(data,
TripleWrapper.TripleResponseWrapper.class);
TripleWrapper.TripleResponseWrapper.class);
if (!getSerializeType().equals(TripleUtil.convertHessianFromWrapper(wrapper.getSerializeType()))) {
throw new UnsupportedOperationException("Received inconsistent serialization type from server, " +
"reject to deserialize! Expected:" + getSerializeType() +
" Actual:" + TripleUtil.convertHessianFromWrapper(wrapper.getSerializeType()));
"reject to deserialize! Expected:" + getSerializeType() +
" Actual:" + TripleUtil.convertHessianFromWrapper(wrapper.getSerializeType()));
}
return TripleUtil.unwrapResp(getUrl(), wrapper, getMultipleSerialization());
} else {
Expand All @@ -146,17 +146,17 @@ protected Object deserializeResponse(byte[] data) {
protected Metadata createRequestMeta(RpcInvocation inv) {
Metadata metadata = new DefaultMetadata();
metadata.put(TripleConstant.PATH_KEY, "/" + inv.getObjectAttachment(CommonConstants.PATH_KEY) + "/" + inv.getMethodName())
.put(TripleConstant.AUTHORITY_KEY, getUrl().getAddress())
.put(TripleConstant.CONTENT_TYPE_KEY, TripleConstant.CONTENT_PROTO)
.put(TripleConstant.TIMEOUT, inv.get(CommonConstants.TIMEOUT_KEY) + "m")
.put(HttpHeaderNames.TE, HttpHeaderValues.TRAILERS);
.put(TripleConstant.AUTHORITY_KEY, getUrl().getAddress())
.put(TripleConstant.CONTENT_TYPE_KEY, TripleConstant.CONTENT_PROTO)
.put(TripleConstant.TIMEOUT, inv.get(CommonConstants.TIMEOUT_KEY) + "m")
.put(HttpHeaderNames.TE, HttpHeaderValues.TRAILERS);

metadata.putIfNotNull(TripleConstant.SERVICE_VERSION, inv.getInvoker().getUrl().getVersion())
.putIfNotNull(TripleConstant.CONSUMER_APP_NAME_KEY,
(String) inv.getObjectAttachments().remove(CommonConstants.APPLICATION_KEY))
.putIfNotNull(TripleConstant.CONSUMER_APP_NAME_KEY,
(String) inv.getObjectAttachments().remove(CommonConstants.REMOTE_APPLICATION_KEY))
.putIfNotNull(TripleConstant.SERVICE_GROUP, inv.getInvoker().getUrl().getGroup());
.putIfNotNull(TripleConstant.CONSUMER_APP_NAME_KEY,
(String) inv.getObjectAttachments().remove(CommonConstants.APPLICATION_KEY))
.putIfNotNull(TripleConstant.CONSUMER_APP_NAME_KEY,
(String) inv.getObjectAttachments().remove(CommonConstants.REMOTE_APPLICATION_KEY))
.putIfNotNull(TripleConstant.SERVICE_GROUP, inv.getInvoker().getUrl().getGroup());
inv.getObjectAttachments().remove(CommonConstants.GROUP_KEY);
inv.getObjectAttachments().remove(CommonConstants.INTERFACE_KEY);
inv.getObjectAttachments().remove(CommonConstants.PATH_KEY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,9 @@ protected Object[] deserializeRequest(byte[] data) {
TripleWrapper.TripleRequestWrapper.class);
if (!getSerializeType().equals(TripleUtil.convertHessianFromWrapper(wrapper.getSerializeType()))) {
transportError(GrpcStatus.fromCode(GrpcStatus.Code.INVALID_ARGUMENT)
.withDescription("Received inconsistent serialization type from client, " +
"reject to deserialize! Expected:" + getSerializeType() +
" Actual:" + TripleUtil.convertHessianFromWrapper(wrapper.getSerializeType())));
.withDescription("Received inconsistent serialization type from client, " +
"reject to deserialize! Expected:" + getSerializeType() +
" Actual:" + TripleUtil.convertHessianFromWrapper(wrapper.getSerializeType())));
return null;
}
if (getMethodDescriptor() == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,17 @@
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.common.threadlocal.NamedInternalThreadFactory;
import org.apache.dubbo.common.utils.ConfigUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.config.Constants;
import org.apache.dubbo.remoting.exchange.Request;
import org.apache.dubbo.rpc.model.MethodDescriptor;
import org.apache.dubbo.rpc.model.ServiceDescriptor;
import org.apache.dubbo.rpc.protocol.tri.GrpcStatus.Code;
import org.apache.dubbo.triple.TripleWrapper;

import com.google.protobuf.Any;
import com.google.rpc.DebugInfo;
import com.google.rpc.Status;
import io.netty.handler.codec.http2.Http2Headers;

import java.io.IOException;
Expand All @@ -54,7 +59,8 @@ public abstract class AbstractStream implements Stream {
static {
ThreadFactory tripleTF = new NamedInternalThreadFactory("tri-callbcak", true);
for (int i = 0; i < 4; i++) {
final ThreadPoolExecutor tp = new ThreadPoolExecutor(1, 1, 0, TimeUnit.DAYS, new LinkedBlockingQueue<>(1024),
final ThreadPoolExecutor tp = new ThreadPoolExecutor(1, 1, 0, TimeUnit.DAYS,
new LinkedBlockingQueue<>(1024),
tripleTF, new ThreadPoolExecutor.AbortPolicy());
CALLBACK_EXECUTORS.add(tp);
}
Expand Down Expand Up @@ -189,26 +195,81 @@ public TransportObserver asTransportObserver() {
}

protected void transportError(GrpcStatus status) {
Metadata metadata = new DefaultMetadata();
metadata.put(TripleConstant.STATUS_KEY, Integer.toString(status.code.code));
metadata.put(TripleConstant.MESSAGE_KEY, status.toMessage());
getTransportSubscriber().tryOnMetadata(metadata, true);
// set metadata
Metadata metadata = getMetaData(status);
getTransportSubscriber().tryOnMetadata(metadata, false);
// set trailers
Metadata trailers = getTrailers(status);
getTransportSubscriber().tryOnMetadata(trailers, true);
if (LOGGER.isErrorEnabled()) {
LOGGER.error("[Triple-Server-Error] " + status.toMessage());
}
}

protected void transportError(Throwable throwable) {
Metadata metadata = new DefaultMetadata();
metadata.put(TripleConstant.STATUS_KEY, Integer.toString(Code.UNKNOWN.code));
metadata.put(TripleConstant.MESSAGE_KEY, throwable.getMessage());
getTransportSubscriber().tryOnMetadata(metadata, true);
GrpcStatus status = new GrpcStatus(Code.UNKNOWN, throwable, throwable.getMessage());
Metadata metadata = getMetaData(status);
getTransportSubscriber().tryOnMetadata(metadata, false);
Metadata trailers = getTrailers(status);
getTransportSubscriber().tryOnMetadata(trailers, true);
if (LOGGER.isErrorEnabled()) {
LOGGER.error("[Triple-Server-Error] service=" + getServiceDescriptor().getServiceName()
+ " method=" + getMethodName(), throwable);
}
}

private Metadata getMetaData(GrpcStatus status) {
Metadata metadata = new DefaultMetadata();
metadata.put(TripleConstant.MESSAGE_KEY, getGrpcMessage(status));
metadata.put(TripleConstant.STATUS_KEY, String.valueOf(status.code.code));
return metadata;
}

private String getGrpcMessage(GrpcStatus status) {
if (StringUtils.isNotEmpty(status.description)) {
return status.description;
}
if (status.cause != null) {
return status.cause.getMessage();
}
return "unknown";
}

private Metadata getTrailers(GrpcStatus grpcStatus) {

Metadata metadata = new DefaultMetadata();
Status.Builder builder = Status.newBuilder()
.setCode(grpcStatus.code.code)
.setMessage(getGrpcMessage(grpcStatus));
Throwable throwable = grpcStatus.cause;
if (throwable == null) {
return metadata;
}
DebugInfo debugInfo = DebugInfo.newBuilder()
.addAllStackEntries(ExceptionUtils.getStackFrameList(throwable))
// can not use now
// .setDetail(throwable.getMessage())
.build();
builder.addDetails(Any.pack(debugInfo));
Status status = builder.build();
metadata.put(TripleConstant.STATUS_DETAIL_KEY, TripleUtil.encodeBase64ASCII(status.toByteArray()));
// only wrapper mode support exception serialization
if (getMethodDescriptor() != null && !getMethodDescriptor().isNeedWrap()) {
return metadata;
}
try {
TripleWrapper.TripleExceptionWrapper exceptionWrapper = TripleUtil.wrapException(getUrl(), throwable, getSerializeType(), getMultipleSerialization());
String exceptionStr = TripleUtil.encodeBase64ASCII(exceptionWrapper.toByteArray());
if (exceptionStr.length() <= TripleConstant.DEFAULT_HEADER_LIST_SIZE) {
metadata.put(TripleConstant.EXCEPTION_TW_BIN, exceptionStr);
}
} catch (Throwable t) {
LOGGER.warn("Encode triple exception to trailers failed", t);
}

return metadata;
}

protected Map<String, Object> parseMetadataToMap(Metadata metadata) {
Map<String, Object> attachments = new LinkedHashMap<>();
for (Map.Entry<CharSequence, CharSequence> header : metadata) {
Expand Down Expand Up @@ -323,14 +384,7 @@ public byte[] getData() {

@Override
public void onComplete(OperationHandler handler) {
Metadata metadata;
if (getTrailers() == null) {
metadata = getHeaders();
} else {
metadata = getTrailers();
}

final GrpcStatus status = extractStatusFromMeta(metadata);
final GrpcStatus status = extractStatusFromMeta(getHeaders());
if (GrpcStatus.Code.isOk(status.code.code)) {
doOnComplete(handler);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,12 @@ public void onData(byte[] data, boolean endStream, OperationHandler handler) {
@Override
public void onComplete(OperationHandler handler) {
execute(() -> {
Metadata metadata;
if (getTrailers() == null) {
metadata = getHeaders();
} else {
metadata = getTrailers();
}
final GrpcStatus status = extractStatusFromMeta(metadata);
final GrpcStatus status = extractStatusFromMeta(getHeaders());

if (GrpcStatus.Code.isOk(status.code.code)) {
getStreamSubscriber().onCompleted();
} else {
getStreamSubscriber().onError(status.asException());
getStreamSubscriber().onError(status.asException(getTrailers()));
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,15 @@ public Spliterator<Map.Entry<CharSequence, CharSequence>> spliterator() {
return innerMap.entrySet().spliterator();
}

@Override
public boolean contains(CharSequence key) {
return innerMap.containsKey(key);
}

@Override
public boolean remove(CharSequence key) {
innerMap.remove(key);
return true;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dubbo.rpc.protocol.tri;

import org.apache.dubbo.common.utils.CollectionUtils;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

public class ExceptionUtils {

private static final int NOT_FOUND = -1;

public static String getStackTrace(final Throwable throwable) {
final StringWriter sw = new StringWriter();
final PrintWriter pw = new PrintWriter(sw, true);
throwable.printStackTrace(pw);
return sw.getBuffer().toString();
}

public static String getStackFrameString(List<String> stackFrameList) {
if (CollectionUtils.isEmpty(stackFrameList)) {
return "";
}
StringBuilder stringBuilder = new StringBuilder();
for (String s : stackFrameList) {
stringBuilder.append(s).append("\n");
}
return stringBuilder.toString();
}

public static String[] getStackFrames(final Throwable throwable) {
if (throwable == null) {
return new String[0];
}
return getStackFrames(getStackTrace(throwable));
}

static String[] getStackFrames(final String stackTrace) {
final String linebreak = System.lineSeparator();
final StringTokenizer frames = new StringTokenizer(stackTrace, linebreak);
final List<String> list = new ArrayList<>();
while (frames.hasMoreTokens()) {
list.add(frames.nextToken());
}
return list.toArray(new String[0]);
}

public static List<String> getStackFrameList(final Throwable t) {
final String stackTrace = getStackTrace(t);
final String linebreak = System.lineSeparator();
final StringTokenizer frames = new StringTokenizer(stackTrace, linebreak);
final List<String> list = new ArrayList<>();
while (frames.hasMoreTokens()) {
list.add(frames.nextToken());
}
return list;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.codec.http.QueryStringEncoder;

import static io.netty.util.internal.ObjectUtil.checkNotNull;

/**
* See https://github.com/grpc/grpc/blob/master/doc/statuscodes.md
*/
Expand Down Expand Up @@ -95,6 +97,17 @@ public static String fromMessage(String raw) {
return QueryStringDecoder.decodeComponent(raw);
}

public static Metadata trailersFromThrowable(Throwable t) {
Throwable cause = checkNotNull(t, "t");
while (cause != null) {
if (cause instanceof TripleRpcException) {
return ((TripleRpcException) cause).getTrailers();
}
cause = cause.getCause();
}
return null;
}

public GrpcStatus withCause(Throwable cause) {
return new GrpcStatus(this.code, cause, this.description);
}
Expand All @@ -107,6 +120,10 @@ public TripleRpcException asException() {
return new TripleRpcException(this);
}

public TripleRpcException asException(Metadata trailers) {
return new TripleRpcException(this, trailers);
}

public String toMessage() {
final String msg;
if (cause == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ public boolean contains(CharSequence key) {
return headers.contains(key);
}

@Override
public boolean remove(CharSequence key) {
return headers.remove(key);
}

@Override
public Iterator<Map.Entry<CharSequence, CharSequence>> iterator() {
return headers.iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,6 @@ default Metadata putIfNotNull(CharSequence key, CharSequence value) {

boolean contains(CharSequence key);

boolean remove(CharSequence key);

}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ private void writeRequest(ChannelHandlerContext ctx, final Request req, final Ch
MethodDescriptor methodDescriptor = repo.lookupMethod(inv.getServiceName(), inv.getMethodName());
String serviceKey = url.getServiceKey();
// If it is InstanceAddressURL, the serviceKey may not be obtained.
if(null == serviceKey) {
if (null == serviceKey) {
serviceKey = inv.getTargetServiceUniqueName();
}
final ConsumerModel service = repo.lookupReferredService(serviceKey);
Expand Down
Loading

0 comments on commit 6808f8f

Please sign in to comment.