From 37c78d715851cd2af1cb2c6cdbc7a7864d4218d2 Mon Sep 17 00:00:00 2001 From: Kushagra Thapar Date: Wed, 2 Sep 2020 12:29:44 -0700 Subject: [PATCH] Fixed netty leak on DELETE operations in GATEWAY mode (#14727) --- .../cosmos/implementation/RxGatewayStoreModel.java | 14 +++----------- .../directconnectivity/ErrorUtils.java | 6 +----- .../directconnectivity/ResponseUtils.java | 11 ++--------- .../implementation/http/ReactorNettyClient.java | 6 +----- 4 files changed, 7 insertions(+), 30 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java index c41395e57dbcc..f67f567176484 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java @@ -246,17 +246,9 @@ private Mono toDocumentServiceResponse(Mono contentObservable; - - if (request.getOperationType() == OperationType.Delete) { - // for delete we don't expect any body - contentObservable = Mono.just(EMPTY_BYTE_ARRAY); - } else { - // transforms the ByteBufFlux to Flux - contentObservable = httpResponse - .bodyAsByteArray() - .switchIfEmpty(Mono.just(EMPTY_BYTE_ARRAY)); - } + Mono contentObservable = httpResponse + .bodyAsByteArray() + .switchIfEmpty(Mono.just(EMPTY_BYTE_ARRAY)); return contentObservable .map(content -> { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ErrorUtils.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ErrorUtils.java index 493a885e65f82..5c718d67c02b5 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ErrorUtils.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ErrorUtils.java @@ -17,11 +17,7 @@ public class ErrorUtils { private static final Logger logger = LoggerFactory.getLogger(ErrorUtils.class); static Mono getErrorResponseAsync(HttpResponse responseMessage, HttpRequest request) { - Mono responseAsString = responseMessage.bodyAsString().switchIfEmpty(Mono.just(StringUtils.EMPTY)); - if (request.httpMethod() == HttpMethod.DELETE) { - return Mono.just(StringUtils.EMPTY); - } - return responseAsString; + return responseMessage.bodyAsString().switchIfEmpty(Mono.just(StringUtils.EMPTY)); } static void logGoneException(URI physicalAddress, String activityId) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ResponseUtils.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ResponseUtils.java index 91b98158838fd..645a4f2c97984 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ResponseUtils.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ResponseUtils.java @@ -10,20 +10,13 @@ import reactor.core.publisher.Mono; class ResponseUtils { - private static byte[] EMPTY_BYTE_ARRAY = {}; + private static final byte[] EMPTY_BYTE_ARRAY = {}; static Mono toStoreResponse(HttpResponse httpClientResponse, HttpRequest httpRequest) { HttpHeaders httpResponseHeaders = httpClientResponse.headers(); - Mono contentObservable; - - if (httpRequest.httpMethod() == HttpMethod.DELETE) { - // for delete we don't expect any body - contentObservable = Mono.just(EMPTY_BYTE_ARRAY); - } else { - contentObservable = httpClientResponse.bodyAsByteArray().switchIfEmpty(Mono.just(EMPTY_BYTE_ARRAY)); - } + Mono contentObservable = httpClientResponse.bodyAsByteArray().switchIfEmpty(Mono.just(EMPTY_BYTE_ARRAY)); return contentObservable.map(byteArrayContent -> { // transforms to Mono diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java index 54dbf84da1d5c..ba46e804ff679 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java @@ -197,11 +197,7 @@ public HttpHeaders headers() { @Override public Flux body() { return bodyIntern() - .doOnSubscribe(this::updateSubscriptionState) - .map(byteBuf -> { - byteBuf.retain(); - return byteBuf; - }); + .doOnSubscribe(this::updateSubscriptionState); } @Override