Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "core: add subchannel list LB commons utility function, refact… #9892

Merged
merged 1 commit into from
Feb 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
280 changes: 263 additions & 17 deletions core/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,59 +16,267 @@

package io.grpc.util;

import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.SHUTDOWN;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import io.grpc.Attributes;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.NameResolver;
import io.grpc.Status;
import io.grpc.util.SubchannelListLoadBalancerCommons.RoundRobinPicker;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import javax.annotation.Nonnull;

/**
* A {@link SubchannelListLoadBalancerCommons} that provides round-robin load-balancing over the
* {@link EquivalentAddressGroup}s from the {@link NameResolver}.
* A {@link LoadBalancer} that provides round-robin load-balancing over the {@link
* EquivalentAddressGroup}s from the {@link NameResolver}.
*/
final class RoundRobinLoadBalancer extends LoadBalancer {
@VisibleForTesting
static final Attributes.Key<Ref<ConnectivityStateInfo>> STATE_INFO =
Attributes.Key.create("state-info");

private final SubchannelListLoadBalancerCommons roundRobinCommons;
private final Helper helper;
private final Map<EquivalentAddressGroup, Subchannel> subchannels =
new HashMap<>();
private final Random random;

private ConnectivityState currentState;
private RoundRobinPicker currentPicker = new EmptyPicker(EMPTY_OK);

RoundRobinLoadBalancer(Helper helper) {
this.roundRobinCommons = new SubchannelListLoadBalancerCommons(helper, () -> { },
this::createReadyPicker);
this.helper = checkNotNull(helper, "helper");
this.random = new Random();
}

private RoundRobinPicker createReadyPicker(List<Subchannel> activeSubchannelList) {
// initialize the Picker to a random start index to ensure that a high frequency of Picker
// churn does not skew subchannel selection.
return new ReadyPicker(activeSubchannelList, random.nextInt(activeSubchannelList.size()));
}

@Override
public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
return roundRobinCommons.acceptResolvedAddresses(resolvedAddresses);
if (resolvedAddresses.getAddresses().isEmpty()) {
handleNameResolutionError(Status.UNAVAILABLE.withDescription(
"NameResolver returned no usable address. addrs=" + resolvedAddresses.getAddresses()
+ ", attrs=" + resolvedAddresses.getAttributes()));
return false;
}

List<EquivalentAddressGroup> servers = resolvedAddresses.getAddresses();
Set<EquivalentAddressGroup> currentAddrs = subchannels.keySet();
Map<EquivalentAddressGroup, EquivalentAddressGroup> latestAddrs = stripAttrs(servers);
Set<EquivalentAddressGroup> removedAddrs = setsDifference(currentAddrs, latestAddrs.keySet());

for (Map.Entry<EquivalentAddressGroup, EquivalentAddressGroup> latestEntry :
latestAddrs.entrySet()) {
EquivalentAddressGroup strippedAddressGroup = latestEntry.getKey();
EquivalentAddressGroup originalAddressGroup = latestEntry.getValue();
Subchannel existingSubchannel = subchannels.get(strippedAddressGroup);
if (existingSubchannel != null) {
// EAG's Attributes may have changed.
existingSubchannel.updateAddresses(Collections.singletonList(originalAddressGroup));
continue;
}
// Create new subchannels for new addresses.

// NB(lukaszx0): we don't merge `attributes` with `subchannelAttr` because subchannel
// doesn't need them. They're describing the resolved server list but we're not taking
// any action based on this information.
Attributes.Builder subchannelAttrs = Attributes.newBuilder()
// NB(lukaszx0): because attributes are immutable we can't set new value for the key
// after creation but since we can mutate the values we leverage that and set
// AtomicReference which will allow mutating state info for given channel.
.set(STATE_INFO,
new Ref<>(ConnectivityStateInfo.forNonError(IDLE)));

final Subchannel subchannel = checkNotNull(
helper.createSubchannel(CreateSubchannelArgs.newBuilder()
.setAddresses(originalAddressGroup)
.setAttributes(subchannelAttrs.build())
.build()),
"subchannel");
subchannel.start(new SubchannelStateListener() {
@Override
public void onSubchannelState(ConnectivityStateInfo state) {
processSubchannelState(subchannel, state);
}
});
subchannels.put(strippedAddressGroup, subchannel);
subchannel.requestConnection();
}

ArrayList<Subchannel> removedSubchannels = new ArrayList<>();
for (EquivalentAddressGroup addressGroup : removedAddrs) {
removedSubchannels.add(subchannels.remove(addressGroup));
}

// Update the picker before shutting down the subchannels, to reduce the chance of the race
// between picking a subchannel and shutting it down.
updateBalancingState();

// Shutdown removed subchannels
for (Subchannel removedSubchannel : removedSubchannels) {
shutdownSubchannel(removedSubchannel);
}

return true;
}

@Override
public void handleNameResolutionError(Status error) {
roundRobinCommons.handleNameResolutionError(error);
if (currentState != READY) {
updateBalancingState(TRANSIENT_FAILURE, new EmptyPicker(error));
}
}

private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {
if (subchannels.get(stripAttrs(subchannel.getAddresses())) != subchannel) {
return;
}
if (stateInfo.getState() == TRANSIENT_FAILURE || stateInfo.getState() == IDLE) {
helper.refreshNameResolution();
}
if (stateInfo.getState() == IDLE) {
subchannel.requestConnection();
}
Ref<ConnectivityStateInfo> subchannelStateRef = getSubchannelStateInfoRef(subchannel);
if (subchannelStateRef.value.getState().equals(TRANSIENT_FAILURE)) {
if (stateInfo.getState().equals(CONNECTING) || stateInfo.getState().equals(IDLE)) {
return;
}
}
subchannelStateRef.value = stateInfo;
updateBalancingState();
}

