Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds, wrr: randomize the initial deadline in the scheduler #9922

Merged
merged 2 commits into from
Mar 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 18 additions & 11 deletions xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,23 @@ final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer {
private final Ticker ticker;

public WeightedRoundRobinLoadBalancer(Helper helper, Ticker ticker) {
this(new WrrHelper(OrcaOobUtil.newOrcaReportingHelper(helper)), ticker);
this(new WrrHelper(OrcaOobUtil.newOrcaReportingHelper(helper)), ticker, new Random());
}

public WeightedRoundRobinLoadBalancer(WrrHelper helper, Ticker ticker) {
public WeightedRoundRobinLoadBalancer(WrrHelper helper, Ticker ticker, Random random) {
super(helper);
helper.setLoadBalancer(this);
this.ticker = checkNotNull(ticker, "ticker");
this.infTime = ticker.nanoTime() + Long.MAX_VALUE;
this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
this.timeService = checkNotNull(helper.getScheduledExecutorService(), "timeService");
this.updateWeightTask = new UpdateWeightTask();
this.random = new Random();
this.random = random;
}

@VisibleForTesting
WeightedRoundRobinLoadBalancer(Helper helper, Ticker ticker, Random random) {
this(new WrrHelper(OrcaOobUtil.newOrcaReportingHelper(helper)), ticker, random);
}

@Override
Expand All @@ -100,8 +105,7 @@ public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {

@Override
public RoundRobinPicker createReadyPicker(List<Subchannel> activeList) {
int startIndex = random.nextInt(activeList.size());
return new WeightedRoundRobinPicker(activeList, startIndex);
return new WeightedRoundRobinPicker(activeList);
}

private final class UpdateWeightTask implements Runnable {
Expand Down Expand Up @@ -228,8 +232,8 @@ final class WeightedRoundRobinPicker extends ReadyPicker {
private volatile EdfScheduler scheduler;
private volatile boolean rrMode;

WeightedRoundRobinPicker(List<Subchannel> list, int startIndex) {
super(checkNotNull(list, "list"), startIndex);
WeightedRoundRobinPicker(List<Subchannel> list) {
super(checkNotNull(list, "list"), random.nextInt(list.size()));
Preconditions.checkArgument(!list.isEmpty(), "empty list");
this.list = list;
updateWeight();
Expand Down Expand Up @@ -266,7 +270,7 @@ private void updateWeight() {
rrMode = true;
return;
}
EdfScheduler scheduler = new EdfScheduler(list.size());
EdfScheduler scheduler = new EdfScheduler(list.size(), random);
avgWeight /= 1.0 * weightedChannelCount;
for (int i = 0; i < list.size(); i++) {
WrrSubchannel subchannel = (WrrSubchannel) list.get(i);
Expand Down Expand Up @@ -348,18 +352,21 @@ static final class EdfScheduler {

private final Object lock = new Object();

private final Random random;

/**
* Use the item's deadline as the order in the priority queue. If the deadlines are the same,
* use the index. Index should be unique.
*/
EdfScheduler(int initialCapacity) {
EdfScheduler(int initialCapacity, Random random) {
this.prioQueue = new PriorityQueue<ObjectState>(initialCapacity, (o1, o2) -> {
if (o1.deadline == o2.deadline) {
return Integer.compare(o1.index, o2.index);
} else {
return Double.compare(o1.deadline, o2.deadline);
}
});
this.random = random;
}

/**
Expand All @@ -371,8 +378,8 @@ static final class EdfScheduler {
void add(int index, double weight) {
checkArgument(weight > 0.0, "Weights need to be positive.");
ObjectState state = new ObjectState(Math.max(weight, MINIMUM_WEIGHT), index);
state.deadline = 1 / state.weight;
// TODO(zivy): randomize the initial deadline.
// Randomize the initial deadline.
state.deadline = random.nextDouble() * (1 / state.weight);
prioQueue.add(state);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
return subchannel;
}
});
wrr = new WeightedRoundRobinLoadBalancer(helper, fakeClock.getDeadlineTicker());
wrr = new WeightedRoundRobinLoadBalancer(helper, fakeClock.getDeadlineTicker(),
new FakeRandom());
}

@Test
Expand Down Expand Up @@ -625,7 +626,7 @@ public void edfScheduler() {
double totalWeight = 0;
int capacity = random.nextInt(10) + 1;
double[] weights = new double[capacity];
EdfScheduler scheduler = new EdfScheduler(capacity);
EdfScheduler scheduler = new EdfScheduler(capacity, random);
for (int i = 0; i < capacity; i++) {
weights[i] = random.nextDouble();
scheduler.add(i, weights[i]);
Expand All @@ -643,7 +644,7 @@ public void edfScheduler() {

@Test
public void edsScheduler_sameWeight() {
EdfScheduler scheduler = new EdfScheduler(2);
EdfScheduler scheduler = new EdfScheduler(2, new FakeRandom());
scheduler.add(0, 0.5);
scheduler.add(1, 0.5);
assertThat(scheduler.pick()).isEqualTo(0);
Expand All @@ -670,4 +671,12 @@ private static class FakeSocketAddress extends SocketAddress {
return "FakeSocketAddress-" + name;
}
}

private static class FakeRandom extends Random {
@Override
public double nextDouble() {
// return constant value to disable init deadline randomization in the scheduler
return 0.322023;
}
}
}