Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: add weighted round robin LB policy support #9873

Merged
merged 30 commits into from
Feb 27, 2023
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
1f7bbd8
refactor round robin LB
YifeiZhuang Jan 23, 2023
73cf208
rename abstract*
YifeiZhuang Jan 25, 2023
12e7928
refactor round robin LB
YifeiZhuang Jan 23, 2023
2727d9b
rename abstract*
YifeiZhuang Jan 25, 2023
764c9a9
temp: add weightedroundrobinimpl
YifeiZhuang Jan 25, 2023
afa10fb
add weighted round robin picker and scheduler
YifeiZhuang Feb 2, 2023
d4f785d
comments, and add afterSubchannelUpdate
YifeiZhuang Feb 3, 2023
60af73c
format
YifeiZhuang Feb 3, 2023
4228f97
move abstraction to composition
YifeiZhuang Feb 3, 2023
cf3d640
remove listener
YifeiZhuang Feb 3, 2023
c7bcfcd
add update timer to LB , not in picker
YifeiZhuang Feb 4, 2023
975eeed
use original round robin wl
YifeiZhuang Feb 6, 2023
5300199
add subchannel listener
YifeiZhuang Feb 6, 2023
2458ac9
add test
YifeiZhuang Feb 7, 2023
9cd33b6
Merge branch 'master' of https://github.com/grpc/grpc-java into wrr-impl
YifeiZhuang Feb 8, 2023
23231eb
add more tests
YifeiZhuang Feb 8, 2023
3d51405
add provider test
YifeiZhuang Feb 9, 2023
e6886c1
fix current picker
YifeiZhuang Feb 9, 2023
3da127e
remove virtual time, change comment
YifeiZhuang Feb 10, 2023
cb31730
fix avg weight
YifeiZhuang Feb 13, 2023
01a7d1a
Merge branch 'master' of https://github.com/grpc/grpc-java into wrr-impl
YifeiZhuang Feb 13, 2023
6e496da
Merge branch 'master' of https://github.com/grpc/grpc-java into wrr-impl
YifeiZhuang Feb 14, 2023
6dfa072
add env variable
YifeiZhuang Feb 16, 2023
f786098
Merge branch 'master' of https://github.com/grpc/grpc-java into wrr-impl
YifeiZhuang Feb 16, 2023
0bdce32
parse wrr proto
YifeiZhuang Feb 17, 2023
3979ef8
add test scheduler
YifeiZhuang Feb 23, 2023
0704a69
fix comments, timer, volatile, etc
YifeiZhuang Feb 23, 2023
fc0d3dd
bazel checksum, and use ticker
YifeiZhuang Feb 24, 2023
f728281
infTime = nanoTime() + MAX_VALUE
YifeiZhuang Feb 24, 2023
1ef6221
minor fix
YifeiZhuang Feb 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 18 additions & 15 deletions core/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.Internal;
import io.grpc.LoadBalancer;
import io.grpc.NameResolver;
import io.grpc.Status;
Expand All @@ -50,7 +51,8 @@
* A {@link LoadBalancer} that provides round-robin load-balancing over the {@link
* EquivalentAddressGroup}s from the {@link NameResolver}.
*/
final class RoundRobinLoadBalancer extends LoadBalancer {
@Internal
public class RoundRobinLoadBalancer extends LoadBalancer {
@VisibleForTesting
static final Attributes.Key<Ref<ConnectivityStateInfo>> STATE_INFO =
Attributes.Key.create("state-info");
Expand All @@ -59,11 +61,10 @@ final class RoundRobinLoadBalancer extends LoadBalancer {
private final Map<EquivalentAddressGroup, Subchannel> subchannels =
new HashMap<>();
private final Random random;

private ConnectivityState currentState;
private RoundRobinPicker currentPicker = new EmptyPicker(EMPTY_OK);
protected RoundRobinPicker currentPicker = new EmptyPicker(EMPTY_OK);

RoundRobinLoadBalancer(Helper helper) {
public RoundRobinLoadBalancer(Helper helper) {
this.helper = checkNotNull(helper, "helper");
this.random = new Random();
}
Expand Down Expand Up @@ -210,7 +211,7 @@ private void updateBalancingState() {
// initialize the Picker to a random start index to ensure that a high frequency of Picker
// churn does not skew subchannel selection.
int startIndex = random.nextInt(activeList.size());
updateBalancingState(READY, new ReadyPicker(activeList, startIndex));
updateBalancingState(READY, createReadyPicker(activeList, startIndex));
}
}

Expand All @@ -222,6 +223,10 @@ private void updateBalancingState(ConnectivityState state, RoundRobinPicker pick
}
}

protected RoundRobinPicker createReadyPicker(List<Subchannel> activeList, int startIndex) {
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
return new ReadyPicker(activeList, startIndex);
}

/**
* Filters out non-ready subchannels.
*/
Expand Down Expand Up @@ -254,7 +259,7 @@ private static EquivalentAddressGroup stripAttrs(EquivalentAddressGroup eag) {
}

@VisibleForTesting
Collection<Subchannel> getSubchannels() {
protected Collection<Subchannel> getSubchannels() {
return subchannels.values();
}

Expand All @@ -275,20 +280,19 @@ private static <T> Set<T> setsDifference(Set<T> a, Set<T> b) {
}

// Only subclasses are ReadyPicker or EmptyPicker
private abstract static class RoundRobinPicker extends SubchannelPicker {
abstract boolean isEquivalentTo(RoundRobinPicker picker);
public abstract static class RoundRobinPicker extends SubchannelPicker {
public abstract boolean isEquivalentTo(RoundRobinPicker picker);
}

@VisibleForTesting
static final class ReadyPicker extends RoundRobinPicker {
public static class ReadyPicker extends RoundRobinPicker {
YifeiZhuang marked this conversation as resolved.
Show resolved Hide resolved
private static final AtomicIntegerFieldUpdater<ReadyPicker> indexUpdater =
AtomicIntegerFieldUpdater.newUpdater(ReadyPicker.class, "index");

private final List<Subchannel> list; // non-empty
@SuppressWarnings("unused")
private volatile int index;

ReadyPicker(List<Subchannel> list, int startIndex) {
public ReadyPicker(List<Subchannel> list, int startIndex) {
Preconditions.checkArgument(!list.isEmpty(), "empty list");
this.list = list;
this.index = startIndex - 1;
Expand Down Expand Up @@ -321,7 +325,7 @@ List<Subchannel> getList() {
}

@Override
boolean isEquivalentTo(RoundRobinPicker picker) {
public boolean isEquivalentTo(RoundRobinPicker picker) {
if (!(picker instanceof ReadyPicker)) {
return false;
}
Expand All @@ -332,8 +336,7 @@ boolean isEquivalentTo(RoundRobinPicker picker) {
}
}

@VisibleForTesting
static final class EmptyPicker extends RoundRobinPicker {
public static final class EmptyPicker extends RoundRobinPicker {

private final Status status;

Expand All @@ -347,7 +350,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
}

@Override
boolean isEquivalentTo(RoundRobinPicker picker) {
public boolean isEquivalentTo(RoundRobinPicker picker) {
return picker instanceof EmptyPicker && (Objects.equal(status, ((EmptyPicker) picker).status)
|| (status.isOk() && ((EmptyPicker) picker).status.isOk()));
}
Expand Down
4 changes: 4 additions & 0 deletions xds/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ java_library(
":envoy_service_load_stats_v3_java_grpc",
":envoy_service_status_v3_java_grpc",
":xds_protos_java",
":orca",
"//:auto_value_annotations",
"//alts",
"//api",
Expand All @@ -40,6 +41,8 @@ java_library(
"//core:util",
"//netty",
"//stub",
"//services:metrics",
"//services:metrics_internal",
"@com_google_code_findbugs_jsr305//jar",
"@com_google_code_gson_gson//jar",
"@com_google_errorprone_error_prone_annotations//jar",
Expand Down Expand Up @@ -83,6 +86,7 @@ java_proto_library(
"@envoy_api//envoy/extensions/filters/http/rbac/v3:pkg",
"@envoy_api//envoy/extensions/filters/http/router/v3:pkg",
"@envoy_api//envoy/extensions/filters/network/http_connection_manager/v3:pkg",
"@envoy_api//envoy/extensions/load_balancing_policies/client_side_weighted_round_robin/v3:pkg",
"@envoy_api//envoy/extensions/load_balancing_policies/least_request/v3:pkg",
"@envoy_api//envoy/extensions/load_balancing_policies/ring_hash/v3:pkg",
"@envoy_api//envoy/extensions/load_balancing_policies/round_robin/v3:pkg",
Expand Down
70 changes: 64 additions & 6 deletions xds/src/main/java/io/grpc/xds/LoadBalancerConfigFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@
import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Struct;
import com.google.protobuf.util.Durations;
import com.google.protobuf.util.JsonFormat;
import io.envoyproxy.envoy.config.cluster.v3.Cluster;
import io.envoyproxy.envoy.config.cluster.v3.Cluster.LeastRequestLbConfig;
import io.envoyproxy.envoy.config.cluster.v3.Cluster.RingHashLbConfig;
import io.envoyproxy.envoy.config.cluster.v3.LoadBalancingPolicy;
import io.envoyproxy.envoy.config.cluster.v3.LoadBalancingPolicy.Policy;
import io.envoyproxy.envoy.extensions.load_balancing_policies.client_side_weighted_round_robin.v3.ClientSideWeightedRoundRobin;
import io.envoyproxy.envoy.extensions.load_balancing_policies.least_request.v3.LeastRequest;
import io.envoyproxy.envoy.extensions.load_balancing_policies.ring_hash.v3.RingHash;
import io.envoyproxy.envoy.extensions.load_balancing_policies.round_robin.v3.RoundRobin;
Expand Down Expand Up @@ -73,21 +75,31 @@ class LoadBalancerConfigFactory {
static final String WRR_LOCALITY_FIELD_NAME = "wrr_locality_experimental";
static final String CHILD_POLICY_FIELD = "childPolicy";

static final String BLACK_OUT_PERIOD = "blackoutPeriod";

static final String WEIGHT_EXPIRATION_PERIOD = "weightExpirationPeriod";

static final String OOB_REPORTING_PERIOD = "oobReportingPeriod";

static final String ENABLE_OOB_LOAD_REPORT = "enableOobLoadReport";

static final String WEIGHT_UPDATE_PERIOD = "weightUpdatePeriod";

/**
* Factory method for creating a new {link LoadBalancerConfigConverter} for a given xDS {@link
* Cluster}.
*
* @throws ResourceInvalidException If the {@link Cluster} has an invalid LB configuration.
*/
static ImmutableMap<String, ?> newConfig(Cluster cluster, boolean enableLeastRequest,
boolean enableCustomLbConfig)
boolean enableCustomLbConfig, boolean enableWrr)
throws ResourceInvalidException {
// The new load_balancing_policy will always be used if it is set, but for backward
// compatibility we will fall back to using the old lb_policy field if the new field is not set.
if (cluster.hasLoadBalancingPolicy() && enableCustomLbConfig) {
try {
return LoadBalancingPolicyConverter.convertToServiceConfig(cluster.getLoadBalancingPolicy(),
0);
0, enableWrr);
} catch (MaxRecursionReachedException e) {
throw new ResourceInvalidException("Maximum LB config recursion depth reached", e);
}
Expand All @@ -111,6 +123,35 @@ class LoadBalancerConfigFactory {
return ImmutableMap.of(RING_HASH_FIELD_NAME, configBuilder.buildOrThrow());
}

/**
* Builds a service config JSON object for the weighted_round_robin load balancer config based on
* the given config values.
*/
private static ImmutableMap<String, ?> buildWrrConfig(Long blackoutPeriod,
Long weightExpirationPeriod,
Long oobReportingPeriod,
Boolean enableOobLoadReport,
Long weightUpdatePeriod) {
ImmutableMap.Builder<String, Object> configBuilder = ImmutableMap.builder();
if (blackoutPeriod != null) {
configBuilder.put(BLACK_OUT_PERIOD, blackoutPeriod.doubleValue());
}
if (weightExpirationPeriod != null) {
configBuilder.put(WEIGHT_EXPIRATION_PERIOD, weightExpirationPeriod.doubleValue());
}
if (oobReportingPeriod != null) {
configBuilder.put(OOB_REPORTING_PERIOD, oobReportingPeriod.doubleValue());
}
if (enableOobLoadReport != null) {
configBuilder.put(ENABLE_OOB_LOAD_REPORT, enableOobLoadReport);
}
if (weightUpdatePeriod != null) {
configBuilder.put(WEIGHT_UPDATE_PERIOD, weightUpdatePeriod.doubleValue());
}
return ImmutableMap.of(WeightedRoundRobinLoadBalancerProvider.SCHEME,
configBuilder.buildOrThrow());
}

/**
* Builds a service config JSON object for the least_request load balancer config based on the
* given config values..
Expand Down Expand Up @@ -151,7 +192,7 @@ static class LoadBalancingPolicyConverter {
* Converts a {@link LoadBalancingPolicy} object to a service config JSON object.
*/
private static ImmutableMap<String, ?> convertToServiceConfig(
LoadBalancingPolicy loadBalancingPolicy, int recursionDepth)
LoadBalancingPolicy loadBalancingPolicy, int recursionDepth, boolean enableWrr)
throws ResourceInvalidException, MaxRecursionReachedException {
if (recursionDepth > MAX_RECURSION) {
throw new MaxRecursionReachedException();
Expand All @@ -165,11 +206,16 @@ static class LoadBalancingPolicyConverter {
serviceConfig = convertRingHashConfig(typedConfig.unpack(RingHash.class));
} else if (typedConfig.is(WrrLocality.class)) {
serviceConfig = convertWrrLocalityConfig(typedConfig.unpack(WrrLocality.class),
recursionDepth);
recursionDepth, enableWrr);
} else if (typedConfig.is(RoundRobin.class)) {
serviceConfig = convertRoundRobinConfig();
} else if (typedConfig.is(LeastRequest.class)) {
serviceConfig = convertLeastRequestConfig(typedConfig.unpack(LeastRequest.class));
} else if (typedConfig.is(ClientSideWeightedRoundRobin.class)) {
if (enableWrr) {
serviceConfig = convertWeightedRoundRobinConfig(
typedConfig.unpack(ClientSideWeightedRoundRobin.class));
}
} else if (typedConfig.is(com.github.xds.type.v3.TypedStruct.class)) {
serviceConfig = convertCustomConfig(
typedConfig.unpack(com.github.xds.type.v3.TypedStruct.class));
Expand Down Expand Up @@ -217,14 +263,26 @@ static class LoadBalancingPolicyConverter {
ringHash.hasMaximumRingSize() ? ringHash.getMaximumRingSize().getValue() : null);
}

private static ImmutableMap<String, ?> convertWeightedRoundRobinConfig(
ClientSideWeightedRoundRobin wrr) throws ResourceInvalidException {
return buildWrrConfig(
wrr.hasBlackoutPeriod() ? Durations.toNanos(wrr.getBlackoutPeriod()) : null,
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
wrr.hasWeightExpirationPeriod()
? Durations.toNanos(wrr.getWeightExpirationPeriod()) : null,
wrr.hasOobReportingPeriod() ? Durations.toNanos(wrr.getOobReportingPeriod()) : null,
wrr.hasEnableOobLoadReport() ? wrr.getEnableOobLoadReport().getValue() : null,
wrr.hasWeightUpdatePeriod() ? Durations.toNanos(wrr.getWeightUpdatePeriod()) : null);
}

/**
* Converts a wrr_locality {@link Any} configuration to service config format.
*/
private static ImmutableMap<String, ?> convertWrrLocalityConfig(WrrLocality wrrLocality,
int recursionDepth) throws ResourceInvalidException,
int recursionDepth, boolean enableWrr) throws ResourceInvalidException,
MaxRecursionReachedException {
return buildWrrLocalityConfig(
convertToServiceConfig(wrrLocality.getEndpointPickingPolicy(), recursionDepth + 1));
convertToServiceConfig(wrrLocality.getEndpointPickingPolicy(),
recursionDepth + 1, enableWrr));
}

/**
Expand Down
Loading