private void shutdownSubchannel(Subchannel subchannel) {
subchannel.shutdown();
getSubchannelStateInfoRef(subchannel).value =
ConnectivityStateInfo.forNonError(SHUTDOWN);
}

@Override
public void shutdown() {
roundRobinCommons.shutdown();
for (Subchannel subchannel : getSubchannels()) {
shutdownSubchannel(subchannel);
}
subchannels.clear();
}

private static final Status EMPTY_OK = Status.OK.withDescription("no subchannels ready");

/**
* Updates picker with the list of active subchannels (state == READY).
*/
@SuppressWarnings("ReferenceEquality")
private void updateBalancingState() {
List<Subchannel> activeList = filterNonFailingSubchannels(getSubchannels());
if (activeList.isEmpty()) {
// No READY subchannels, determine aggregate state and error status
boolean isConnecting = false;
Status aggStatus = EMPTY_OK;
for (Subchannel subchannel : getSubchannels()) {
ConnectivityStateInfo stateInfo = getSubchannelStateInfoRef(subchannel).value;
// This subchannel IDLE is not because of channel IDLE_TIMEOUT,
// in which case LB is already shutdown.
// RRLB will request connection immediately on subchannel IDLE.
if (stateInfo.getState() == CONNECTING || stateInfo.getState() == IDLE) {
isConnecting = true;
}
if (aggStatus == EMPTY_OK || !aggStatus.isOk()) {
aggStatus = stateInfo.getStatus();
}
}
updateBalancingState(isConnecting ? CONNECTING : TRANSIENT_FAILURE,
// If all subchannels are TRANSIENT_FAILURE, return the Status associated with
// an arbitrary subchannel, otherwise return OK.
new EmptyPicker(aggStatus));
} else {
// initialize the Picker to a random start index to ensure that a high frequency of Picker
// churn does not skew subchannel selection.
int startIndex = random.nextInt(activeList.size());
updateBalancingState(READY, new ReadyPicker(activeList, startIndex));
}
}

private void updateBalancingState(ConnectivityState state, RoundRobinPicker picker) {
if (state != currentState || !picker.isEquivalentTo(currentPicker)) {
helper.updateBalancingState(state, picker);
currentState = state;
currentPicker = picker;
}
}

/**
* Filters out non-ready subchannels.
*/
private static List<Subchannel> filterNonFailingSubchannels(
Collection<Subchannel> subchannels) {
List<Subchannel> readySubchannels = new ArrayList<>(subchannels.size());
for (Subchannel subchannel : subchannels) {
if (isReady(subchannel)) {
readySubchannels.add(subchannel);
}
}
return readySubchannels;
}

/**
* Converts list of {@link EquivalentAddressGroup} to {@link EquivalentAddressGroup} set and
* remove all attributes. The values are the original EAGs.
*/
private static Map<EquivalentAddressGroup, EquivalentAddressGroup> stripAttrs(
List<EquivalentAddressGroup> groupList) {
Map<EquivalentAddressGroup, EquivalentAddressGroup> addrs = new HashMap<>(groupList.size() * 2);
for (EquivalentAddressGroup group : groupList) {
addrs.put(stripAttrs(group), group);
}
return addrs;
}

private static EquivalentAddressGroup stripAttrs(EquivalentAddressGroup eag) {
return new EquivalentAddressGroup(eag.getAddresses());
}

@VisibleForTesting
Collection<Subchannel> getSubchannels() {
return roundRobinCommons.getSubchannels();
return subchannels.values();
}

private static Ref<ConnectivityStateInfo> getSubchannelStateInfoRef(
Subchannel subchannel) {
return checkNotNull(subchannel.getAttributes().get(STATE_INFO), "STATE_INFO");
}

// package-private to avoid synthetic access
static boolean isReady(Subchannel subchannel) {
return getSubchannelStateInfoRef(subchannel).value.getState() == READY;
}

private static <T> Set<T> setsDifference(Set<T> a, Set<T> b) {
Set<T> aCopy = new HashSet<>(a);
aCopy.removeAll(b);
return aCopy;
}

// Only subclasses are ReadyPicker or EmptyPicker
private abstract static class RoundRobinPicker extends SubchannelPicker {
abstract boolean isEquivalentTo(RoundRobinPicker picker);
}

@VisibleForTesting
Expand Down Expand Up @@ -113,7 +321,7 @@ List<Subchannel> getList() {
}

@Override
public boolean isEquivalentTo(RoundRobinPicker picker) {
boolean isEquivalentTo(RoundRobinPicker picker) {
if (!(picker instanceof ReadyPicker)) {
return false;
}
Expand All @@ -123,4 +331,42 @@ public boolean isEquivalentTo(RoundRobinPicker picker) {
|| (list.size() == other.list.size() && new HashSet<>(list).containsAll(other.list));
}
}

@VisibleForTesting
static final class EmptyPicker extends RoundRobinPicker {

private final Status status;

EmptyPicker(@Nonnull Status status) {
this.status = Preconditions.checkNotNull(status, "status");
}

@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return status.isOk() ? PickResult.withNoResult() : PickResult.withError(status);
}

@Override
boolean isEquivalentTo(RoundRobinPicker picker) {
return picker instanceof EmptyPicker && (Objects.equal(status, ((EmptyPicker) picker).status)
|| (status.isOk() && ((EmptyPicker) picker).status.isOk()));
}

@Override
public String toString() {
return MoreObjects.toStringHelper(EmptyPicker.class).add("status", status).toString();
}
}

/**
* A lighter weight Reference than AtomicReference.
*/
@VisibleForTesting
static final class Ref<T> {
T value;

Ref(T value) {
this.value = value;
}
}
}
Loading