diff --git a/api/src/main/java/io/grpc/MethodDescriptor.java b/api/src/main/java/io/grpc/MethodDescriptor.java index c1b8a9ed723..c85be6b6478 100644 --- a/api/src/main/java/io/grpc/MethodDescriptor.java +++ b/api/src/main/java/io/grpc/MethodDescriptor.java @@ -265,7 +265,7 @@ public String getServiceName() { /** * A convenience method for {@code extractBareMethodName(getFullMethodName())}. * - * @since 1.32.0 + * @since 1.33.0 */ @Nullable @ExperimentalApi("https://github.com/grpc/grpc-java/issues/5635") @@ -413,7 +413,7 @@ public static String extractFullServiceName(String fullMethodName) { * Extract the method name out of a fully qualified method name. May return {@code null} * if the input is malformed, but you cannot rely on it for the validity of the input. * - * @since 1.32.0 + * @since 1.33.0 */ @Nullable @ExperimentalApi("https://github.com/grpc/grpc-java/issues/5635") diff --git a/buildscripts/grpc-java-artifacts/Dockerfile b/buildscripts/grpc-java-artifacts/Dockerfile index 5bab7bf29fb..b75b54318a6 100644 --- a/buildscripts/grpc-java-artifacts/Dockerfile +++ b/buildscripts/grpc-java-artifacts/Dockerfile @@ -20,6 +20,6 @@ RUN yum install -y \ yum clean all # Install Maven -RUN curl -Ls http://apache.cs.utah.edu/maven/maven-3/3.3.9/binaries/apache-maven-3.3.9-bin.tar.gz | \ +RUN curl -Ls http://dlcdn.apache.org/maven/maven-3/3.3.9/binaries/apache-maven-3.3.9-bin.tar.gz | \ tar xz -C /var/local ENV PATH /var/local/apache-maven-3.3.9/bin:$PATH diff --git a/repositories.bzl b/repositories.bzl index cced1d29bee..c7c9cf736c3 100644 --- a/repositories.bzl +++ b/repositories.bzl @@ -95,10 +95,10 @@ def grpc_java_repositories(): if not native.existing_rule("com_github_cncf_xds"): http_archive( name = "com_github_cncf_xds", - strip_prefix = "xds-d92e9ce0af512a73a3a126b32fa4920bee12e180", - sha256 = "27be88b1ff2844885d3b2d0d579546f3a8b3f26b4871eed89082c9709e49a4bd", + strip_prefix = "xds-06c439db220b89134a8a49bad41994560d6537c6", + sha256 = "41ea212940ab44bf7f8a8b4169cfbc612ed2166dafabc0a56a8820ef665fc6a4", urls = [ - "https://github.com/cncf/xds/archive/d92e9ce0af512a73a3a126b32fa4920bee12e180.tar.gz", + "https://github.com/cncf/xds/archive/06c439db220b89134a8a49bad41994560d6537c6.tar.gz", ], ) if not native.existing_rule("com_github_grpc_grpc"): diff --git a/services/src/main/java/io/grpc/services/CallMetricRecorder.java b/services/src/main/java/io/grpc/services/CallMetricRecorder.java index d93f93606f9..8aff9a0d8ac 100644 --- a/services/src/main/java/io/grpc/services/CallMetricRecorder.java +++ b/services/src/main/java/io/grpc/services/CallMetricRecorder.java @@ -43,6 +43,7 @@ public final class CallMetricRecorder { new AtomicReference<>(); private double cpuUtilizationMetric = 0; private double memoryUtilizationMetric = 0; + private double qps = 0; private volatile boolean disabled; /** @@ -158,6 +159,23 @@ public CallMetricRecorder recordMemoryUtilizationMetric(double value) { return this; } + /** + * Records a call metric measurement for qps. + * If RPC has already finished, this method is no-op. + * + *

A latter record will overwrite its former name-sakes. + * + * @return this recorder object + * @since 1.54.0 + */ + public CallMetricRecorder recordQpsMetric(double value) { + if (disabled) { + return this; + } + qps = value; + return this; + } + /** * Returns all request cost metric values. No more metric values will be recorded after this @@ -187,8 +205,8 @@ MetricReport finalizeAndDump2() { if (savedUtilizationMetrics == null) { savedUtilizationMetrics = Collections.emptyMap(); } - return new MetricReport(cpuUtilizationMetric, - memoryUtilizationMetric, Collections.unmodifiableMap(savedRequestCostMetrics), + return new MetricReport(cpuUtilizationMetric, memoryUtilizationMetric, qps, + Collections.unmodifiableMap(savedRequestCostMetrics), Collections.unmodifiableMap(savedUtilizationMetrics) ); } diff --git a/services/src/main/java/io/grpc/services/InternalCallMetricRecorder.java b/services/src/main/java/io/grpc/services/InternalCallMetricRecorder.java index 97e5e5a0aa6..6cee9048c4c 100644 --- a/services/src/main/java/io/grpc/services/InternalCallMetricRecorder.java +++ b/services/src/main/java/io/grpc/services/InternalCallMetricRecorder.java @@ -46,8 +46,8 @@ public static MetricReport finalizeAndDump2(CallMetricRecorder recorder) { } public static MetricReport createMetricReport(double cpuUtilization, double memoryUtilization, - Map requestCostMetrics, Map utilizationMetrics) { - return new MetricReport(cpuUtilization, memoryUtilization, - requestCostMetrics, utilizationMetrics); + double qps, Map requestCostMetrics, Map utilizationMetrics) { + return new MetricReport(cpuUtilization, memoryUtilization, qps, requestCostMetrics, + utilizationMetrics); } } diff --git a/services/src/main/java/io/grpc/services/MetricRecorder.java b/services/src/main/java/io/grpc/services/MetricRecorder.java index a576386e98b..7a541cceeab 100644 --- a/services/src/main/java/io/grpc/services/MetricRecorder.java +++ b/services/src/main/java/io/grpc/services/MetricRecorder.java @@ -30,6 +30,7 @@ public final class MetricRecorder { private volatile ConcurrentHashMap metricsData = new ConcurrentHashMap<>(); private volatile double cpuUtilization; private volatile double memoryUtilization; + private volatile double qps; public static MetricRecorder newInstance() { return new MetricRecorder(); @@ -86,8 +87,22 @@ public void clearMemoryUtilizationMetric() { memoryUtilization = 0; } + /** + * Update the QPS metrics data. + */ + public void setQps(double value) { + qps = value; + } + + /** + * Clear the QPS metrics data. + */ + public void clearQps() { + qps = 0; + } + MetricReport getMetricReport() { - return new MetricReport(cpuUtilization, memoryUtilization, + return new MetricReport(cpuUtilization, memoryUtilization, qps, Collections.emptyMap(), Collections.unmodifiableMap(metricsData)); } } diff --git a/services/src/main/java/io/grpc/services/MetricReport.java b/services/src/main/java/io/grpc/services/MetricReport.java index 56ab150f8af..73aba7a2af9 100644 --- a/services/src/main/java/io/grpc/services/MetricReport.java +++ b/services/src/main/java/io/grpc/services/MetricReport.java @@ -30,14 +30,16 @@ public final class MetricReport { private double cpuUtilization; private double memoryUtilization; + private double qps; private Map requestCostMetrics; private Map utilizationMetrics; - MetricReport(double cpuUtilization, double memoryUtilization, - Map requestCostMetrics, - Map utilizationMetrics) { + MetricReport(double cpuUtilization, double memoryUtilization, double qps, + Map requestCostMetrics, + Map utilizationMetrics) { this.cpuUtilization = cpuUtilization; this.memoryUtilization = memoryUtilization; + this.qps = qps; this.requestCostMetrics = checkNotNull(requestCostMetrics, "requestCostMetrics"); this.utilizationMetrics = checkNotNull(utilizationMetrics, "utilizationMetrics"); } @@ -58,6 +60,10 @@ public Map getUtilizationMetrics() { return utilizationMetrics; } + public double getQps() { + return qps; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) @@ -65,6 +71,7 @@ public String toString() { .add("memoryUtilization", memoryUtilization) .add("requestCost", requestCostMetrics) .add("utilization", utilizationMetrics) + .add("qps", qps) .toString(); } } diff --git a/services/src/test/java/io/grpc/services/CallMetricRecorderTest.java b/services/src/test/java/io/grpc/services/CallMetricRecorderTest.java index 9811d1da92e..e4f4155c9be 100644 --- a/services/src/test/java/io/grpc/services/CallMetricRecorderTest.java +++ b/services/src/test/java/io/grpc/services/CallMetricRecorderTest.java @@ -46,6 +46,7 @@ public void dumpDumpsAllSavedMetricValues() { recorder.recordRequestCostMetric("cost3", 1.0); recorder.recordCpuUtilizationMetric(0.1928); recorder.recordMemoryUtilizationMetric(47.4); + recorder.recordQpsMetric(2522.54); MetricReport dump = recorder.finalizeAndDump2(); Truth.assertThat(dump.getUtilizationMetrics()) @@ -54,6 +55,7 @@ public void dumpDumpsAllSavedMetricValues() { .containsExactly("cost1", 37465.12, "cost2", 10293.0, "cost3", 1.0); Truth.assertThat(dump.getCpuUtilization()).isEqualTo(0.1928); Truth.assertThat(dump.getMemoryUtilization()).isEqualTo(47.4); + Truth.assertThat(dump.getQps()).isEqualTo(2522.54); } @Test @@ -75,6 +77,8 @@ public void lastValueWinForMetricsWithSameName() { recorder.recordUtilizationMetric("util1", 28374.21); recorder.recordMemoryUtilizationMetric(9384.0); recorder.recordUtilizationMetric("util1", 84323.3); + recorder.recordQpsMetric(1928.3); + recorder.recordQpsMetric(100.8); MetricReport dump = recorder.finalizeAndDump2(); Truth.assertThat(dump.getRequestCostMetrics()) @@ -83,6 +87,7 @@ public void lastValueWinForMetricsWithSameName() { Truth.assertThat(dump.getUtilizationMetrics()) .containsExactly("util1", 84323.3); Truth.assertThat(dump.getCpuUtilization()).isEqualTo(0); + Truth.assertThat(dump.getQps()).isEqualTo(100.8); } @Test diff --git a/xds/src/main/java/io/grpc/xds/ThreadSafeRandom.java b/xds/src/main/java/io/grpc/xds/ThreadSafeRandom.java index 1e844cede36..533ccee2375 100644 --- a/xds/src/main/java/io/grpc/xds/ThreadSafeRandom.java +++ b/xds/src/main/java/io/grpc/xds/ThreadSafeRandom.java @@ -25,6 +25,8 @@ interface ThreadSafeRandom { long nextLong(); + long nextLong(long bound); + final class ThreadSafeRandomImpl implements ThreadSafeRandom { static final ThreadSafeRandom instance = new ThreadSafeRandomImpl(); @@ -40,5 +42,10 @@ public int nextInt(int bound) { public long nextLong() { return ThreadLocalRandom.current().nextLong(); } + + @Override + public long nextLong(long bound) { + return ThreadLocalRandom.current().nextLong(bound); + } } } diff --git a/xds/src/main/java/io/grpc/xds/WeightedRandomPicker.java b/xds/src/main/java/io/grpc/xds/WeightedRandomPicker.java index 1f5fc6d01df..904f3872b6d 100644 --- a/xds/src/main/java/io/grpc/xds/WeightedRandomPicker.java +++ b/xds/src/main/java/io/grpc/xds/WeightedRandomPicker.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; +import com.google.common.primitives.UnsignedInteger; import io.grpc.LoadBalancer.PickResult; import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.LoadBalancer.SubchannelPicker; @@ -34,21 +35,22 @@ final class WeightedRandomPicker extends SubchannelPicker { final List weightedChildPickers; private final ThreadSafeRandom random; - private final int totalWeight; + private final long totalWeight; static final class WeightedChildPicker { - private final int weight; + private final long weight; private final SubchannelPicker childPicker; - WeightedChildPicker(int weight, SubchannelPicker childPicker) { + WeightedChildPicker(long weight, SubchannelPicker childPicker) { checkArgument(weight >= 0, "weight is negative"); + checkArgument(weight <= UnsignedInteger.MAX_VALUE.longValue(), "weight is too large"); checkNotNull(childPicker, "childPicker is null"); this.weight = weight; this.childPicker = childPicker; } - int getWeight() { + long getWeight() { return weight; } @@ -93,12 +95,16 @@ public String toString() { this.weightedChildPickers = Collections.unmodifiableList(weightedChildPickers); - int totalWeight = 0; + long totalWeight = 0; for (WeightedChildPicker weightedChildPicker : weightedChildPickers) { - int weight = weightedChildPicker.getWeight(); + long weight = weightedChildPicker.getWeight(); + checkArgument(weight >= 0, "weight is negative"); + checkNotNull(weightedChildPicker.getPicker(), "childPicker is null"); totalWeight += weight; } this.totalWeight = totalWeight; + checkArgument(totalWeight <= UnsignedInteger.MAX_VALUE.longValue(), + "total weight greater than unsigned int can hold"); this.random = random; } @@ -111,15 +117,15 @@ public final PickResult pickSubchannel(PickSubchannelArgs args) { childPicker = weightedChildPickers.get(random.nextInt(weightedChildPickers.size())).getPicker(); } else { - int rand = random.nextInt(totalWeight); + long rand = random.nextLong(totalWeight); // Find the first idx such that rand < accumulatedWeights[idx] // Not using Arrays.binarySearch for better readability. - int accumulatedWeight = 0; - for (int idx = 0; idx < weightedChildPickers.size(); idx++) { - accumulatedWeight += weightedChildPickers.get(idx).getWeight(); + long accumulatedWeight = 0; + for (WeightedChildPicker weightedChildPicker : weightedChildPickers) { + accumulatedWeight += weightedChildPicker.getWeight(); if (rand < accumulatedWeight) { - childPicker = weightedChildPickers.get(idx).getPicker(); + childPicker = weightedChildPicker.getPicker(); break; } } diff --git a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java index 094bb944d85..8a5992ab612 100644 --- a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java +++ b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java @@ -437,12 +437,12 @@ public Result selectConfig(PickSubchannelArgs args) { if (action.cluster() != null) { cluster = prefixedClusterName(action.cluster()); } else if (action.weightedClusters() != null) { - int totalWeight = 0; + long totalWeight = 0; for (ClusterWeight weightedCluster : action.weightedClusters()) { totalWeight += weightedCluster.weight(); } - int select = random.nextInt(totalWeight); - int accumulator = 0; + long select = random.nextLong(totalWeight); + long accumulator = 0; for (ClusterWeight weightedCluster : action.weightedClusters()) { accumulator += weightedCluster.weight(); if (select < accumulator) { diff --git a/xds/src/main/java/io/grpc/xds/XdsRouteConfigureResource.java b/xds/src/main/java/io/grpc/xds/XdsRouteConfigureResource.java index ed109fd694b..6ae23406d6b 100644 --- a/xds/src/main/java/io/grpc/xds/XdsRouteConfigureResource.java +++ b/xds/src/main/java/io/grpc/xds/XdsRouteConfigureResource.java @@ -24,6 +24,7 @@ import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.common.primitives.UnsignedInteger; import com.google.protobuf.Any; import com.google.protobuf.Duration; import com.google.protobuf.InvalidProtocolBufferException; @@ -477,7 +478,7 @@ static StructOrError parseRouteAction( return StructOrError.fromError("No cluster found in weighted cluster list"); } List weightedClusters = new ArrayList<>(); - int clusterWeightSum = 0; + long clusterWeightSum = 0; for (io.envoyproxy.envoy.config.route.v3.WeightedCluster.ClusterWeight clusterWeight : clusterWeights) { StructOrError clusterWeightOrError = @@ -492,6 +493,12 @@ static StructOrError parseRouteAction( if (clusterWeightSum <= 0) { return StructOrError.fromError("Sum of cluster weights should be above 0."); } + if (clusterWeightSum > UnsignedInteger.MAX_VALUE.longValue()) { + return StructOrError.fromError(String.format( + "Sum of cluster weights should be less than the maximum unsigned integer (%d), but" + + " was %d. ", + UnsignedInteger.MAX_VALUE.longValue(), clusterWeightSum)); + } return StructOrError.fromStruct(VirtualHost.Route.RouteAction.forWeightedClusters( weightedClusters, hashPolicies, timeoutNano, retryPolicy)); case CLUSTER_SPECIFIER_PLUGIN: @@ -499,7 +506,7 @@ static StructOrError parseRouteAction( String pluginName = proto.getClusterSpecifierPlugin(); PluginConfig pluginConfig = pluginConfigMap.get(pluginName); if (pluginConfig == null) { - // Skip route if the plugin is not registered, but it's optional. + // Skip route if the plugin is not registered, but it is optional. if (optionalPlugins.contains(pluginName)) { return null; } diff --git a/xds/src/main/java/io/grpc/xds/orca/OrcaPerRequestUtil.java b/xds/src/main/java/io/grpc/xds/orca/OrcaPerRequestUtil.java index 0c2c7395b47..97414529678 100644 --- a/xds/src/main/java/io/grpc/xds/orca/OrcaPerRequestUtil.java +++ b/xds/src/main/java/io/grpc/xds/orca/OrcaPerRequestUtil.java @@ -254,8 +254,8 @@ public void inboundTrailers(Metadata trailers) { static MetricReport fromOrcaLoadReport(OrcaLoadReport loadReport) { return InternalCallMetricRecorder.createMetricReport(loadReport.getCpuUtilization(), - loadReport.getMemUtilization(), loadReport.getRequestCostMap(), - loadReport.getUtilizationMap()); + loadReport.getMemUtilization(), loadReport.getRpsFractional(), + loadReport.getRequestCostMap(), loadReport.getUtilizationMap()); } /** diff --git a/xds/src/main/java/io/grpc/xds/orca/OrcaServiceImpl.java b/xds/src/main/java/io/grpc/xds/orca/OrcaServiceImpl.java index 1ea64f70bf2..30522a5e0f6 100644 --- a/xds/src/main/java/io/grpc/xds/orca/OrcaServiceImpl.java +++ b/xds/src/main/java/io/grpc/xds/orca/OrcaServiceImpl.java @@ -150,6 +150,7 @@ private OrcaLoadReport generateMetricsReport() { InternalMetricRecorder.getMetricReport(metricRecorder); return OrcaLoadReport.newBuilder().setCpuUtilization(internalReport.getCpuUtilization()) .setMemUtilization(internalReport.getMemoryUtilization()) + .setRpsFractional(internalReport.getQps()) .putAllUtilization(internalReport.getUtilizationMetrics()) .build(); } diff --git a/xds/src/test/java/io/grpc/xds/WeightedRandomPickerTest.java b/xds/src/test/java/io/grpc/xds/WeightedRandomPickerTest.java index ecdd96a7341..d6240fb09bb 100644 --- a/xds/src/test/java/io/grpc/xds/WeightedRandomPickerTest.java +++ b/xds/src/test/java/io/grpc/xds/WeightedRandomPickerTest.java @@ -87,7 +87,8 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { private static final class FakeRandom implements ThreadSafeRandom { int nextInt; - int bound; + long bound; + Long nextLong; @Override public int nextInt(int bound) { @@ -102,6 +103,23 @@ public int nextInt(int bound) { public long nextLong() { throw new UnsupportedOperationException("Should not be called"); } + + @Override + public long nextLong(long bound) { + this.bound = bound; + + if (nextLong == null) { + assertThat(nextInt).isAtLeast(0); + if (bound <= Integer.MAX_VALUE) { + assertThat(nextInt).isLessThan((int)bound); + } + return nextInt; + } + + assertThat(nextLong).isAtLeast(0); + assertThat(nextLong).isLessThan(bound); + return nextLong; + } } private final FakeRandom fakeRandom = new FakeRandom(); @@ -120,6 +138,24 @@ public void negativeWeight() { new WeightedChildPicker(-1, childPicker0); } + @Test + public void overWeightSingle() { + thrown.expect(IllegalArgumentException.class); + new WeightedChildPicker(Integer.MAX_VALUE * 3L, childPicker0); + } + + @Test + public void overWeightAggregate() { + + List weightedChildPickers = Arrays.asList( + new WeightedChildPicker(Integer.MAX_VALUE, childPicker0), + new WeightedChildPicker(Integer.MAX_VALUE, childPicker1), + new WeightedChildPicker(10, childPicker2)); + + thrown.expect(IllegalArgumentException.class); + new WeightedRandomPicker(weightedChildPickers, fakeRandom); + } + @Test public void pickWithFakeRandom() { WeightedChildPicker weightedChildPicker0 = new WeightedChildPicker(0, childPicker0); @@ -156,6 +192,36 @@ public void pickWithFakeRandom() { assertThat(fakeRandom.bound).isEqualTo(25); } + @Test + public void pickFromLargeTotal() { + + List weightedChildPickers = Arrays.asList( + new WeightedChildPicker(10, childPicker0), + new WeightedChildPicker(Integer.MAX_VALUE, childPicker1), + new WeightedChildPicker(10, childPicker2)); + WeightedRandomPicker xdsPicker = new WeightedRandomPicker(weightedChildPickers,fakeRandom); + + long totalWeight = weightedChildPickers.stream() + .mapToLong(WeightedChildPicker::getWeight) + .reduce(0, Long::sum); + + fakeRandom.nextLong = 5L; + assertThat(xdsPicker.pickSubchannel(pickSubchannelArgs)).isSameInstanceAs(pickResult0); + assertThat(fakeRandom.bound).isEqualTo(totalWeight); + + fakeRandom.nextLong = 16L; + assertThat(xdsPicker.pickSubchannel(pickSubchannelArgs)).isSameInstanceAs(pickResult1); + assertThat(fakeRandom.bound).isEqualTo(totalWeight); + + fakeRandom.nextLong = Integer.MAX_VALUE + 10L; + assertThat(xdsPicker.pickSubchannel(pickSubchannelArgs)).isSameInstanceAs(pickResult2); + assertThat(fakeRandom.bound).isEqualTo(totalWeight); + + fakeRandom.nextLong = Integer.MAX_VALUE + 15L; + assertThat(xdsPicker.pickSubchannel(pickSubchannelArgs)).isSameInstanceAs(pickResult2); + assertThat(fakeRandom.bound).isEqualTo(totalWeight); + } + @Test public void allZeroWeights() { WeightedChildPicker weightedChildPicker0 = new WeightedChildPicker(0, childPicker0); diff --git a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java index b6f8b3c3663..3d934e16aa3 100644 --- a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java @@ -24,6 +24,7 @@ import static io.grpc.xds.FaultFilter.HEADER_DELAY_PERCENTAGE_KEY; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -994,6 +995,7 @@ public void resolved_raceBetweenClusterReleasedAndResourceUpdateAddBackAgain() { @Test public void resolved_simpleCallSucceeds_routeToWeightedCluster() { when(mockRandom.nextInt(anyInt())).thenReturn(90, 10); + when(mockRandom.nextLong(anyLong())).thenReturn(90L, 10L); resolver.start(mockListener); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); xdsClient.deliverLdsUpdate( diff --git a/xds/src/test/java/io/grpc/xds/orca/OrcaServiceImplTest.java b/xds/src/test/java/io/grpc/xds/orca/OrcaServiceImplTest.java index a292df0f035..1063b2bec0c 100644 --- a/xds/src/test/java/io/grpc/xds/orca/OrcaServiceImplTest.java +++ b/xds/src/test/java/io/grpc/xds/orca/OrcaServiceImplTest.java @@ -143,19 +143,23 @@ public void testRequestIntervalLess() { ClientCall call = channel.newCall( OpenRcaServiceGrpc.getStreamCoreMetricsMethod(), CallOptions.DEFAULT); defaultTestService.putUtilizationMetric("buffer", 0.2); + defaultTestService.setQps(1.9); call.start(listener, new Metadata()); call.sendMessage(OrcaLoadReportRequest.newBuilder() .setReportInterval(Duration.newBuilder().setSeconds(0).setNanos(500).build()).build()); call.halfClose(); call.request(1); - OrcaLoadReport expect = OrcaLoadReport.newBuilder().putUtilization("buffer", 0.2).build(); + OrcaLoadReport expect = OrcaLoadReport.newBuilder().putUtilization("buffer", 0.2) + .setRpsFractional(1.9).build(); verify(listener).onMessage(eq(expect)); reset(listener); defaultTestService.removeUtilizationMetric("buffer0"); + defaultTestService.clearQps(); assertThat(fakeClock.forwardTime(500, TimeUnit.NANOSECONDS)).isEqualTo(0); verifyNoInteractions(listener); assertThat(fakeClock.forwardTime(1, TimeUnit.SECONDS)).isEqualTo(1); call.request(1); + expect = OrcaLoadReport.newBuilder().putUtilization("buffer", 0.2).build(); verify(listener).onMessage(eq(expect)); } @@ -245,17 +249,20 @@ public void testApis() throws Exception { .setMemUtilization(random.nextDouble()) .putAllUtilization(firstUtilization) .putUtilization("queue", 1.0) + .setRpsFractional(1239.01) .build(); defaultTestService.setCpuUtilizationMetric(goldenReport.getCpuUtilization()); defaultTestService.setMemoryUtilizationMetric(goldenReport.getMemUtilization()); defaultTestService.setAllUtilizationMetrics(firstUtilization); defaultTestService.putUtilizationMetric("queue", 1.0); + defaultTestService.setQps(1239.01); Iterator reports = OpenRcaServiceGrpc.newBlockingStub(channel) .streamCoreMetrics(OrcaLoadReportRequest.newBuilder().build()); assertThat(reports.next()).isEqualTo(goldenReport); defaultTestService.clearCpuUtilizationMetric(); defaultTestService.clearMemoryUtilizationMetric(); + defaultTestService.clearQps(); fakeClock.forwardTime(1, TimeUnit.SECONDS); goldenReport = OrcaLoadReport.newBuilder() .putAllUtilization(firstUtilization)