Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rntbd health check improvement 2 #33464

Merged
merged 14 commits into from
Feb 16, 2023
3 changes: 2 additions & 1 deletion sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
#### Breaking Changes

#### Bugs Fixed
- Change feed pull API is suing an incorrect key value for collection lookup, which can result in using the old collection in collection recreate scenarios. - See [PR 33178](https://github.com/Azure/azure-sdk-for-java/pull/33178)
* Change feed pull API is using an incorrect key value for collection lookup, which can result in using the old collection in collection recreate scenarios. - See [PR 33178](https://github.com/Azure/azure-sdk-for-java/pull/33178)
* Added improvement in `RntbdClientChannelHealthChecker` for detecting transit timeout. - See [PR 33464](https://github.com/Azure/azure-sdk-for-java/pull/33464)

#### Other Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,29 +278,6 @@ public static <E extends CosmosException> RntbdChannelAcquisitionTimeline getCha
return e.getChannelAcquisitionTimeline();
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static <E extends CosmosException> E setChannelTaskQueueSize(E e, int value) {
e.setRntbdChannelTaskQueueSize(value);
return e;
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static <E extends CosmosException> int getRntbdPendingRequestQueueSize(E e) {
return e.getRntbdPendingRequestQueueSize();
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static <E extends CosmosException> E setRntbdPendingRequestQueueSize(E e, int value) {
e.setRntbdPendingRequestQueueSize(value);
return e;
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static <E extends CosmosException> int getChannelTaskQueueSize(E e) {
return e.getRntbdChannelTaskQueueSize();
}


@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static <E extends CosmosException> E setRntbdRequestLength(E e, int requestLen) {
e.setRntbdRequestLength(requestLen);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.azure.cosmos.implementation.batch.BatchExecUtils;
import com.azure.cosmos.implementation.directconnectivity.Uri;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdChannelAcquisitionTimeline;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdChannelStatistics;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdEndpointStatistics;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.fasterxml.jackson.core.JsonProcessingException;
Expand Down Expand Up @@ -83,14 +84,14 @@ public class CosmosException extends AzureException {
private CosmosError cosmosError;

/**
* RNTBD channel task queue size
* RNTBD endpoint statistics
*/
private int rntbdChannelTaskQueueSize;
private RntbdEndpointStatistics rntbdEndpointStatistics;

/**
* RNTBD endpoint statistics
*/
private RntbdEndpointStatistics rntbdEndpointStatistics;
private RntbdChannelStatistics rntbdChannelStatistics;

/**
* LSN
Expand Down Expand Up @@ -122,11 +123,6 @@ public class CosmosException extends AzureException {
*/
private int requestPayloadLength;

/**
* RNTBD pending request queue size
*/
private int rntbdPendingRequestQueueSize;

/**
* RNTBD request length
*/
Expand Down Expand Up @@ -500,6 +496,14 @@ RntbdEndpointStatistics getRntbdServiceEndpointStatistics() {
return this.rntbdEndpointStatistics;
}

RntbdChannelStatistics getRntbdChannelStatistics() {
return this.rntbdChannelStatistics;
}

void setRntbdChannelStatistics(RntbdChannelStatistics rntbdChannelStatistics) {
this.rntbdChannelStatistics = rntbdChannelStatistics;
}

void setRntbdRequestLength(int rntbdRequestLength) {
this.rntbdRequestLength = rntbdRequestLength;
}
Expand Down Expand Up @@ -532,22 +536,6 @@ void setSendingRequestHasStarted(boolean hasSendingRequestStarted) {
this.sendingRequestHasStarted = hasSendingRequestStarted;
}

int getRntbdChannelTaskQueueSize() {
return this.rntbdChannelTaskQueueSize;
}

void setRntbdChannelTaskQueueSize(int rntbdChannelTaskQueueSize) {
this.rntbdChannelTaskQueueSize = rntbdChannelTaskQueueSize;
}

int getRntbdPendingRequestQueueSize() {
return this.rntbdChannelTaskQueueSize;
}

void setRntbdPendingRequestQueueSize(int rntbdPendingRequestQueueSize) {
this.rntbdPendingRequestQueueSize = rntbdPendingRequestQueueSize;
}

List<String> getReplicaStatusList() {
return this.replicaStatusList;
}
Expand All @@ -568,6 +556,20 @@ public List<String> getReplicaStatusList(CosmosException cosmosException) {
return cosmosException.getReplicaStatusList();
}

@Override
public CosmosException setRntbdChannelStatistics(
CosmosException cosmosException,
RntbdChannelStatistics rntbdChannelStatistics) {

cosmosException.setRntbdChannelStatistics(rntbdChannelStatistics);
return cosmosException;
}

@Override
public RntbdChannelStatistics getRntbdChannelStatistics(CosmosException cosmosException) {
return cosmosException.getRntbdChannelStatistics();
}

});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.azure.cosmos;

import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import io.netty.channel.ChannelOption;

Expand All @@ -23,7 +24,7 @@ public final class DirectConnectionConfig {
private static final Duration DEFAULT_IDLE_ENDPOINT_TIMEOUT = Duration.ofHours(1l);
private static final Duration DEFAULT_CONNECT_TIMEOUT = Duration.ofSeconds(5L);
private static final Duration DEFAULT_NETWORK_REQUEST_TIMEOUT = Duration.ofSeconds(5L);
private static final Duration MIN_NETWORK_REQUEST_TIMEOUT = Duration.ofSeconds(5L);
private static final Duration MIN_NETWORK_REQUEST_TIMEOUT = Duration.ofSeconds(1L);
private static final Duration MAX_NETWORK_REQUEST_TIMEOUT = Duration.ofSeconds(10L);
private static final int DEFAULT_MAX_CONNECTIONS_PER_ENDPOINT = 130;
private static final int DEFAULT_MAX_REQUESTS_PER_CONNECTION = 30;
Expand All @@ -39,6 +40,7 @@ public final class DirectConnectionConfig {
private int maxRequestsPerConnection;
private int ioThreadCountPerCoreFactor;
private int ioThreadPriority;
private boolean healthCheckTimeoutDetectionEnabled;

/**
* Constructor
Expand All @@ -53,6 +55,7 @@ public DirectConnectionConfig() {
this.networkRequestTimeout = DEFAULT_NETWORK_REQUEST_TIMEOUT;
this.ioThreadCountPerCoreFactor = DEFAULT_IO_THREAD_COUNT_PER_CORE_FACTOR;
this.ioThreadPriority = DEFAULT_IO_THREAD_PRIORITY;
this.healthCheckTimeoutDetectionEnabled = Configs.isTcpHealthCheckTimeoutDetectionEnabled();
}

/**
Expand Down Expand Up @@ -258,7 +261,7 @@ public Duration getNetworkRequestTimeout() {
* Sets the network request timeout interval (time to wait for response from network peer).
*
* Default value is 5 seconds.
* It only allows values &ge;5s and &le;10s. (backend allows requests to take up-to 5 seconds processing time - 5 seconds
* It only allows values &ge;1s and &le;10s. (backend allows requests to take up-to 5 seconds processing time - 5 seconds
* buffer so 10 seconds in total for transport is more than sufficient).
*
* Attention! Please adjust this value with caution.
Expand Down Expand Up @@ -298,6 +301,15 @@ DirectConnectionConfig setIoThreadPriority(int ioThreadPriority) {
return this;
}

DirectConnectionConfig setHealthCheckTimeoutDetectionEnabled(boolean timeoutDetectionEnabled) {
this.healthCheckTimeoutDetectionEnabled = timeoutDetectionEnabled;
return this;
}

boolean isHealthCheckTimeoutDetectionEnabled() {
return this.healthCheckTimeoutDetectionEnabled;
}

@Override
public String toString() {
return "DirectConnectionConfig{" +
Expand All @@ -309,6 +321,7 @@ public String toString() {
", networkRequestTimeout=" + networkRequestTimeout +
", ioThreadCountPerCoreFactor=" + ioThreadCountPerCoreFactor +
", ioThreadPriority=" + ioThreadPriority +
", tcpHealthCheckTimeoutDetectionEnabled=" + healthCheckTimeoutDetectionEnabled +
'}';
}

Expand Down Expand Up @@ -339,6 +352,19 @@ public DirectConnectionConfig setIoThreadPriority(DirectConnectionConfig config,
int ioThreadPriority) {
return config.setIoThreadPriority(ioThreadPriority);
}

@Override
public DirectConnectionConfig setHealthCheckTimeoutDetectionEnabled(
DirectConnectionConfig directConnectionConfig, boolean timeoutDetectionEnabled) {

directConnectionConfig.setHealthCheckTimeoutDetectionEnabled(timeoutDetectionEnabled);
return directConnectionConfig;
}

@Override
public boolean isHealthCheckTimeoutDetectionEnabled(DirectConnectionConfig directConnectionConfig) {
return directConnectionConfig.isHealthCheckTimeoutDetectionEnabled();
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ public class Configs {
private static final String REPLICA_ADDRESS_VALIDATION_ENABLED = "COSMOS.REPLICA_ADDRESS_VALIDATION_ENABLED";
private static final boolean DEFAULT_REPLICA_ADDRESS_VALIDATION_ENABLED = true;

// Rntbd health check related config
private static final String TCP_HEALTH_CHECK_TIMEOUT_DETECTION_ENABLED = "COSMOS.TCP_HEALTH_CHECK_TIMEOUT_DETECTION_ENABLED";
private static final boolean DEFAULT_TCP_HEALTH_CHECK_TIMEOUT_DETECTION_ENABLED = true;

public Configs() {
this.sslContext = sslContextInit();
}
Expand Down Expand Up @@ -314,4 +318,10 @@ public static boolean isReplicaAddressValidationEnabled() {
REPLICA_ADDRESS_VALIDATION_ENABLED,
DEFAULT_REPLICA_ADDRESS_VALIDATION_ENABLED);
}

public static boolean isTcpHealthCheckTimeoutDetectionEnabled() {
return getJVMConfigAsBoolean(
TCP_HEALTH_CHECK_TIMEOUT_DETECTION_ENABLED,
DEFAULT_TCP_HEALTH_CHECK_TIMEOUT_DETECTION_ENABLED);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public final class ConnectionPolicy {
private boolean tcpConnectionEndpointRediscoveryEnabled;
private int ioThreadCountPerCoreFactor;
private int ioThreadPriority;
private boolean tcpHealthCheckTimeoutDetectionEnabled;

/**
* Constructor.
Expand All @@ -62,7 +63,10 @@ public ConnectionPolicy(GatewayConnectionConfig gatewayConnectionConfig) {
this(ConnectionMode.GATEWAY, DirectConnectionConfig.getDefaultConfig(), gatewayConnectionConfig);
}

private ConnectionPolicy(ConnectionMode connectionMode, DirectConnectionConfig directConnectionConfig, GatewayConnectionConfig gatewayConnectionConfig) {
private ConnectionPolicy(
ConnectionMode connectionMode,
DirectConnectionConfig directConnectionConfig,
GatewayConnectionConfig gatewayConnectionConfig) {
this();
this.connectionMode = connectionMode;
this.connectTimeout = directConnectionConfig.getConnectTimeout();
Expand All @@ -84,6 +88,11 @@ private ConnectionPolicy(ConnectionMode connectionMode, DirectConnectionConfig d
this.maxConnectionPoolSize = gatewayConnectionConfig.getMaxConnectionPoolSize();
this.httpNetworkRequestTimeout = BridgeInternal.getNetworkRequestTimeoutFromGatewayConnectionConfig(gatewayConnectionConfig);
this.proxy = gatewayConnectionConfig.getProxy();
this.tcpHealthCheckTimeoutDetectionEnabled =
ImplementationBridgeHelpers
.DirectConnectionConfigHelper
.getDirectConnectionConfigAccessor()
.isHealthCheckTimeoutDetectionEnabled(directConnectionConfig);
}

private ConnectionPolicy() {
Expand All @@ -94,6 +103,7 @@ private ConnectionPolicy() {
this.throttlingRetryOptions = new ThrottlingRetryOptions();
this.userAgentSuffix = "";
this.ioThreadPriority = Thread.NORM_PRIORITY;
this.tcpHealthCheckTimeoutDetectionEnabled = true;
}

/**
Expand Down Expand Up @@ -550,6 +560,10 @@ public ConnectionPolicy setMaxRequestsPerConnection(int maxRequestsPerConnection

public int getIoThreadPriority() { return this.ioThreadPriority; }

public boolean isTcpHealthCheckTimeoutDetectionEnabled() {
return this.tcpHealthCheckTimeoutDetectionEnabled;
}

public ConnectionPolicy setIoThreadCountPerCoreFactor(int ioThreadCountPerCoreFactor) {
this.ioThreadCountPerCoreFactor = ioThreadCountPerCoreFactor;
return this;
Expand Down Expand Up @@ -582,6 +596,9 @@ public String toString() {
", maxConnectionsPerEndpoint=" + maxConnectionsPerEndpoint +
", maxRequestsPerConnection=" + maxRequestsPerConnection +
", tcpConnectionEndpointRediscoveryEnabled=" + tcpConnectionEndpointRediscoveryEnabled +
", ioThreadPriority=" + ioThreadPriority +
", ioThreadCountPerCoreFactor=" + ioThreadCountPerCoreFactor +
", tcpHealthCheckTimeoutDetectionEnabled=" + tcpHealthCheckTimeoutDetectionEnabled +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.azure.cosmos.implementation.batch.ItemBatchOperation;
import com.azure.cosmos.implementation.batch.PartitionScopeThresholds;
import com.azure.cosmos.implementation.clienttelemetry.TagName;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdChannelStatistics;
import com.azure.cosmos.implementation.patch.PatchOperation;
import com.azure.cosmos.implementation.routing.PartitionKeyInternal;
import com.azure.cosmos.implementation.spark.OperationContextAndListenerTuple;
Expand Down Expand Up @@ -192,6 +193,9 @@ DirectConnectionConfig setIoThreadCountPerCoreFactor(
int getIoThreadPriority(DirectConnectionConfig config);
DirectConnectionConfig setIoThreadPriority(
DirectConnectionConfig config, int ioThreadPriority);
DirectConnectionConfig setHealthCheckTimeoutDetectionEnabled(
DirectConnectionConfig directConnectionConfig, boolean timeoutDetectionEnabled);
boolean isHealthCheckTimeoutDetectionEnabled(DirectConnectionConfig directConnectionConfig);
}
}

Expand Down Expand Up @@ -1081,6 +1085,8 @@ public static void setCosmosExceptionAccessor(final CosmosExceptionAccessor newA
public interface CosmosExceptionAccessor {
CosmosException createCosmosException(int statusCode, Exception innerException);
List<String> getReplicaStatusList(CosmosException cosmosException);
CosmosException setRntbdChannelStatistics(CosmosException cosmosException, RntbdChannelStatistics rntbdChannelStatistics);
RntbdChannelStatistics getRntbdChannelStatistics(CosmosException cosmosException);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public CpuLoadHistory(List<CpuLoad> cpuLoad, Duration monitoringInterval) {

public boolean isCpuOverloaded() {
if (cpuOverload.get() == null) {
cpuOverload.set(isCpuOverloadInternal());
cpuOverload.set(isCpuOverloadedInternal());
}

return cpuOverload.get();
Expand All @@ -55,13 +55,25 @@ Instant getLastTimestamp() {
return this.cpuLoad.get(this.cpuLoad.size() - 1).timestamp;
}

private boolean isCpuOverloadInternal() {
private boolean isCpuOverloadedInternal() {
if (isCpuOverThreshold(90.0)) {
return true;
}

return delayInThreadScheduling();
}

public boolean isCpuOverThreshold(double cpuThreshold) {
for (int index = 0; index < this.cpuLoad.size(); ++index) {
if ((double) this.cpuLoad.get(index).value > 90.0) {
if ((double) this.cpuLoad.get(index).value > cpuThreshold) {
return true;
}
}

return false;
}

private boolean delayInThreadScheduling() {
// This signal is fragile, because the timestamps come from
// a non-monotonic clock that might have gotten adjusted by
// e.g. NTP.
Expand Down
Loading