Skip to content

Commit

Permalink
Reactor-Netty HttpClient changes (#5185)
Browse files Browse the repository at this point in the history
* Reactor Netty Milestone release changes

* Updated Milestone version to RC version

* Code review comments:
1. Reverted the RC1 back to M3 version.
2. Added constants for literal values in Configs.java
3. Removed inputStream implementation from HttpResponse.java as its not required

* Setting default connections to 1000, and fixed Exception handling by unwrapping exception

* Unwrapping exception

* Fixed validate failure by unwrapping exception

* Fixed multi-master conflict resolution test

* Unwrapping exceptions wherever possible to make sure we check on inner exception cause

* Fixed compilation errors

* Fixed doOnError in Store Client and Consistency Writer to handle unwrapped exceptions

* Handling empty response from backend

* Updating number of documents and collection throughput size

* Updated Feed Response Validator for query metrics

* Fixed query documents with aggregate test

* Updated retrieved documents count

* Updated getLogicalPlanBuildTime for query metrics validation

* Unused imports
  • Loading branch information
kushagraThapar authored and mitchdenny committed Oct 15, 2019
1 parent 4bbc226 commit 205b57e
Show file tree
Hide file tree
Showing 27 changed files with 241 additions and 223 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class Configuration {
private int documentDataFieldSize = 20;

@Parameter(names = "-maxConnectionPoolSize", description = "Max Connection Pool Size")
private Integer maxConnectionPoolSize = 1000;
private Integer maxConnectionPoolSize = 500;

@Parameter(names = "-consistencyLevel", description = "Consistency Level", converter = ConsistencyLevelConverter.class)
private ConsistencyLevel consistencyLevel = ConsistencyLevel.SESSION;
Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/microsoft-azure-cosmos/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ Licensed under the MIT License.
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
<version>${reactor-netty.version}</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.azure.data.cosmos.internal.Permission;
import com.azure.data.cosmos.internal.directconnectivity.rntbd.RntbdMetrics;
import io.micrometer.core.instrument.MeterRegistry;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -167,14 +168,15 @@ public Mono<CosmosDatabaseResponse> createDatabaseIfNotExists(String id) {
}

private Mono<CosmosDatabaseResponse> createDatabaseIfNotExistsInternal(CosmosDatabase database){
return database.read().onErrorResume(exception -> {
if (exception instanceof CosmosClientException) {
CosmosClientException cosmosClientException = (CosmosClientException) exception;
return database.read().onErrorResume(t -> {
Throwable throwable = Exceptions.unwrap(t);
if (throwable instanceof CosmosClientException) {
CosmosClientException cosmosClientException = (CosmosClientException) throwable;
if (cosmosClientException.statusCode() == HttpConstants.StatusCodes.NOTFOUND) {
return createDatabase(new CosmosDatabaseProperties(database.id()), new CosmosDatabaseRequestOptions());
}
}
return Mono.error(exception);
return Mono.error(throwable);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.azure.data.cosmos.internal.Offer;
import com.azure.data.cosmos.internal.Paths;
import org.apache.commons.lang3.StringUtils;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -308,13 +309,14 @@ public Mono<CosmosContainerResponse> createContainerIfNotExists(String id, Strin
private Mono<CosmosContainerResponse> createContainerIfNotExistsInternal(
CosmosContainerProperties containerProperties, CosmosContainer container, CosmosContainerRequestOptions options) {
return container.read(options).onErrorResume(exception -> {
if (exception instanceof CosmosClientException) {
CosmosClientException cosmosClientException = (CosmosClientException) exception;
Throwable unwrappedException = Exceptions.unwrap(exception);
if (unwrappedException instanceof CosmosClientException) {
CosmosClientException cosmosClientException = (CosmosClientException) unwrappedException;
if (cosmosClientException.statusCode() == HttpConstants.StatusCodes.NOTFOUND) {
return createContainer(containerProperties, options);
}
}
return Mono.error(exception);
return Mono.error(unwrappedException);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ public class Configs {
private static final int CPU_CNT = Runtime.getRuntime().availableProcessors();
private static final int DEFAULT_DIRECT_HTTPS_POOL_SIZE = CPU_CNT * 500;

// Reactor Netty Constants
private static final int MAX_IDLE_CONNECTION_TIMEOUT_IN_MILLIS = 60 * 1000;
private static final int CONNECTION_ACQUIRE_TIMEOUT_IN_MILLIS = 45 * 1000;
private static final int REACTOR_NETTY_MAX_CONNECTION_POOL_SIZE = 500;
private static final String REACTOR_NETTY_CONNECTION_POOL_NAME = "reactor-netty-connection-pool";

public Configs() {
Expand Down Expand Up @@ -147,6 +151,18 @@ public String getReactorNettyConnectionPoolName() {
return REACTOR_NETTY_CONNECTION_POOL_NAME;
}

public int getMaxIdleConnectionTimeoutInMillis() {
return MAX_IDLE_CONNECTION_TIMEOUT_IN_MILLIS;
}

public int getConnectionAcquireTimeoutInMillis() {
return CONNECTION_ACQUIRE_TIMEOUT_IN_MILLIS;
}

public int getReactorNettyMaxConnectionPoolSize() {
return REACTOR_NETTY_MAX_CONNECTION_POOL_SIZE;
}

private static String getJVMConfigAsString(String propName, String defaultValue) {
String propValue = System.getProperty(propName);
return StringUtils.defaultString(propValue, defaultValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
Expand All @@ -43,7 +42,6 @@
*/
class RxGatewayStoreModel implements RxStoreModel {

private final static int INITIAL_RESPONSE_BUFFER_SIZE = 1024;
private final Logger logger = LoggerFactory.getLogger(RxGatewayStoreModel.class);
private final Map<String, String> defaultHeaders;
private final HttpClient httpClient;
Expand Down Expand Up @@ -229,22 +227,6 @@ private String ensureSlashPrefixed(String path) {
return "/" + path;
}

private Mono<String> toString(Flux<ByteBuf> contentObservable) {
return contentObservable
.reduce(
new ByteArrayOutputStream(INITIAL_RESPONSE_BUFFER_SIZE),
(out, bb) -> {
try {
bb.readBytes(out, bb.readableBytes());
return out;
}
catch (IOException e) {
throw new RuntimeException(e);
}
})
.map(out -> new String(out.toByteArray(), StandardCharsets.UTF_8));
}

/**
* Transforms the reactor netty's client response Observable to RxDocumentServiceResponse Observable.
*
Expand All @@ -259,102 +241,63 @@ private Mono<String> toString(Flux<ByteBuf> contentObservable) {
private Flux<RxDocumentServiceResponse> toDocumentServiceResponse(Mono<HttpResponse> httpResponseMono,
RxDocumentServiceRequest request) {

if (request.getIsMedia()) {
return httpResponseMono.flatMap(httpResponse -> {
return httpResponseMono.flatMap(httpResponse -> {

// header key/value pairs
HttpHeaders httpResponseHeaders = httpResponse.headers();
int httpResponseStatus = httpResponse.statusCode();
// header key/value pairs
HttpHeaders httpResponseHeaders = httpResponse.headers();
int httpResponseStatus = httpResponse.statusCode();

Flux<InputStream> inputStreamObservable;

if (request.getOperationType() == OperationType.Delete) {
// for delete we don't expect any body
inputStreamObservable = Flux.just(IOUtils.toInputStream("", StandardCharsets.UTF_8));
} else {
// transforms the ByteBufFlux to Flux<InputStream>
inputStreamObservable = httpResponse
.body()
.flatMap(byteBuf ->
Flux.just(IOUtils.toInputStream(byteBuf.toString(StandardCharsets.UTF_8), StandardCharsets.UTF_8)));
}

return inputStreamObservable
.flatMap(contentInputStream -> {
try {
// If there is any error in the header response this throws exception
// TODO: potential performance improvement: return Observable.error(exception) on failure instead of throwing Exception
validateOrThrow(request,
HttpResponseStatus.valueOf(httpResponseStatus),
httpResponseHeaders,
null,
contentInputStream);

// transforms to Observable<StoreResponse>
StoreResponse rsp = new StoreResponse(httpResponseStatus, HttpUtils
.unescape(httpResponseHeaders.toMap().entrySet()), contentInputStream);
return Flux.just(rsp);
} catch (Exception e) {
return Flux.error(e);
}
}).single();

}).map(RxDocumentServiceResponse::new).flux();

} else {
return httpResponseMono.flatMap(httpResponse -> {
Flux<String> contentObservable;

// header key/value pairs
HttpHeaders httpResponseHeaders = httpResponse.headers();
int httpResponseStatus = httpResponse.statusCode();

Flux<String> contentObservable;

if (request.getOperationType() == OperationType.Delete) {
// for delete we don't expect any body
contentObservable = Flux.just(StringUtils.EMPTY);
} else {
// transforms the ByteBufFlux to Flux<String>
contentObservable = toString(httpResponse.body()).flux();
}
if (request.getOperationType() == OperationType.Delete) {
// for delete we don't expect any body
contentObservable = Flux.just(StringUtils.EMPTY);
} else {
// transforms the ByteBufFlux to Flux<String>
contentObservable = httpResponse
.bodyAsString()
.switchIfEmpty(Mono.just(StringUtils.EMPTY))
.flux();
}

return contentObservable
.flatMap(content -> {
try {
// If there is any error in the header response this throws exception
// TODO: potential performance improvement: return Observable.error(exception) on failure instead of throwing Exception
validateOrThrow(request, HttpResponseStatus.valueOf(httpResponseStatus), httpResponseHeaders, content, null);

// transforms to Observable<StoreResponse>
StoreResponse rsp = new StoreResponse(httpResponseStatus,
HttpUtils.unescape(httpResponseHeaders.toMap().entrySet()),
content);
return Flux.just(rsp);
} catch (Exception e) {
return Flux.error(e);
}
}).single();

}).map(RxDocumentServiceResponse::new)
.onErrorResume(throwable -> {
if (!(throwable instanceof Exception)) {
// fatal error
logger.error("Unexpected failure {}", throwable.getMessage(), throwable);
return Mono.error(throwable);
}

Exception exception = (Exception) throwable;
if (!(exception instanceof CosmosClientException)) {
// wrap in CosmosClientException
logger.error("Network failure", exception);
CosmosClientException dce = BridgeInternal.createCosmosClientException(0, exception);
BridgeInternal.setRequestHeaders(dce, request.getHeaders());
return Mono.error(dce);
}

return Mono.error(exception);
}).flux();
}
return contentObservable
.flatMap(content -> {
try {
// If there is any error in the header response this throws exception
// TODO: potential performance improvement: return Observable.error(exception) on failure instead of throwing Exception
validateOrThrow(request, HttpResponseStatus.valueOf(httpResponseStatus), httpResponseHeaders, content, null);

// transforms to Observable<StoreResponse>
StoreResponse rsp = new StoreResponse(httpResponseStatus,
HttpUtils.unescape(httpResponseHeaders.toMap().entrySet()),
content);
return Flux.just(rsp);
} catch (Exception e) {
return Flux.error(e);
}
})
.single();

}).map(RxDocumentServiceResponse::new)
.onErrorResume(throwable -> {
Throwable unwrappedException = reactor.core.Exceptions.unwrap(throwable);
if (!(unwrappedException instanceof Exception)) {
// fatal error
logger.error("Unexpected failure {}", unwrappedException.getMessage(), unwrappedException);
return Mono.error(unwrappedException);
}

Exception exception = (Exception) unwrappedException;
if (!(exception instanceof CosmosClientException)) {
// wrap in CosmosClientException
logger.error("Network failure", exception);
CosmosClientException dce = BridgeInternal.createCosmosClientException(0, exception);
BridgeInternal.setRequestHeaders(dce, request.getHeaders());
return Mono.error(dce);
}

return Mono.error(exception);
}).flux();
}

private void validateOrThrow(RxDocumentServiceRequest request, HttpResponseStatus status, HttpHeaders headers, String body,
Expand Down Expand Up @@ -498,4 +441,4 @@ private void applySessionToken(RxDocumentServiceRequest request) {
headers.put(HttpConstants.HttpHeaders.SESSION_TOKEN, sessionToken);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.azure.data.cosmos.internal.RxDocumentServiceRequest;
import com.azure.data.cosmos.internal.routing.PartitionKeyRangeIdentity;
import org.apache.commons.lang3.StringUtils;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;

import java.util.Map;
Expand Down Expand Up @@ -105,13 +106,14 @@ private Mono<DocumentCollection> resolveByPartitionKeyRangeIdentityAsync(Partiti
// which contains value "<collectionrid>,<partitionkeyrangeid>", then resolve to collection rid in this header.
if (partitionKeyRangeIdentity != null && partitionKeyRangeIdentity.getCollectionRid() != null) {
return this.resolveByRidAsync(partitionKeyRangeIdentity.getCollectionRid(), properties)
.onErrorResume(e -> {
if (e instanceof NotFoundException) {
.onErrorResume(t -> {
Throwable unwrappedException = Exceptions.unwrap(t);
if (unwrappedException instanceof NotFoundException) {
// This is signal to the upper logic either to refresh
// collection cache and retry.
return Mono.error(new InvalidPartitionException(RMResources.InvalidDocumentCollection));
}
return Mono.error(e);
return Mono.error(unwrappedException);

});
}
Expand Down Expand Up @@ -165,7 +167,7 @@ private Mono<Void> refreshAsync(RxDocumentServiceRequest request) {
});
}).then();
} else {
// In case of ForceRefresh directive coming from client, there will be no ResolvedCollectionRid, so we
// In case of ForceRefresh directive coming from client, there will be no ResolvedCollectionRid, so we
// need to refresh unconditionally.
mono = Mono.fromRunnable(() -> this.refresh(request.getResourceAddress(), request.properties));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.commons.collections4.ComparatorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
Expand Down Expand Up @@ -164,7 +165,8 @@ Mono<StoreResponse> writePrivateAsync(
.doOnError(
t -> {
try {
CosmosClientException ex = Utils.as(t, CosmosClientException.class);
Throwable unwrappedException = Exceptions.unwrap(t);
CosmosClientException ex = Utils.as(unwrappedException, CosmosClientException.class);
try {
BridgeInternal.recordResponse(request.requestContext.cosmosResponseDiagnostics, request,
storeReader.createStoreResult(null, ex, false, false, primaryUri));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class ErrorUtils {
private static final Logger logger = LoggerFactory.getLogger(ErrorUtils.class);

static Mono<String> getErrorResponseAsync(HttpResponse responseMessage, HttpRequest request) {
Mono<String> responseAsString = ResponseUtils.toString(responseMessage.body());
Mono<String> responseAsString = responseMessage.bodyAsString();
if (request.httpMethod() == HttpMethod.DELETE) {
return Mono.just(StringUtils.EMPTY);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,14 +219,15 @@ public Mono<AddressInformation[]> tryGetAddresses(RxDocumentServiceRequest reque
}

return addresses;
}).onErrorResume(ex -> {
CosmosClientException dce = com.azure.data.cosmos.internal.Utils.as(ex, CosmosClientException.class);
}).onErrorResume(throwable -> {
Throwable unwrappedException = reactor.core.Exceptions.unwrap(throwable);
CosmosClientException dce = com.azure.data.cosmos.internal.Utils.as(unwrappedException, CosmosClientException.class);
if (dce == null) {
logger.error("unexpected failure", ex);
if (forceRefreshPartitionAddressesModified) {
this.suboptimalServerPartitionTimestamps.remove(partitionKeyRangeIdentity);
}
return Mono.error(ex);
return Mono.error(unwrappedException);
} else {
logger.debug("tryGetAddresses dce", dce);
if (Exceptions.isStatusCode(dce, HttpConstants.StatusCodes.NOTFOUND) ||
Expand All @@ -238,7 +239,7 @@ public Mono<AddressInformation[]> tryGetAddresses(RxDocumentServiceRequest reque
logger.debug("tryGetAddresses: inner onErrorResumeNext return null", dce);
return Mono.empty();
}
return Mono.error(ex);
return Mono.error(unwrappedException);
}

});
Expand Down
Loading

0 comments on commit 205b57e

Please sign in to comment.