diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java index 515afb2d503..9883a8072ec 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java @@ -162,8 +162,13 @@ protected final HttpOutputMessage buildMessage(Object data) throws Throwable { data = ((HttpResult) data).getBody(); } HttpOutputMessage outputMessage = encodeHttpOutputMessage(data); - preOutputMessage(outputMessage); - responseEncoder.encode(outputMessage.getBody(), data); + try { + preOutputMessage(outputMessage); + responseEncoder.encode(outputMessage.getBody(), data); + } catch (Throwable t) { + outputMessage.close(); + throw t; + } return outputMessage; } diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpInputMessage.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpInputMessage.java index 60d6a4dec77..717b03665a0 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpInputMessage.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpInputMessage.java @@ -16,9 +16,15 @@ */ package org.apache.dubbo.remoting.http12; +import java.io.IOException; import java.io.InputStream; -public interface HttpInputMessage { +public interface HttpInputMessage extends AutoCloseable { InputStream getBody(); + + @Override + default void close() throws IOException { + getBody().close(); + } } diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpOutputMessage.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpOutputMessage.java index e56630430c5..ac64685bd0a 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpOutputMessage.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpOutputMessage.java @@ -17,9 +17,10 @@ package org.apache.dubbo.remoting.http12; import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.io.OutputStream; -public interface HttpOutputMessage { +public interface HttpOutputMessage extends AutoCloseable { HttpOutputMessage EMPTY_MESSAGE = new HttpOutputMessage() { @@ -32,4 +33,9 @@ public OutputStream getBody() { }; OutputStream getBody(); + + @Override + default void close() throws IOException { + getBody().close(); + } } diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/DefaultHttp1Request.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/DefaultHttp1Request.java index 1bc610d84bf..d950f8a0407 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/DefaultHttp1Request.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/DefaultHttp1Request.java @@ -20,6 +20,7 @@ import org.apache.dubbo.remoting.http12.HttpInputMessage; import org.apache.dubbo.remoting.http12.RequestMetadata; +import java.io.IOException; import java.io.InputStream; public class DefaultHttp1Request implements Http1Request { @@ -52,4 +53,9 @@ public String method() { public String path() { return httpMetadata.path(); } + + @Override + public void close() throws IOException { + httpInputMessage.close(); + } } diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/DefaultHttp1Response.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/DefaultHttp1Response.java index 4f345b6cbe6..75bdb4210c3 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/DefaultHttp1Response.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/DefaultHttp1Response.java @@ -20,6 +20,7 @@ import org.apache.dubbo.remoting.http12.HttpInputMessage; import org.apache.dubbo.remoting.http12.HttpMetadata; +import java.io.IOException; import java.io.InputStream; public class DefaultHttp1Response implements HttpMetadata, HttpInputMessage { @@ -42,4 +43,9 @@ public InputStream getBody() { public HttpHeaders headers() { return httpMetadata.headers(); } + + @Override + public void close() throws IOException { + httpInputMessage.close(); + } } diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1OutputMessage.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1OutputMessage.java index 2406b8249cf..dd8844162b0 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1OutputMessage.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1OutputMessage.java @@ -18,8 +18,11 @@ import org.apache.dubbo.remoting.http12.HttpOutputMessage; +import java.io.IOException; import java.io.OutputStream; +import io.netty.buffer.ByteBufOutputStream; + public class Http1OutputMessage implements HttpOutputMessage { private final OutputStream outputStream; @@ -32,4 +35,12 @@ public Http1OutputMessage(OutputStream outputStream) { public OutputStream getBody() { return outputStream; } + + @Override + public void close() throws IOException { + if (outputStream instanceof ByteBufOutputStream) { + ((ByteBufOutputStream) outputStream).buffer().release(); + } + outputStream.close(); + } } diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2OutputMessageFrame.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2OutputMessageFrame.java index 15c87eb4139..d3ba8ce66da 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2OutputMessageFrame.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2OutputMessageFrame.java @@ -16,8 +16,11 @@ */ package org.apache.dubbo.remoting.http12.h2; +import java.io.IOException; import java.io.OutputStream; +import io.netty.buffer.ByteBufOutputStream; + public class Http2OutputMessageFrame implements Http2OutputMessage { private final OutputStream body; @@ -42,6 +45,14 @@ public OutputStream getBody() { return body; } + @Override + public void close() throws IOException { + if (body instanceof ByteBufOutputStream) { + ((ByteBufOutputStream) body).buffer().release(); + } + body.close(); + } + @Override public boolean isEndStream() { return endStream; diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/AbstractServerTransportListener.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/AbstractServerTransportListener.java index 5d74b831b04..970aa88fa7b 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/AbstractServerTransportListener.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/AbstractServerTransportListener.java @@ -126,7 +126,9 @@ public void onData(MESSAGE message) { doOnData(message); } catch (Throwable t) { logError(t); - onError(t); + onError(message, t); + } finally { + onFinally(message); } }); } @@ -184,6 +186,18 @@ protected void onError(Throwable throwable) { throw new HttpStatusException(HttpStatus.INTERNAL_SERVER_ERROR.getCode(), throwable); } + protected void onError(MESSAGE message, Throwable throwable) { + onError(throwable); + } + + protected void onFinally(MESSAGE message) { + try { + message.close(); + } catch (Exception e) { + onError(e); + } + } + protected RpcInvocation buildRpcInvocation(RpcInvocationBuildContext context) { MethodDescriptor methodDescriptor = context.getMethodDescriptor(); if (methodDescriptor == null) { diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java index 9ae78813025..0399c643342 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java @@ -26,6 +26,7 @@ import org.apache.dubbo.remoting.http12.exception.UnimplementedException; import org.apache.dubbo.remoting.http12.h2.H2StreamChannel; import org.apache.dubbo.remoting.http12.h2.Http2Header; +import org.apache.dubbo.remoting.http12.h2.Http2InputMessage; import org.apache.dubbo.remoting.http12.h2.Http2TransportListener; import org.apache.dubbo.remoting.http12.message.MethodMetadata; import org.apache.dubbo.remoting.http12.message.StreamingDecoder; @@ -122,6 +123,19 @@ protected RpcInvocation onBuildRpcInvocationCompletion(RpcInvocation invocation) return invocation; } + @Override + protected void onError(Http2InputMessage message, Throwable throwable) { + try { + message.close(); + } catch (Exception e) { + throwable.addSuppressed(e); + } + onError(throwable); + } + + @Override + protected void onFinally(Http2InputMessage message) {} + @Override protected GrpcStreamingDecoder getStreamingDecoder() { return (GrpcStreamingDecoder) super.getStreamingDecoder(); diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java index 326e09ede3e..4fee48dfb43 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java @@ -86,6 +86,7 @@ protected Executor initializeExecutor(Http2Header metadata) { return new SerializingExecutor(executorSupport.getExecutor(metadata)); } + @Override protected void doOnMetadata(Http2Header metadata) { if (metadata.isEndStream()) { if (!HttpMethods.supportBody(metadata.method())) { @@ -164,7 +165,7 @@ protected void onMetadataCompletion(Http2Header metadata) { @Override protected void onDataCompletion(Http2InputMessage message) { if (message.isEndStream()) { - serverCallListener.onComplete(); + getStreamingDecoder().close(); } }