Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds:Allow big cluster total weight #9864

Merged
merged 14 commits into from
Feb 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions api/src/main/java/io/grpc/MethodDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ public String getServiceName() {
/**
* A convenience method for {@code extractBareMethodName(getFullMethodName())}.
*
* @since 1.32.0
* @since 1.33.0
*/
@Nullable
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/5635")
Expand Down Expand Up @@ -413,7 +413,7 @@ public static String extractFullServiceName(String fullMethodName) {
* Extract the method name out of a fully qualified method name. May return {@code null}
* if the input is malformed, but you cannot rely on it for the validity of the input.
*
* @since 1.32.0
* @since 1.33.0
*/
@Nullable
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/5635")
Expand Down
2 changes: 1 addition & 1 deletion buildscripts/grpc-java-artifacts/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,6 @@ RUN yum install -y \
yum clean all

# Install Maven
RUN curl -Ls http://apache.cs.utah.edu/maven/maven-3/3.3.9/binaries/apache-maven-3.3.9-bin.tar.gz | \
RUN curl -Ls http://dlcdn.apache.org/maven/maven-3/3.3.9/binaries/apache-maven-3.3.9-bin.tar.gz | \
tar xz -C /var/local
ENV PATH /var/local/apache-maven-3.3.9/bin:$PATH
6 changes: 3 additions & 3 deletions repositories.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,10 @@ def grpc_java_repositories():
if not native.existing_rule("com_github_cncf_xds"):
http_archive(
name = "com_github_cncf_xds",
strip_prefix = "xds-d92e9ce0af512a73a3a126b32fa4920bee12e180",
sha256 = "27be88b1ff2844885d3b2d0d579546f3a8b3f26b4871eed89082c9709e49a4bd",
strip_prefix = "xds-06c439db220b89134a8a49bad41994560d6537c6",
sha256 = "41ea212940ab44bf7f8a8b4169cfbc612ed2166dafabc0a56a8820ef665fc6a4",
urls = [
"https://github.com/cncf/xds/archive/d92e9ce0af512a73a3a126b32fa4920bee12e180.tar.gz",
"https://github.com/cncf/xds/archive/06c439db220b89134a8a49bad41994560d6537c6.tar.gz",
],
)
if not native.existing_rule("com_github_grpc_grpc"):
Expand Down
22 changes: 20 additions & 2 deletions services/src/main/java/io/grpc/services/CallMetricRecorder.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public final class CallMetricRecorder {
new AtomicReference<>();
private double cpuUtilizationMetric = 0;
private double memoryUtilizationMetric = 0;
private double qps = 0;
private volatile boolean disabled;

/**
Expand Down Expand Up @@ -158,6 +159,23 @@ public CallMetricRecorder recordMemoryUtilizationMetric(double value) {
return this;
}

/**
* Records a call metric measurement for qps.
* If RPC has already finished, this method is no-op.
*
* <p>A latter record will overwrite its former name-sakes.
*
* @return this recorder object
* @since 1.54.0
*/
public CallMetricRecorder recordQpsMetric(double value) {
if (disabled) {
return this;
}
qps = value;
return this;
}


/**
* Returns all request cost metric values. No more metric values will be recorded after this
Expand Down Expand Up @@ -187,8 +205,8 @@ MetricReport finalizeAndDump2() {
if (savedUtilizationMetrics == null) {
savedUtilizationMetrics = Collections.emptyMap();
}
return new MetricReport(cpuUtilizationMetric,
memoryUtilizationMetric, Collections.unmodifiableMap(savedRequestCostMetrics),
return new MetricReport(cpuUtilizationMetric, memoryUtilizationMetric, qps,
Collections.unmodifiableMap(savedRequestCostMetrics),
Collections.unmodifiableMap(savedUtilizationMetrics)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ public static MetricReport finalizeAndDump2(CallMetricRecorder recorder) {
}

public static MetricReport createMetricReport(double cpuUtilization, double memoryUtilization,
Map<String, Double> requestCostMetrics, Map<String, Double> utilizationMetrics) {
return new MetricReport(cpuUtilization, memoryUtilization,
requestCostMetrics, utilizationMetrics);
double qps, Map<String, Double> requestCostMetrics, Map<String, Double> utilizationMetrics) {
return new MetricReport(cpuUtilization, memoryUtilization, qps, requestCostMetrics,
utilizationMetrics);
}
}
17 changes: 16 additions & 1 deletion services/src/main/java/io/grpc/services/MetricRecorder.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public final class MetricRecorder {
private volatile ConcurrentHashMap<String, Double> metricsData = new ConcurrentHashMap<>();
private volatile double cpuUtilization;
private volatile double memoryUtilization;
private volatile double qps;

public static MetricRecorder newInstance() {
return new MetricRecorder();
Expand Down Expand Up @@ -86,8 +87,22 @@ public void clearMemoryUtilizationMetric() {
memoryUtilization = 0;
}

/**
* Update the QPS metrics data.
*/
public void setQps(double value) {
qps = value;
}

/**
* Clear the QPS metrics data.
*/
public void clearQps() {
qps = 0;
}

MetricReport getMetricReport() {
return new MetricReport(cpuUtilization, memoryUtilization,
return new MetricReport(cpuUtilization, memoryUtilization, qps,
Collections.emptyMap(), Collections.unmodifiableMap(metricsData));
}
}
13 changes: 10 additions & 3 deletions services/src/main/java/io/grpc/services/MetricReport.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,16 @@
public final class MetricReport {
private double cpuUtilization;
private double memoryUtilization;
private double qps;
private Map<String, Double> requestCostMetrics;
private Map<String, Double> utilizationMetrics;

MetricReport(double cpuUtilization, double memoryUtilization,
Map<String, Double> requestCostMetrics,
Map<String, Double> utilizationMetrics) {
MetricReport(double cpuUtilization, double memoryUtilization, double qps,
Map<String, Double> requestCostMetrics,
Map<String, Double> utilizationMetrics) {
this.cpuUtilization = cpuUtilization;
this.memoryUtilization = memoryUtilization;
this.qps = qps;
this.requestCostMetrics = checkNotNull(requestCostMetrics, "requestCostMetrics");
this.utilizationMetrics = checkNotNull(utilizationMetrics, "utilizationMetrics");
}
Expand All @@ -58,13 +60,18 @@ public Map<String, Double> getUtilizationMetrics() {
return utilizationMetrics;
}

public double getQps() {
return qps;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("cpuUtilization", cpuUtilization)
.add("memoryUtilization", memoryUtilization)
.add("requestCost", requestCostMetrics)
.add("utilization", utilizationMetrics)
.add("qps", qps)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public void dumpDumpsAllSavedMetricValues() {
recorder.recordRequestCostMetric("cost3", 1.0);
recorder.recordCpuUtilizationMetric(0.1928);
recorder.recordMemoryUtilizationMetric(47.4);
recorder.recordQpsMetric(2522.54);

MetricReport dump = recorder.finalizeAndDump2();
Truth.assertThat(dump.getUtilizationMetrics())
Expand All @@ -54,6 +55,7 @@ public void dumpDumpsAllSavedMetricValues() {
.containsExactly("cost1", 37465.12, "cost2", 10293.0, "cost3", 1.0);
Truth.assertThat(dump.getCpuUtilization()).isEqualTo(0.1928);
Truth.assertThat(dump.getMemoryUtilization()).isEqualTo(47.4);
Truth.assertThat(dump.getQps()).isEqualTo(2522.54);
}

@Test
Expand All @@ -75,6 +77,8 @@ public void lastValueWinForMetricsWithSameName() {
recorder.recordUtilizationMetric("util1", 28374.21);
recorder.recordMemoryUtilizationMetric(9384.0);
recorder.recordUtilizationMetric("util1", 84323.3);
recorder.recordQpsMetric(1928.3);
recorder.recordQpsMetric(100.8);

MetricReport dump = recorder.finalizeAndDump2();
Truth.assertThat(dump.getRequestCostMetrics())
Expand All @@ -83,6 +87,7 @@ public void lastValueWinForMetricsWithSameName() {
Truth.assertThat(dump.getUtilizationMetrics())
.containsExactly("util1", 84323.3);
Truth.assertThat(dump.getCpuUtilization()).isEqualTo(0);
Truth.assertThat(dump.getQps()).isEqualTo(100.8);
}

@Test
Expand Down
7 changes: 7 additions & 0 deletions xds/src/main/java/io/grpc/xds/ThreadSafeRandom.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ interface ThreadSafeRandom {

long nextLong();

long nextLong(long bound);

final class ThreadSafeRandomImpl implements ThreadSafeRandom {

static final ThreadSafeRandom instance = new ThreadSafeRandomImpl();
Expand All @@ -40,5 +42,10 @@ public int nextInt(int bound) {
public long nextLong() {
return ThreadLocalRandom.current().nextLong();
}

@Override
public long nextLong(long bound) {
return ThreadLocalRandom.current().nextLong(bound);
}
}
}
28 changes: 17 additions & 11 deletions xds/src/main/java/io/grpc/xds/WeightedRandomPicker.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.primitives.UnsignedInteger;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.SubchannelPicker;
Expand All @@ -34,21 +35,22 @@ final class WeightedRandomPicker extends SubchannelPicker {
final List<WeightedChildPicker> weightedChildPickers;

private final ThreadSafeRandom random;
private final int totalWeight;
private final long totalWeight;

static final class WeightedChildPicker {
private final int weight;
private final long weight;
private final SubchannelPicker childPicker;

WeightedChildPicker(int weight, SubchannelPicker childPicker) {
WeightedChildPicker(long weight, SubchannelPicker childPicker) {
checkArgument(weight >= 0, "weight is negative");
checkArgument(weight <= UnsignedInteger.MAX_VALUE.longValue(), "weight is too large");
checkNotNull(childPicker, "childPicker is null");

this.weight = weight;
this.childPicker = childPicker;
}

int getWeight() {
long getWeight() {
return weight;
}

Expand Down Expand Up @@ -93,12 +95,16 @@ public String toString() {

this.weightedChildPickers = Collections.unmodifiableList(weightedChildPickers);

int totalWeight = 0;
long totalWeight = 0;
for (WeightedChildPicker weightedChildPicker : weightedChildPickers) {
int weight = weightedChildPicker.getWeight();
long weight = weightedChildPicker.getWeight();
checkArgument(weight >= 0, "weight is negative");
checkNotNull(weightedChildPicker.getPicker(), "childPicker is null");
totalWeight += weight;
}
this.totalWeight = totalWeight;
checkArgument(totalWeight <= UnsignedInteger.MAX_VALUE.longValue(),
"total weight greater than unsigned int can hold");

this.random = random;
}
Expand All @@ -111,15 +117,15 @@ public final PickResult pickSubchannel(PickSubchannelArgs args) {
childPicker =
weightedChildPickers.get(random.nextInt(weightedChildPickers.size())).getPicker();
} else {
int rand = random.nextInt(totalWeight);
long rand = random.nextLong(totalWeight);

// Find the first idx such that rand < accumulatedWeights[idx]
// Not using Arrays.binarySearch for better readability.
int accumulatedWeight = 0;
for (int idx = 0; idx < weightedChildPickers.size(); idx++) {
accumulatedWeight += weightedChildPickers.get(idx).getWeight();
long accumulatedWeight = 0;
for (WeightedChildPicker weightedChildPicker : weightedChildPickers) {
accumulatedWeight += weightedChildPicker.getWeight();
if (rand < accumulatedWeight) {
childPicker = weightedChildPickers.get(idx).getPicker();
childPicker = weightedChildPicker.getPicker();
break;
}
}
Expand Down
6 changes: 3 additions & 3 deletions xds/src/main/java/io/grpc/xds/XdsNameResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -437,12 +437,12 @@ public Result selectConfig(PickSubchannelArgs args) {
if (action.cluster() != null) {
cluster = prefixedClusterName(action.cluster());
} else if (action.weightedClusters() != null) {
int totalWeight = 0;
long totalWeight = 0;
for (ClusterWeight weightedCluster : action.weightedClusters()) {
totalWeight += weightedCluster.weight();
}
int select = random.nextInt(totalWeight);
int accumulator = 0;
long select = random.nextLong(totalWeight);
long accumulator = 0;
for (ClusterWeight weightedCluster : action.weightedClusters()) {
accumulator += weightedCluster.weight();
if (select < accumulator) {
Expand Down
11 changes: 9 additions & 2 deletions xds/src/main/java/io/grpc/xds/XdsRouteConfigureResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.primitives.UnsignedInteger;
import com.google.protobuf.Any;
import com.google.protobuf.Duration;
import com.google.protobuf.InvalidProtocolBufferException;
Expand Down Expand Up @@ -477,7 +478,7 @@ static StructOrError<RouteAction> parseRouteAction(
return StructOrError.fromError("No cluster found in weighted cluster list");
}
List<ClusterWeight> weightedClusters = new ArrayList<>();
int clusterWeightSum = 0;
long clusterWeightSum = 0;
for (io.envoyproxy.envoy.config.route.v3.WeightedCluster.ClusterWeight clusterWeight
: clusterWeights) {
StructOrError<ClusterWeight> clusterWeightOrError =
Expand All @@ -492,14 +493,20 @@ static StructOrError<RouteAction> parseRouteAction(
if (clusterWeightSum <= 0) {
return StructOrError.fromError("Sum of cluster weights should be above 0.");
}
if (clusterWeightSum > UnsignedInteger.MAX_VALUE.longValue()) {
return StructOrError.fromError(String.format(
"Sum of cluster weights should be less than the maximum unsigned integer (%d), but"
+ " was %d. ",
UnsignedInteger.MAX_VALUE.longValue(), clusterWeightSum));
}
return StructOrError.fromStruct(VirtualHost.Route.RouteAction.forWeightedClusters(
weightedClusters, hashPolicies, timeoutNano, retryPolicy));
case CLUSTER_SPECIFIER_PLUGIN:
if (enableRouteLookup) {
String pluginName = proto.getClusterSpecifierPlugin();
PluginConfig pluginConfig = pluginConfigMap.get(pluginName);
if (pluginConfig == null) {
// Skip route if the plugin is not registered, but it's optional.
// Skip route if the plugin is not registered, but it is optional.
if (optionalPlugins.contains(pluginName)) {
return null;
}
Expand Down
4 changes: 2 additions & 2 deletions xds/src/main/java/io/grpc/xds/orca/OrcaPerRequestUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,8 @@ public void inboundTrailers(Metadata trailers) {

static MetricReport fromOrcaLoadReport(OrcaLoadReport loadReport) {
return InternalCallMetricRecorder.createMetricReport(loadReport.getCpuUtilization(),
loadReport.getMemUtilization(), loadReport.getRequestCostMap(),
loadReport.getUtilizationMap());
loadReport.getMemUtilization(), loadReport.getRpsFractional(),
loadReport.getRequestCostMap(), loadReport.getUtilizationMap());
}

/**
Expand Down
1 change: 1 addition & 0 deletions xds/src/main/java/io/grpc/xds/orca/OrcaServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ private OrcaLoadReport generateMetricsReport() {
InternalMetricRecorder.getMetricReport(metricRecorder);
return OrcaLoadReport.newBuilder().setCpuUtilization(internalReport.getCpuUtilization())
.setMemUtilization(internalReport.getMemoryUtilization())
.setRpsFractional(internalReport.getQps())
.putAllUtilization(internalReport.getUtilizationMetrics())
.build();
}
Expand Down
Loading