Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: add weighted round robin LB policy support #9873

Merged
merged 30 commits into from
Feb 27, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
1f7bbd8
refactor round robin LB
YifeiZhuang Jan 23, 2023
73cf208
rename abstract*
YifeiZhuang Jan 25, 2023
12e7928
refactor round robin LB
YifeiZhuang Jan 23, 2023
2727d9b
rename abstract*
YifeiZhuang Jan 25, 2023
764c9a9
temp: add weightedroundrobinimpl
YifeiZhuang Jan 25, 2023
afa10fb
add weighted round robin picker and scheduler
YifeiZhuang Feb 2, 2023
d4f785d
comments, and add afterSubchannelUpdate
YifeiZhuang Feb 3, 2023
60af73c
format
YifeiZhuang Feb 3, 2023
4228f97
move abstraction to composition
YifeiZhuang Feb 3, 2023
cf3d640
remove listener
YifeiZhuang Feb 3, 2023
c7bcfcd
add update timer to LB , not in picker
YifeiZhuang Feb 4, 2023
975eeed
use original round robin wl
YifeiZhuang Feb 6, 2023
5300199
add subchannel listener
YifeiZhuang Feb 6, 2023
2458ac9
add test
YifeiZhuang Feb 7, 2023
9cd33b6
Merge branch 'master' of https://github.com/grpc/grpc-java into wrr-impl
YifeiZhuang Feb 8, 2023
23231eb
add more tests
YifeiZhuang Feb 8, 2023
3d51405
add provider test
YifeiZhuang Feb 9, 2023
e6886c1
fix current picker
YifeiZhuang Feb 9, 2023
3da127e
remove virtual time, change comment
YifeiZhuang Feb 10, 2023
cb31730
fix avg weight
YifeiZhuang Feb 13, 2023
01a7d1a
Merge branch 'master' of https://github.com/grpc/grpc-java into wrr-impl
YifeiZhuang Feb 13, 2023
6e496da
Merge branch 'master' of https://github.com/grpc/grpc-java into wrr-impl
YifeiZhuang Feb 14, 2023
6dfa072
add env variable
YifeiZhuang Feb 16, 2023
f786098
Merge branch 'master' of https://github.com/grpc/grpc-java into wrr-impl
YifeiZhuang Feb 16, 2023
0bdce32
parse wrr proto
YifeiZhuang Feb 17, 2023
3979ef8
add test scheduler
YifeiZhuang Feb 23, 2023
0704a69
fix comments, timer, volatile, etc
YifeiZhuang Feb 23, 2023
fc0d3dd
bazel checksum, and use ticker
YifeiZhuang Feb 24, 2023
f728281
infTime = nanoTime() + MAX_VALUE
YifeiZhuang Feb 24, 2023
1ef6221
minor fix
YifeiZhuang Feb 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions xds/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ java_library(
":envoy_service_load_stats_v3_java_grpc",
":envoy_service_status_v3_java_grpc",
":xds_protos_java",
":orca",
"//:auto_value_annotations",
"//alts",
"//api",
Expand Down
78 changes: 34 additions & 44 deletions xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,9 @@ static final class WrrSubchannel extends ForwardingSubchannel {
private final TimeProvider timeProvider;
private final OrcaOobReportListener oobListener = this::onLoadReport;
private final OrcaPerRequestReportListener perRpcListener = this::onLoadReport;
volatile long lastUpdated;
volatile long nonEmptySince;
volatile double weight;
private volatile long lastUpdated;
private volatile long nonEmptySince;
private volatile double weight;
private volatile WeightedRoundRobinLoadBalancerConfig config;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this volatile? It appears to only be accessed in the synchronization context.

Copy link
Contributor Author

@YifeiZhuang YifeiZhuang Feb 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh it appears so.
It can be used by pick() from SubchannelStateListener from transport thread, or here in the LB thread in weightUpdateTask or acceptResolvedAddresses, all in sync context.


WrrSubchannel(Subchannel delegate, TimeProvider timeProvider) {
Expand Down Expand Up @@ -217,13 +217,13 @@ protected Subchannel delegate() {
final class WeightedRoundRobinPicker extends ReadyPicker {
private final List<Subchannel> list;
private final AtomicReference<EdfScheduler> schedulerRef;
YifeiZhuang marked this conversation as resolved.
Show resolved Hide resolved
private volatile boolean rrMode = false;
private volatile boolean rrMode = true;
YifeiZhuang marked this conversation as resolved.
Show resolved Hide resolved

WeightedRoundRobinPicker(List<Subchannel> list, int startIndex) {
super(list, startIndex);
super(checkNotNull(list, "list"), startIndex);
Preconditions.checkArgument(!list.isEmpty(), "empty list");
this.list = list;
this.schedulerRef = new AtomicReference<>(new EdfScheduler());
this.schedulerRef = new AtomicReference<>(new EdfScheduler(list.size()));
YifeiZhuang marked this conversation as resolved.
Show resolved Hide resolved
updateWeight();
}

Expand All @@ -245,7 +245,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
}

private void updateWeight() {
EdfScheduler scheduler = new EdfScheduler();
EdfScheduler scheduler = new EdfScheduler(list.size());
int weightedChannelCount = 0;
double avgWeight = 0;
for (Subchannel value : list) {
Expand All @@ -255,22 +255,24 @@ private void updateWeight() {
weightedChannelCount++;
}
}
rrMode = weightedChannelCount < 2;
if (rrMode) {
if (weightedChannelCount < 2) {
rrMode = true;
return;
}
avgWeight /= 1.0 * weightedChannelCount;
for (int i = 0; i < list.size(); i++) {
WrrSubchannel subchannel = (WrrSubchannel) list.get(i);
double newWeight = subchannel.getWeight();
scheduler.add(i, newWeight > 0 ? newWeight : avgWeight);
}
schedulerRef.set(scheduler);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because you are completely replacing the scheduler each time you update weights, if something has a large weight and something else has a minimum weight the second one might never get used.
One way of dealing with this would be for ObjectState to have a flag indicating whether this was added from a pick, then for all of the ones that weren't added from a pick you could use the old deadline (or the smaller of the old deadline and the newly calculated one). You could have the flag passed to scheduler.add() and all of the work done in the add method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Absolutely true.
Note stubby has similar "completion ratio" mechanism, that gives credits to subchannels in the previous state when updating to the next state with the new weight. This way, the minimum weight channel can possibly be picked.
The current implementation is very simplified. I'll make it as a future improvement and I will capture it in the design doc.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because you are completely replacing the scheduler each time you update weights, if something has a large weight and something else has a minimum weight the second one might never get used.

Randomizing the scheduler each creation should prevent that. Worst-case, if a schedule is used for only a single pick after being created, then that is the same as a WRR implementation that has weighted ranges for each choice and uses a random number to choose (same approach as weighted_target).

The code right now does not seem to randomize the initial scheduler state, but it will need to before de-experimentalizing.

rrMode = false;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(WeightedRoundRobinPicker.class)
.add("list", list).toString();
.add("list", list).add("rrMode", rrMode).toString();
}

@VisibleForTesting
Expand All @@ -294,29 +296,27 @@ public boolean isEquivalentTo(RoundRobinPicker picker) {
* The earliest deadline first implementation in which each object is
* chosen deterministically and periodically with frequency proportional to its weight.
*
* <p>Specifically, each object added to chooser is given a period equal to the multiplicative
* inverse of its weight. The place of each object in its period is tracked, and each call to
* choose returns the object with the least remaining time in its period (1/weight).
* <p>Specifically, each object added to chooser is given a deadline equal to the multiplicative
YifeiZhuang marked this conversation as resolved.
Show resolved Hide resolved
* inverse of its weight. The place of each object in its deadline is tracked, and each call to
* choose returns the object with the least remaining time in its deadline (1/weight).
* (Ties are broken by the order in which the children were added to the chooser.)
* For example, if items A and B are added
* with weights 0.5 and 0.2, successive chooses return:
* For example, if items A and B are added with weights 0.5 and 0.2, successive chooses return:
*
* <ul>
* <li>In the first call, the remaining periods are A=2 (1/0.5) and B=5 (1/0.2), so A is
* returned. The period of A (as it was picked), is substracted from periods of all other
* objects.
* <li>Next, the remaining periods are A=2 and B=3, so A is returned. The period of A (2) is
* substracted from all other objects (B=1) and A is re-added with A=2.
* <li>Remaining periods are A=2 and B=1, so B is returned. The period of B (1) is substracted
* from all other objects (A=1) and B is re-added with B=5.
* <li>Remaining periods are A=1 and B=5, so A is returned. The period of A (1) is substracted
* from all other objects (B=4) and A is re-added with A=2.
* <li>Remaining periods are A=2 and B=4, so A is returned. The period of A (2) is substracted
* from all other objects (B=2) and A is re-added with A=2.
* <li>Remaining periods are A=2 and B=2, so A is returned. The period of A (2) is substracted
* from all other objects (B=0) and A is re-added with A=2.
* <li>Remaining periods are A=2 and B=0, so B is returned. The period of B (0) is substracted
* from all other objects (A=2) and B is re-added with B=5.
* <li>In the first call, the deadlines are A=2 (1/0.5) and B=5 (1/0.2), so A is returned.
* The deadline of A is updated to 4.
* <li>Next, the remaining deadlines are A=4 and B=5, so A is returned. The deadline of A (2) is
* updated to A=6.
* <li>Remaining deadlines are A=6 and B=5, so B is returned. The deadline of B is updated with
* with B=10.
* <li>Remaining deadlines are A=6 and B=10, so A is returned. The deadline of A is updated with
* A=8.
* <li>Remaining deadlines are A=8 and B=10, so A is returned. The deadline of A is updated with
* A=10.
* <li>Remaining deadlines are A=10 and B=10, so A is returned. The deadline of A is updated
* with A=12.
* <li>Remaining deadlines are A=12 and B=10, so B is returned. The deadline of B is updated
* with B=15.
* <li>etc.
* </ul>
*
Expand All @@ -333,13 +333,6 @@ public boolean isEquivalentTo(RoundRobinPicker picker) {
private static final class EdfScheduler {
private final PriorityQueue<ObjectState> prioQueue;

/**
* Upon every pick() the "virtual time" is advanced closer to the period of next items.
* Here we have an explicit "virtualTimeNow", which will be added to the period of all newly
* scheduled objects (virtualTimeNow + period).
*/
private double virtualTimeNow = 0.0;

/**
* Weights below this value will be logged and upped to this minimum weight.
YifeiZhuang marked this conversation as resolved.
Show resolved Hide resolved
*/
Expand All @@ -351,8 +344,8 @@ private static final class EdfScheduler {
* Use the item's deadline as the order in the priority queue. If the deadlines are the same,
* use the index. Index should be unique.
*/
EdfScheduler() {
this.prioQueue = new PriorityQueue<ObjectState>(10, (o1, o2) -> {
EdfScheduler(int initialCapacity) {
this.prioQueue = new PriorityQueue<ObjectState>(initialCapacity, (o1, o2) -> {
if (o1.deadline == o2.deadline) {
return o1.index - o2.index;
} else if (o1.deadline < o2.deadline) {
YifeiZhuang marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -372,7 +365,7 @@ private static final class EdfScheduler {
void add(int index, double weight) {
checkArgument(weight > 0.0, "Weights need to be positive.");
ObjectState state = new ObjectState(Math.max(weight, MINIMUM_WEIGHT), index);
state.deadline = virtualTimeNow + 1 / state.weight;
state.deadline = 1 / state.weight;
YifeiZhuang marked this conversation as resolved.
Show resolved Hide resolved
prioQueue.add(state);
}

Expand All @@ -382,10 +375,7 @@ void add(int index, double weight) {
int pick() {
synchronized (lock) {
ObjectState minObject = prioQueue.remove();
// Simulate advancing in time by setting the current time to the period of the nearest item
// on the "time horizon".
virtualTimeNow = minObject.deadline;
minObject.deadline = virtualTimeNow + (1.0 / minObject.weight);
minObject.deadline += 1.0 / minObject.weight;
prioQueue.add(minObject);
return minObject.index;
}
Expand Down
165 changes: 149 additions & 16 deletions xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import io.grpc.SynchronizationContext;
import io.grpc.internal.FakeClock;
import io.grpc.services.InternalCallMetricRecorder;
import io.grpc.services.MetricReport;
import io.grpc.util.RoundRobinLoadBalancer.EmptyPicker;
import io.grpc.xds.WeightedRoundRobinLoadBalancer.WeightedRoundRobinLoadBalancerConfig;
import io.grpc.xds.WeightedRoundRobinLoadBalancer.WeightedRoundRobinPicker;
Expand All @@ -61,6 +62,7 @@
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Rule;
Expand Down Expand Up @@ -239,7 +241,7 @@ public void enableOobLoadReportConfig() {
assertThat(fakeClock.forwardTime(11, TimeUnit.SECONDS)).isEqualTo(1);
PickResult pickResult = weightedPicker.pickSubchannel(mockArgs);
assertThat(pickResult.getSubchannel()).isEqualTo(weightedSubchannel1);
assertThat(pickResult.getStreamTracerFactory()).isNotNull();
assertThat(pickResult.getStreamTracerFactory()).isNotNull(); // verify per-request listener
assertThat(oobCalls.isEmpty()).isTrue();
weightedConfig = WeightedRoundRobinLoadBalancerConfig.newBuilder().setEnableOobLoadReport(true)
.setOobReportingPeriodNanos(20_030_000_000L)
Expand All @@ -257,8 +259,8 @@ public void enableOobLoadReportConfig() {
verify(oobCalls.poll()).sendMessage(eq(golden));
}

@Test
public void pickByWeight() {
private void pickByWeight(MetricReport r1, MetricReport r2, MetricReport r3,
double p1, double p2, double p3) {
syncContext.execute(() -> wrr.acceptResolvedAddresses(ResolvedAddresses.newBuilder()
.setAddresses(servers).setLoadBalancingPolicyConfig(weightedConfig)
.setAttributes(affinity).build()));
Expand All @@ -282,25 +284,47 @@ public void pickByWeight() {
WrrSubchannel weightedSubchannel1 = (WrrSubchannel) weightedPicker.getList().get(0);
WrrSubchannel weightedSubchannel2 = (WrrSubchannel) weightedPicker.getList().get(1);
WrrSubchannel weightedSubchannel3 = (WrrSubchannel) weightedPicker.getList().get(2);
weightedSubchannel1.onLoadReport(InternalCallMetricRecorder.createMetricReport(
0.12, 0.1, 22, new HashMap<>(), new HashMap<>()));
weightedSubchannel2.onLoadReport(InternalCallMetricRecorder.createMetricReport(
0.28, 0.1, 40, new HashMap<>(), new HashMap<>()));
weightedSubchannel3.onLoadReport(InternalCallMetricRecorder.createMetricReport(
0.86, 0.1, 100, new HashMap<>(), new HashMap<>()));
weightedSubchannel1.onLoadReport(r1);
weightedSubchannel2.onLoadReport(r2);
weightedSubchannel3.onLoadReport(r3);
assertThat(fakeClock.forwardTime(11, TimeUnit.SECONDS)).isEqualTo(1);
Map<Subchannel, Integer> pickCount = new HashMap<>();
for (int i = 0; i < 1000; i++) {
for (int i = 0; i < 10000; i++) {
Subchannel result = weightedPicker.pickSubchannel(mockArgs).getSubchannel();
pickCount.put(result, pickCount.getOrDefault(result, 0) + 1);
YifeiZhuang marked this conversation as resolved.
Show resolved Hide resolved
}
assertThat(pickCount.size()).isEqualTo(3);
assertThat(Math.abs(pickCount.get(weightedSubchannel1) / 1000.0
- 22 / 0.12 / (22 / 0.12 + 40 / 0.28 + 100 / 0.86))).isAtMost(EDF_PRECISE);
assertThat(Math.abs(pickCount.get(weightedSubchannel2) / 1000.0
- 40 / 0.28 / (22 / 0.12 + 40 / 0.28 + 100 / 0.86) )).isAtMost(EDF_PRECISE);
assertThat(Math.abs(pickCount.get(weightedSubchannel3) / 1000.0
- 100 / 0.86 / ( 22 / 0.12 + 40 / 0.28 + 100 / 0.86) )).isAtMost(EDF_PRECISE);
assertThat(Math.abs(pickCount.get(weightedSubchannel1) / 10000.0 - p1)).isAtMost(EDF_PRECISE);
YifeiZhuang marked this conversation as resolved.
Show resolved Hide resolved
assertThat(Math.abs(pickCount.get(weightedSubchannel2) / 10000.0 - p2 )).isAtMost(EDF_PRECISE);
assertThat(Math.abs(pickCount.get(weightedSubchannel3) / 10000.0 - p3 )).isAtMost(EDF_PRECISE);
}

@Test
public void pickByWeight_LargeWeight() {
pickByWeight(InternalCallMetricRecorder.createMetricReport(
0.1, 0.1, 2200, new HashMap<>(), new HashMap<>()),
InternalCallMetricRecorder.createMetricReport(
0.9, 0.1, 2, new HashMap<>(), new HashMap<>()),
InternalCallMetricRecorder.createMetricReport(
0.86, 0.1, 100, new HashMap<>(), new HashMap<>()),
2200 / 0.1 / (2200 / 0.1 + 2 / 0.9 + 100 / 0.86),
YifeiZhuang marked this conversation as resolved.
Show resolved Hide resolved
27 / 0.9 / (2200 / 0.1 + 2 / 0.9 + 100 / 0.86),
100 / 0.86 / ( 2200 / 0.1 + 2 / 0.9 + 100 / 0.86)
);
}

@Test
public void pickByWeight_normalWeight() {
pickByWeight(InternalCallMetricRecorder.createMetricReport(
0.12, 0.1, 22, new HashMap<>(), new HashMap<>()),
InternalCallMetricRecorder.createMetricReport(
0.28, 0.1, 40, new HashMap<>(), new HashMap<>()),
InternalCallMetricRecorder.createMetricReport(
0.86, 0.1, 100, new HashMap<>(), new HashMap<>()),
22 / 0.12 / (22 / 0.12 + 40 / 0.28 + 100 / 0.86),
40 / 0.28 / (22 / 0.12 + 40 / 0.28 + 100 / 0.86),
100 / 0.86 / ( 22 / 0.12 + 40 / 0.28 + 100 / 0.86)
);
}

@Test
Expand Down Expand Up @@ -423,6 +447,115 @@ public void weightExpired() {
.isAtMost(EDF_PRECISE);
}

@Test
public void unknownWeightIsAvgWeight() {
syncContext.execute(() -> wrr.acceptResolvedAddresses(ResolvedAddresses.newBuilder()
.setAddresses(servers).setLoadBalancingPolicyConfig(weightedConfig)
.setAttributes(affinity).build()));
verify(helper, times(3)).createSubchannel(
any(CreateSubchannelArgs.class));
assertThat(fakeClock.getPendingTasks().size()).isEqualTo(1);

Iterator<Subchannel> it = subchannels.values().iterator();
Subchannel readySubchannel1 = it.next();
subchannelStateListeners.get(readySubchannel1).onSubchannelState(ConnectivityStateInfo
.forNonError(ConnectivityState.READY));
Subchannel readySubchannel2 = it.next();
subchannelStateListeners.get(readySubchannel2).onSubchannelState(ConnectivityStateInfo
.forNonError(ConnectivityState.READY));
Subchannel readySubchannel3 = it.next();
subchannelStateListeners.get(readySubchannel3).onSubchannelState(ConnectivityStateInfo
.forNonError(ConnectivityState.READY));
verify(helper, times(3)).updateBalancingState(
eq(ConnectivityState.READY), pickerCaptor.capture());
WeightedRoundRobinPicker weightedPicker = pickerCaptor.getAllValues().get(2);
WrrSubchannel weightedSubchannel1 = (WrrSubchannel) weightedPicker.getList().get(0);
WrrSubchannel weightedSubchannel2 = (WrrSubchannel) weightedPicker.getList().get(1);
WrrSubchannel weightedSubchannel3 = (WrrSubchannel) weightedPicker.getList().get(2);
weightedSubchannel1.onLoadReport(InternalCallMetricRecorder.createMetricReport(
0.1, 0.1, 1, new HashMap<>(), new HashMap<>()));
weightedSubchannel2.onLoadReport(InternalCallMetricRecorder.createMetricReport(
0.2, 0.1, 1, new HashMap<>(), new HashMap<>()));
assertThat(fakeClock.forwardTime(10, TimeUnit.SECONDS)).isEqualTo(1);
Map<Subchannel, Integer> pickCount = new HashMap<>();
for (int i = 0; i < 1000; i++) {
Subchannel result = weightedPicker.pickSubchannel(mockArgs).getSubchannel();
pickCount.put(result, pickCount.getOrDefault(result, 0) + 1);
}
assertThat(pickCount.size()).isEqualTo(3);
assertThat(Math.abs(pickCount.get(weightedSubchannel1) / 1000.0 - 4.0 / 9))
.isAtMost(EDF_PRECISE);
assertThat(Math.abs(pickCount.get(weightedSubchannel2) / 1000.0 - 2.0 / 9))
.isAtMost(EDF_PRECISE);
// subchannel3's weight is average of subchannel1 and subchannel2
assertThat(Math.abs(pickCount.get(weightedSubchannel3) / 1000.0 - 3.0 / 9))
.isAtMost(EDF_PRECISE);
}

@Test
public void pickFromOtherThread() throws Exception {
syncContext.execute(() -> wrr.acceptResolvedAddresses(ResolvedAddresses.newBuilder()
.setAddresses(servers).setLoadBalancingPolicyConfig(weightedConfig)
.setAttributes(affinity).build()));
verify(helper, times(3)).createSubchannel(
any(CreateSubchannelArgs.class));
assertThat(fakeClock.getPendingTasks().size()).isEqualTo(1);

Iterator<Subchannel> it = subchannels.values().iterator();
Subchannel readySubchannel1 = it.next();
subchannelStateListeners.get(readySubchannel1).onSubchannelState(ConnectivityStateInfo
.forNonError(ConnectivityState.READY));
Subchannel readySubchannel2 = it.next();
subchannelStateListeners.get(readySubchannel2).onSubchannelState(ConnectivityStateInfo
.forNonError(ConnectivityState.READY));
verify(helper, times(2)).updateBalancingState(
eq(ConnectivityState.READY), pickerCaptor.capture());
WeightedRoundRobinPicker weightedPicker = pickerCaptor.getAllValues().get(1);
WrrSubchannel weightedSubchannel1 = (WrrSubchannel) weightedPicker.getList().get(0);
WrrSubchannel weightedSubchannel2 = (WrrSubchannel) weightedPicker.getList().get(1);
weightedSubchannel1.onLoadReport(InternalCallMetricRecorder.createMetricReport(
0.1, 0.1, 1, new HashMap<>(), new HashMap<>()));
weightedSubchannel2.onLoadReport(InternalCallMetricRecorder.createMetricReport(
0.2, 0.1, 1, new HashMap<>(), new HashMap<>()));
assertThat(weightedPicker.toString()).contains("rrMode=true");
CyclicBarrier barrier = new CyclicBarrier(2);
new Thread(new Runnable() {
@Override
public void run() {
try {
weightedPicker.pickSubchannel(mockArgs);
barrier.await();
Map<Subchannel, Integer> pickCount = new HashMap<>();
for (int i = 0; i < 1000; i++) {
Subchannel result = weightedPicker.pickSubchannel(mockArgs).getSubchannel();
pickCount.put(result, pickCount.getOrDefault(result, 0) + 1);
}
assertThat(pickCount.size()).isEqualTo(2);
// after blackout period
assertThat(Math.abs(pickCount.get(weightedSubchannel1) / 1000.0 - 2.0 / 3))
.isAtMost(EDF_PRECISE);
assertThat(Math.abs(pickCount.get(weightedSubchannel2) / 1000.0 - 1.0 / 3))
.isAtMost(EDF_PRECISE);
} catch (Exception ex) {
throw new AssertionError(ex);
}
}
}).start();
assertThat(fakeClock.forwardTime(10, TimeUnit.SECONDS)).isEqualTo(1);
barrier.await();
Map<Subchannel, Integer> pickCount = new HashMap<>();
for (int i = 0; i < 1000; i++) {
Subchannel result = weightedPicker.pickSubchannel(mockArgs).getSubchannel();
pickCount.put(result, pickCount.getOrDefault(result, 0) + 1);
}
assertThat(pickCount.size()).isEqualTo(2);
// after blackout period
assertThat(Math.abs(pickCount.get(weightedSubchannel1) / 1000.0 - 2.0 / 3))
.isAtMost(EDF_PRECISE);
assertThat(Math.abs(pickCount.get(weightedSubchannel2) / 1000.0 - 1.0 / 3))
.isAtMost(EDF_PRECISE);
}

private static class FakeSocketAddress extends SocketAddress {
final String name;

Expand Down