Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move name resolution retry from managed channel to name resolver (take #2) #9812

Merged
merged 12 commits into from
Feb 4, 2023
1 change: 1 addition & 0 deletions api/src/main/java/io/grpc/NameResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1770")
public abstract class NameResolver {

temawi marked this conversation as resolved.
Show resolved Hide resolved
/**
* Returns the authority used to authenticate connections to servers. It <strong>must</strong> be
* from a trusted source, because if the authority is tampered with, RPCs may be sent to the
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2023 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.internal;

import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

/**
* Schedules a retry operation according to a {@link BackoffPolicy}. The retry is run within a
* {@link SynchronizationContext}. At most one retry is scheduled at a time.
*/
final class BackoffPolicyRetryScheduler implements RetryScheduler {
private final ScheduledExecutorService scheduledExecutorService;
private final SynchronizationContext syncContext;
private final BackoffPolicy.Provider policyProvider;

private BackoffPolicy policy;
private ScheduledHandle scheduledHandle;

private static final Logger logger = Logger.getLogger(
BackoffPolicyRetryScheduler.class.getName());

BackoffPolicyRetryScheduler(BackoffPolicy.Provider policyProvider,
ScheduledExecutorService scheduledExecutorService,
SynchronizationContext syncContext) {
this.policyProvider = policyProvider;
this.scheduledExecutorService = scheduledExecutorService;
this.syncContext = syncContext;
}

/**
* Schedules a future retry operation. Only allows one retry to be scheduled at any given time.
*
* @return The delay in nanos before the operation fires or -1 if it was not scheduled.
*/
@Override
public long schedule(Runnable retryOperation) {
if (policy == null) {
policy = policyProvider.get();
}
// If a retry is already scheduled, take no further action.
if (scheduledHandle != null && scheduledHandle.isPending()) {
temawi marked this conversation as resolved.
Show resolved Hide resolved
return -1;
}
long delayNanos = policy.nextBackoffNanos();
scheduledHandle = syncContext.schedule(retryOperation, delayNanos, TimeUnit.NANOSECONDS,
scheduledExecutorService);
logger.fine("Scheduling DNS resolution backoff for " + delayNanos + "ns");
temawi marked this conversation as resolved.
Show resolved Hide resolved

return delayNanos;
}

/**
* Resets the {@link BackoffPolicyRetryScheduler} and cancels any pending retry task. The policy
* will be cleared thus also resetting any state associated with it (e.g. a backoff multiplier).
*/
@Override
public void reset() {
if (scheduledHandle != null && scheduledHandle.isPending()) {
temawi marked this conversation as resolved.
Show resolved Hide resolved
scheduledHandle.cancel();
}
policy = null;
}

}
21 changes: 13 additions & 8 deletions core/src/main/java/io/grpc/internal/DnsNameResolverProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,24 @@ public final class DnsNameResolverProvider extends NameResolverProvider {
private static final String SCHEME = "dns";

@Override
public DnsNameResolver newNameResolver(URI targetUri, NameResolver.Args args) {
public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) {
if (SCHEME.equals(targetUri.getScheme())) {
String targetPath = Preconditions.checkNotNull(targetUri.getPath(), "targetPath");
Preconditions.checkArgument(targetPath.startsWith("/"),
"the path component (%s) of the target (%s) must start with '/'", targetPath, targetUri);
String name = targetPath.substring(1);
return new DnsNameResolver(
targetUri.getAuthority(),
name,
args,
GrpcUtil.SHARED_CHANNEL_EXECUTOR,
Stopwatch.createUnstarted(),
InternalServiceProviders.isAndroid(getClass().getClassLoader()));
return new RetryingNameResolver(
new DnsNameResolver(
targetUri.getAuthority(),
name,
args,
GrpcUtil.SHARED_CHANNEL_EXECUTOR,
Stopwatch.createUnstarted(),
InternalServiceProviders.isAndroid(getClass().getClassLoader())),
new BackoffPolicyRetryScheduler(
new ExponentialBackoffPolicy.Provider(),
args.getScheduledExecutorService(),
args.getSynchronizationContext()));
} else {
return null;
}
Expand Down
97 changes: 29 additions & 68 deletions core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import io.grpc.internal.ManagedChannelServiceConfig.ServiceConfigConvertedSelector;
import io.grpc.internal.RetriableStream.ChannelBufferMeter;
import io.grpc.internal.RetriableStream.Throttle;
import io.grpc.internal.RetryingNameResolver.ResolutionResultListener;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
Expand Down Expand Up @@ -280,6 +281,10 @@ public void uncaughtException(Thread t, Throwable e) {
// Must be mutated and read from constructor or syncContext
// used for channel tracing when value changed
private ManagedChannelServiceConfig lastServiceConfig = EMPTY_SERVICE_CONFIG;
// Must be mutated and read from constructor or syncContext
// Denotes if the last resolved addresses were accepted by the load balancer. A {@code null}
// value indicates no attempt has been made yet.
private Boolean lastAddressesAccepted;

@Nullable
private final ManagedChannelServiceConfig defaultServiceConfig;
Expand Down Expand Up @@ -367,7 +372,6 @@ private void shutdownNameResolverAndLoadBalancer(boolean channelIsActive) {
checkState(lbHelper != null, "lbHelper is null");
}
if (nameResolver != null) {
cancelNameResolverBackoff();
nameResolver.shutdown();
nameResolverStarted = false;
if (channelIsActive) {
Expand Down Expand Up @@ -450,42 +454,10 @@ private void rescheduleIdleTimer() {
idleTimer.reschedule(idleTimeoutMillis, TimeUnit.MILLISECONDS);
}

// Run from syncContext
@VisibleForTesting
class DelayedNameResolverRefresh implements Runnable {
@Override
public void run() {
scheduledNameResolverRefresh = null;
refreshNameResolution();
}
}

// Must be used from syncContext
@Nullable private ScheduledHandle scheduledNameResolverRefresh;
// The policy to control backoff between name resolution attempts. Non-null when an attempt is
// scheduled. Must be used from syncContext
@Nullable private BackoffPolicy nameResolverBackoffPolicy;

// Must be run from syncContext
private void cancelNameResolverBackoff() {
syncContext.throwIfNotInThisSynchronizationContext();
if (scheduledNameResolverRefresh != null) {
scheduledNameResolverRefresh.cancel();
scheduledNameResolverRefresh = null;
nameResolverBackoffPolicy = null;
}
}

/**
* Force name resolution refresh to happen immediately and reset refresh back-off. Must be run
* Force name resolution refresh to happen immediately. Must be run
* from syncContext.
*/
private void refreshAndResetNameResolution() {
syncContext.throwIfNotInThisSynchronizationContext();
cancelNameResolverBackoff();
refreshNameResolution();
}

private void refreshNameResolution() {
syncContext.throwIfNotInThisSynchronizationContext();
if (nameResolverStarted) {
Expand Down Expand Up @@ -1290,7 +1262,7 @@ private void maybeTerminateChannel() {
// Must be called from syncContext
private void handleInternalSubchannelState(ConnectivityStateInfo newState) {
if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) {
refreshAndResetNameResolution();
refreshNameResolution();
}
}

Expand Down Expand Up @@ -1337,9 +1309,9 @@ public void run() {
if (shutdown.get()) {
return;
}
if (scheduledNameResolverRefresh != null && scheduledNameResolverRefresh.isPending()) {
if (lastAddressesAccepted != null && !lastAddressesAccepted) {
checkState(nameResolverStarted, "name resolver must be started");
refreshAndResetNameResolution();
refreshNameResolution();
}
for (InternalSubchannel subchannel : subchannels) {
subchannel.resetConnectBackoff();
Expand Down Expand Up @@ -1495,7 +1467,7 @@ public void refreshNameResolution() {
final class LoadBalancerRefreshNameResolution implements Runnable {
@Override
public void run() {
refreshAndResetNameResolution();
ManagedChannelImpl.this.refreshNameResolution();
}
}

Expand Down Expand Up @@ -1726,7 +1698,7 @@ public ChannelCredentials withoutBearerTokens() {
}
}

private final class NameResolverListener extends NameResolver.Listener2 {
final class NameResolverListener extends NameResolver.Listener2 {
final LbHelperImpl helper;
final NameResolver resolver;

Expand All @@ -1745,6 +1717,7 @@ public void run() {
if (ManagedChannelImpl.this.nameResolver != resolver) {
return;
}
lastAddressesAccepted = false;

List<EquivalentAddressGroup> servers = resolutionResult.getAddresses();
channelLogger.log(
Expand All @@ -1758,7 +1731,6 @@ public void run() {
lastResolutionState = ResolutionState.SUCCESS;
}

nameResolverBackoffPolicy = null;
ConfigOrError configOrError = resolutionResult.getServiceConfig();
InternalConfigSelector resolvedConfigSelector =
resolutionResult.getAttributes().get(InternalConfigSelector.KEY);
Expand Down Expand Up @@ -1816,6 +1788,7 @@ public void run() {
// we later check for these error codes when investigating pick results in
// GrpcUtil.getTransportFromPickResult().
onError(configOrError.getError());
lastAddressesAccepted = false;
temawi marked this conversation as resolved.
Show resolved Hide resolved
return;
} else {
effectiveServiceConfig = lastServiceConfig;
Expand Down Expand Up @@ -1859,21 +1832,32 @@ public void run() {
}
Attributes attributes = attrBuilder.build();

boolean addressesAccepted = helper.lb.tryAcceptResolvedAddresses(
lastAddressesAccepted = helper.lb.tryAcceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(servers)
.setAttributes(attributes)
.setLoadBalancingPolicyConfig(effectiveServiceConfig.getLoadBalancingConfig())
.build());

if (!addressesAccepted) {
scheduleExponentialBackOffInSyncContext();
}
}
}
}

syncContext.execute(new NamesResolved());

// If NamesResolved did not assign a value to lastAddressesAccepted, we assume there was an
temawi marked this conversation as resolved.
Show resolved Hide resolved
// exception and set it to false.
if (lastAddressesAccepted == null) {
lastAddressesAccepted = false;
}

// If a listener is provided, let it know if the addresses were accepted.
// TODO(tmwilson): Once we are ready to change the onResult() API and return a boolean
// this hacky callback in an attribute approach can be removed.
ResolutionResultListener resolutionResultListener = resolutionResult.getAttributes()
.get(RetryingNameResolver.RESOLUTION_RESULT_LISTENER_KEY);
if (resolutionResultListener != null) {
resolutionResultListener.resolutionAttempted(lastAddressesAccepted);
}
}

@Override
Expand Down Expand Up @@ -1903,29 +1887,6 @@ private void handleErrorInSyncContext(Status error) {
}

helper.lb.handleNameResolutionError(error);

scheduleExponentialBackOffInSyncContext();
}

private void scheduleExponentialBackOffInSyncContext() {
if (scheduledNameResolverRefresh != null && scheduledNameResolverRefresh.isPending()) {
// The name resolver may invoke onError multiple times, but we only want to
// schedule one backoff attempt
// TODO(ericgribkoff) Update contract of NameResolver.Listener or decide if we
// want to reset the backoff interval upon repeated onError() calls
return;
}
if (nameResolverBackoffPolicy == null) {
nameResolverBackoffPolicy = backoffPolicyProvider.get();
}
long delayNanos = nameResolverBackoffPolicy.nextBackoffNanos();
channelLogger.log(
ChannelLogLevel.DEBUG,
"Scheduling DNS resolution backoff for {0} ns", delayNanos);
scheduledNameResolverRefresh =
syncContext.schedule(
new DelayedNameResolverRefresh(), delayNanos, TimeUnit.NANOSECONDS,
transportFactory .getScheduledExecutorService());
}
}

Expand Down
37 changes: 37 additions & 0 deletions core/src/main/java/io/grpc/internal/RetryScheduler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2023 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.internal;

/**
* This interface is used to schedule future retry attempts for a failed operation. The retry delay
* and the number of attempts are defined by implementing classes. Implementations should ensure
* that only one future retry operation is ever scheduled at a time.
*/
public interface RetryScheduler {

/**
* A request to schedule a future retry (or retries) for a failed operation.
*
* @param retryOperation the operation to run when the retry fires
* @return The delay in nanos before the operation fires or -1 if it was not scheduled.
*/
long schedule(Runnable retryOperation);

/**
* Resets the scheduler, effectively cancelling any future retry operation.
*/
void reset();
}
Loading