Skip to content

Commit

Permalink
Support set actual content length to inv/res attributes (#12521)
Browse files Browse the repository at this point in the history
  • Loading branch information
AlbumenJ authored Jun 13, 2023
1 parent e07e5e2 commit 5f39404
Show file tree
Hide file tree
Showing 12 changed files with 31 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,4 +163,7 @@ public interface Constants {
String OK_HTTP = "ok-http";
String URL_CONNECTION = "url-connection";
String APACHE_HTTP_CLIENT = "apache-http-client";

String CONTENT_LENGTH_KEY = "content-length";

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.Codec;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.Decodeable;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.exchange.Request;
Expand Down Expand Up @@ -119,6 +120,9 @@ public void encode(Channel channel, OutputStream output, Object message) throws

@Override
public Object decode(Channel channel, InputStream input) throws IOException {
int contentLength = input.available();
getAttributes().put(Constants.CONTENT_LENGTH_KEY, contentLength);

ObjectInput in = CodecSupport.getSerialization(serializationType)
.deserialize(channel.getUrl(), input);
this.put(SERIALIZATION_ID_KEY, serializationType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.Codec;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.Decodeable;
import org.apache.dubbo.remoting.exchange.Response;
import org.apache.dubbo.remoting.transport.CodecSupport;
Expand Down Expand Up @@ -82,6 +83,9 @@ public Object decode(Channel channel, InputStream input) throws IOException {
log.debug("Decoding in thread -- [" + thread.getName() + "#" + thread.getId() + "]");
}

int contentLength = input.available();
setAttribute(Constants.CONTENT_LENGTH_KEY, contentLength);

// switch TCCL
if (invocation != null && invocation.getServiceModel() != null) {
Thread.currentThread().setContextClassLoader(invocation.getServiceModel().getClassLoader());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@

import static org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_CREATE_STREAM_TRIPLE;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_PARSE;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_SERIALIZE_TRIPLE;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_REQUEST;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_SERIALIZE_TRIPLE;

public abstract class AbstractServerCall implements ServerCall, ServerStream.Listener {

Expand Down Expand Up @@ -212,7 +212,7 @@ public final void onMessage(byte[] message, boolean isReturnTriException) {
.getContextClassLoader();
try {
Object instance = parseSingleMessage(message);
listener.onMessage(instance);
listener.onMessage(instance, message.length);
} catch (Exception e) {
final TriRpcStatus status = TriRpcStatus.UNKNOWN.withDescription("Server error")
.withCause(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void onReturn(Object value) {
}

@Override
public void onMessage(Object message) {
public void onMessage(Object message, int actualContentLength) {
if (message instanceof Object[]) {
message = ((Object[]) message)[0];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ interface Listener {
* Callback when message received.
*
* @param message message received
* @param actualContentLength actual content length from body
*/
void onMessage(Object message);
void onMessage(Object message, int actualContentLength);

/**
* Callback when call is finished.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void setOnStartConsumer(Consumer<ClientCall> onStartConsumer) {
}

@Override
public void onMessage(Object message) {
public void onMessage(Object message, int actualContentLength) {
delegate.onNext(message);
if (call.isAutoRequest()) {
call.request(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ interface Listener {
* Callback when a request message is received.
*
* @param message message received
* @param actualContentLength actual content length from body
*/
void onMessage(Object message);
void onMessage(Object message, int actualContentLength);

/**
* @param status when the call is canceled.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void onReturn(Object value) {
}

@Override
public void onMessage(Object message) {
public void onMessage(Object message, int actualContentLength) {
if (message instanceof Object[]) {
message = ((Object[]) message)[0];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.dubbo.rpc.protocol.tri.call;

import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import io.netty.handler.codec.http2.Http2Exception;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.remoting.api.connection.AbstractConnectionClient;
Expand All @@ -34,14 +33,15 @@
import org.apache.dubbo.rpc.protocol.tri.transport.TripleWriteQueue;

import io.netty.channel.Channel;
import io.netty.handler.codec.http2.Http2Exception;

import java.util.Map;
import java.util.concurrent.Executor;

import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_RESPONSE;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_SERIALIZE_TRIPLE;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_STREAM_LISTENER;
import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;

public class TripleClientCall implements ClientCall, ClientStream.Listener {
private static final ErrorTypeAwareLogger LOGGER = LoggerFactory.getErrorTypeAwareLogger(TripleClientCall.class);
Expand Down Expand Up @@ -78,7 +78,7 @@ public void onMessage(byte[] message, boolean isReturnTriException) {
}
try {
final Object unpacked = requestMetadata.packableMethod.parseResponse(message, isReturnTriException);
listener.onMessage(unpacked);
listener.onMessage(unpacked, message.length);
} catch (Throwable t) {
TriRpcStatus status = TriRpcStatus.INTERNAL.withDescription("Deserialize response failed")
.withCause(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.dubbo.rpc.protocol.tri.call;

import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.TriRpcStatus;
import org.apache.dubbo.rpc.protocol.tri.DeadlineFuture;
Expand All @@ -27,14 +28,16 @@ public class UnaryClientCallListener implements ClientCall.Listener {

private final DeadlineFuture future;
private Object appResponse;
private int actualContentLength;

public UnaryClientCallListener(DeadlineFuture deadlineFuture) {
this.future = deadlineFuture;
}

@Override
public void onMessage(Object message) {
public void onMessage(Object message, int actualContentLength) {
this.appResponse = message;
this.actualContentLength = actualContentLength;
}

@Override
Expand All @@ -50,6 +53,7 @@ public void onClose(TriRpcStatus status, Map<String, Object> trailers, boolean i
} else {
result.setException(status.asException());
}
result.setAttribute(Constants.CONTENT_LENGTH_KEY, actualContentLength);
future.received(status, result);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.dubbo.rpc.protocol.tri.call;

import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
Expand All @@ -40,12 +41,13 @@ public void onReturn(Object value) {
}

@Override
public void onMessage(Object message) {
public void onMessage(Object message, int actualContentLength) {
if (message instanceof Object[]) {
invocation.setArguments((Object[]) message);
} else {
invocation.setArguments(new Object[]{message});
}
invocation.put(Constants.CONTENT_LENGTH_KEY, actualContentLength);
}

@Override
Expand Down

0 comments on commit 5f39404

Please sign in to comment.