Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move name resolution retry from managed channel to name resolver (take #2) #9812

Merged
merged 12 commits into from
Feb 4, 2023
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2023 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.internal;

import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* Schedules a retry operation according to a {@link BackoffPolicy}. The retry is run within a
* {@link SynchronizationContext}. At most one retry is scheduled at a time.
*/
final class BackoffPolicyRetryScheduler implements RetryScheduler {
private final ScheduledExecutorService scheduledExecutorService;
private final SynchronizationContext syncContext;
private final BackoffPolicy.Provider policyProvider;

private BackoffPolicy policy;
private ScheduledHandle scheduledHandle;

private static final Logger logger = Logger.getLogger(
BackoffPolicyRetryScheduler.class.getName());

BackoffPolicyRetryScheduler(BackoffPolicy.Provider policyProvider,
ScheduledExecutorService scheduledExecutorService,
SynchronizationContext syncContext) {
this.policyProvider = policyProvider;
this.scheduledExecutorService = scheduledExecutorService;
this.syncContext = syncContext;
}

/**
* Schedules a future retry operation. Only allows one retry to be scheduled at any given time.
*/
@Override
public void schedule(Runnable retryOperation) {
syncContext.throwIfNotInThisSynchronizationContext();

if (policy == null) {
policy = policyProvider.get();
}
// If a retry is already scheduled, take no further action.
if (scheduledHandle != null && scheduledHandle.isPending()) {
temawi marked this conversation as resolved.
Show resolved Hide resolved
return;
}
long delayNanos = policy.nextBackoffNanos();
scheduledHandle = syncContext.schedule(retryOperation, delayNanos, TimeUnit.NANOSECONDS,
scheduledExecutorService);
logger.log(Level.FINE, "Scheduling DNS resolution backoff for {0}ns", delayNanos);
}

/**
* Resets the {@link BackoffPolicyRetryScheduler} and cancels any pending retry task. The policy
* will be cleared thus also resetting any state associated with it (e.g. a backoff multiplier).
*/
@Override
public void reset() {
syncContext.throwIfNotInThisSynchronizationContext();

syncContext.execute(() -> {
if (scheduledHandle != null && scheduledHandle.isPending()) {
scheduledHandle.cancel();
}
policy = null;
});
}

}
22 changes: 14 additions & 8 deletions core/src/main/java/io/grpc/internal/DnsNameResolverProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,25 @@ public final class DnsNameResolverProvider extends NameResolverProvider {
private static final String SCHEME = "dns";

@Override
public DnsNameResolver newNameResolver(URI targetUri, NameResolver.Args args) {
public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) {
if (SCHEME.equals(targetUri.getScheme())) {
String targetPath = Preconditions.checkNotNull(targetUri.getPath(), "targetPath");
Preconditions.checkArgument(targetPath.startsWith("/"),
"the path component (%s) of the target (%s) must start with '/'", targetPath, targetUri);
String name = targetPath.substring(1);
return new DnsNameResolver(
targetUri.getAuthority(),
name,
args,
GrpcUtil.SHARED_CHANNEL_EXECUTOR,
Stopwatch.createUnstarted(),
InternalServiceProviders.isAndroid(getClass().getClassLoader()));
return new RetryingNameResolver(
new DnsNameResolver(
targetUri.getAuthority(),
name,
args,
GrpcUtil.SHARED_CHANNEL_EXECUTOR,
Stopwatch.createUnstarted(),
InternalServiceProviders.isAndroid(getClass().getClassLoader())),
new BackoffPolicyRetryScheduler(
new ExponentialBackoffPolicy.Provider(),
args.getScheduledExecutorService(),
args.getSynchronizationContext()),
args.getSynchronizationContext());
} else {
return null;
}
Expand Down
103 changes: 34 additions & 69 deletions core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import io.grpc.internal.ManagedChannelServiceConfig.ServiceConfigConvertedSelector;
import io.grpc.internal.RetriableStream.ChannelBufferMeter;
import io.grpc.internal.RetriableStream.Throttle;
import io.grpc.internal.RetryingNameResolver.ResolutionResultListener;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
Expand Down Expand Up @@ -367,7 +368,6 @@ private void shutdownNameResolverAndLoadBalancer(boolean channelIsActive) {
checkState(lbHelper != null, "lbHelper is null");
}
if (nameResolver != null) {
cancelNameResolverBackoff();
nameResolver.shutdown();
nameResolverStarted = false;
if (channelIsActive) {
Expand Down Expand Up @@ -450,42 +450,10 @@ private void rescheduleIdleTimer() {
idleTimer.reschedule(idleTimeoutMillis, TimeUnit.MILLISECONDS);
}

// Run from syncContext
@VisibleForTesting
class DelayedNameResolverRefresh implements Runnable {
@Override
public void run() {
scheduledNameResolverRefresh = null;
refreshNameResolution();
}
}

// Must be used from syncContext
@Nullable private ScheduledHandle scheduledNameResolverRefresh;
// The policy to control backoff between name resolution attempts. Non-null when an attempt is
// scheduled. Must be used from syncContext
@Nullable private BackoffPolicy nameResolverBackoffPolicy;

// Must be run from syncContext
private void cancelNameResolverBackoff() {
syncContext.throwIfNotInThisSynchronizationContext();
if (scheduledNameResolverRefresh != null) {
scheduledNameResolverRefresh.cancel();
scheduledNameResolverRefresh = null;
nameResolverBackoffPolicy = null;
}
}

/**
* Force name resolution refresh to happen immediately and reset refresh back-off. Must be run
* Force name resolution refresh to happen immediately. Must be run
* from syncContext.
*/
private void refreshAndResetNameResolution() {
syncContext.throwIfNotInThisSynchronizationContext();
cancelNameResolverBackoff();
refreshNameResolution();
}

private void refreshNameResolution() {
syncContext.throwIfNotInThisSynchronizationContext();
if (nameResolverStarted) {
Expand Down Expand Up @@ -782,7 +750,24 @@ static NameResolver getNameResolver(
if (overrideAuthority == null) {
return resolver;
}
return new ForwardingNameResolver(resolver) {

// If the nameResolver is not already a RetryingNameResolver, then wrap it with it.
// This helps guarantee that name resolution retry remains supported even as it has been
// removed from ManagedChannelImpl.
// TODO: After a transition period, all NameResolver implementations that need retry should use
// RetryingNameResolver directly and this step can be removed.
NameResolver usedNameResolver;
if (resolver instanceof RetryingNameResolver) {
usedNameResolver = resolver;
} else {
usedNameResolver = new RetryingNameResolver(resolver,
new BackoffPolicyRetryScheduler(new ExponentialBackoffPolicy.Provider(),
nameResolverArgs.getScheduledExecutorService(),
nameResolverArgs.getSynchronizationContext()),
nameResolverArgs.getSynchronizationContext());
}

return new ForwardingNameResolver(usedNameResolver) {
@Override
public String getServiceAuthority() {
return overrideAuthority;
Expand Down Expand Up @@ -1290,7 +1275,7 @@ private void maybeTerminateChannel() {
// Must be called from syncContext
private void handleInternalSubchannelState(ConnectivityStateInfo newState) {
if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) {
refreshAndResetNameResolution();
refreshNameResolution();
}
}

Expand Down Expand Up @@ -1337,9 +1322,8 @@ public void run() {
if (shutdown.get()) {
return;
}
if (scheduledNameResolverRefresh != null && scheduledNameResolverRefresh.isPending()) {
checkState(nameResolverStarted, "name resolver must be started");
refreshAndResetNameResolution();
if (nameResolverStarted) {
refreshNameResolution();
}
for (InternalSubchannel subchannel : subchannels) {
subchannel.resetConnectBackoff();
Expand Down Expand Up @@ -1495,7 +1479,7 @@ public void refreshNameResolution() {
final class LoadBalancerRefreshNameResolution implements Runnable {
@Override
public void run() {
refreshAndResetNameResolution();
ManagedChannelImpl.this.refreshNameResolution();
}
}

Expand Down Expand Up @@ -1726,7 +1710,7 @@ public ChannelCredentials withoutBearerTokens() {
}
}

private final class NameResolverListener extends NameResolver.Listener2 {
final class NameResolverListener extends NameResolver.Listener2 {
final LbHelperImpl helper;
final NameResolver resolver;

Expand Down Expand Up @@ -1758,8 +1742,9 @@ public void run() {
lastResolutionState = ResolutionState.SUCCESS;
}

nameResolverBackoffPolicy = null;
ConfigOrError configOrError = resolutionResult.getServiceConfig();
ResolutionResultListener resolutionResultListener = resolutionResult.getAttributes()
.get(RetryingNameResolver.RESOLUTION_RESULT_LISTENER_KEY);
InternalConfigSelector resolvedConfigSelector =
resolutionResult.getAttributes().get(InternalConfigSelector.KEY);
ManagedChannelServiceConfig validServiceConfig =
Expand Down Expand Up @@ -1816,6 +1801,9 @@ public void run() {
// we later check for these error codes when investigating pick results in
// GrpcUtil.getTransportFromPickResult().
onError(configOrError.getError());
if (resolutionResultListener != null) {
resolutionResultListener.resolutionAttempted(false);
}
return;
} else {
effectiveServiceConfig = lastServiceConfig;
Expand Down Expand Up @@ -1859,15 +1847,15 @@ public void run() {
}
Attributes attributes = attrBuilder.build();

boolean addressesAccepted = helper.lb.tryAcceptResolvedAddresses(
boolean lastAddressesAccepted = helper.lb.tryAcceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(servers)
.setAttributes(attributes)
.setLoadBalancingPolicyConfig(effectiveServiceConfig.getLoadBalancingConfig())
.build());

if (!addressesAccepted) {
scheduleExponentialBackOffInSyncContext();
// If a listener is provided, let it know if the addresses were accepted.
if (resolutionResultListener != null) {
resolutionResultListener.resolutionAttempted(lastAddressesAccepted);
}
}
}
Expand Down Expand Up @@ -1903,29 +1891,6 @@ private void handleErrorInSyncContext(Status error) {
}

helper.lb.handleNameResolutionError(error);

scheduleExponentialBackOffInSyncContext();
}

private void scheduleExponentialBackOffInSyncContext() {
if (scheduledNameResolverRefresh != null && scheduledNameResolverRefresh.isPending()) {
// The name resolver may invoke onError multiple times, but we only want to
// schedule one backoff attempt
// TODO(ericgribkoff) Update contract of NameResolver.Listener or decide if we
// want to reset the backoff interval upon repeated onError() calls
return;
}
if (nameResolverBackoffPolicy == null) {
nameResolverBackoffPolicy = backoffPolicyProvider.get();
}
long delayNanos = nameResolverBackoffPolicy.nextBackoffNanos();
channelLogger.log(
ChannelLogLevel.DEBUG,
"Scheduling DNS resolution backoff for {0} ns", delayNanos);
scheduledNameResolverRefresh =
syncContext.schedule(
new DelayedNameResolverRefresh(), delayNanos, TimeUnit.NANOSECONDS,
transportFactory .getScheduledExecutorService());
}
}

Expand Down
36 changes: 36 additions & 0 deletions core/src/main/java/io/grpc/internal/RetryScheduler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2023 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.internal;

/**
* This interface is used to schedule future retry attempts for a failed operation. The retry delay
* and the number of attempt is defined by implementing classes. Implementations should assure
* that only one future retry operation is ever scheduled at a time.
*/
public interface RetryScheduler {

/**
* A request to schedule a future retry (or retries) for a failed operation. Noop if an operation
* has already been scheduled.
*/
void schedule(Runnable retryOperation);

/**
* Resets the scheduler, effectively cancelling any future retry operation.
*/
void reset();
}
Loading