Skip to content

Commit

Permalink
rls: Reduce RLS channel logging
Browse files Browse the repository at this point in the history
The channel log is shared by many components and is poorly suited to
the noise of per-RPC events. This commit restricts RLS usage of the
logger to no more frequent than cache entry events. This may still be
too frequent, but should substantially improve the signal-to-noise and
we can do further rework as needed.

Many of the log entries were poor because they lacked enough context.
They weren't even clear they were from RLS. The cache entry events now
regularly include the request key in the logs, allowing you to follow
events for specific keys. I would have preferred using the hash code,
but NumberFormat is annoying and toString() may be acceptable given its
convenience.

This commit reverts much of eba699a. Those logs have not proven to be
helpful as they produce more output than can be reasonably stored.
  • Loading branch information
ejona86 authored Nov 27, 2024
1 parent ebb43a6 commit 7f9c1f3
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 50 deletions.
59 changes: 18 additions & 41 deletions rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -278,37 +278,38 @@ private void periodicClean() {
@GuardedBy("lock")
private CachedRouteLookupResponse asyncRlsCall(
RouteLookupRequest request, @Nullable BackoffPolicy backoffPolicy) {
logger.log(ChannelLogLevel.DEBUG, "Making an async call to RLS");
if (throttler.shouldThrottle()) {
logger.log(ChannelLogLevel.DEBUG, "Request is throttled");
logger.log(ChannelLogLevel.DEBUG, "[RLS Entry {0}] Throttled RouteLookup", request);
// Cache updated, but no need to call updateBalancingState because no RPCs were queued waiting
// on this result
return CachedRouteLookupResponse.backoffEntry(createBackOffEntry(
request, Status.RESOURCE_EXHAUSTED.withDescription("RLS throttled"), backoffPolicy));
}
final SettableFuture<RouteLookupResponse> response = SettableFuture.create();
io.grpc.lookup.v1.RouteLookupRequest routeLookupRequest = REQUEST_CONVERTER.convert(request);
logger.log(ChannelLogLevel.DEBUG, "Sending RouteLookupRequest: {0}", routeLookupRequest);
logger.log(ChannelLogLevel.DEBUG,
"[RLS Entry {0}] Starting RouteLookup: {1}", request, routeLookupRequest);
rlsStub.withDeadlineAfter(callTimeoutNanos, TimeUnit.NANOSECONDS)
.routeLookup(
routeLookupRequest,
new StreamObserver<io.grpc.lookup.v1.RouteLookupResponse>() {
@Override
public void onNext(io.grpc.lookup.v1.RouteLookupResponse value) {
logger.log(ChannelLogLevel.DEBUG, "Received RouteLookupResponse: {0}", value);
logger.log(ChannelLogLevel.DEBUG,
"[RLS Entry {0}] RouteLookup succeeded: {1}", request, value);
response.set(RESPONSE_CONVERTER.reverse().convert(value));
}

@Override
public void onError(Throwable t) {
logger.log(ChannelLogLevel.DEBUG, "Error looking up route:", t);
logger.log(ChannelLogLevel.DEBUG,
"[RLS Entry {0}] RouteLookup failed: {1}", request, t);
response.setException(t);
throttler.registerBackendResponse(true);
}

@Override
public void onCompleted() {
logger.log(ChannelLogLevel.DEBUG, "routeLookup call completed");
throttler.registerBackendResponse(false);
}
});
Expand All @@ -323,13 +324,10 @@ public void onCompleted() {
*/
@CheckReturnValue
final CachedRouteLookupResponse get(final RouteLookupRequest request) {
logger.log(ChannelLogLevel.DEBUG, "Acquiring lock to get cached entry");
synchronized (lock) {
logger.log(ChannelLogLevel.DEBUG, "Acquired lock to get cached entry");
final CacheEntry cacheEntry;
cacheEntry = linkedHashLruCache.read(request);
if (cacheEntry == null) {
logger.log(ChannelLogLevel.DEBUG, "No cache entry found, making a new RLS request");
PendingCacheEntry pendingEntry = pendingCallCache.get(request);
if (pendingEntry != null) {
return CachedRouteLookupResponse.pendingResponse(pendingEntry);
Expand All @@ -339,15 +337,12 @@ final CachedRouteLookupResponse get(final RouteLookupRequest request) {

if (cacheEntry instanceof DataCacheEntry) {
// cache hit, initiate async-refresh if entry is staled
logger.log(ChannelLogLevel.DEBUG, "Cache hit for the request");
DataCacheEntry dataEntry = ((DataCacheEntry) cacheEntry);
if (dataEntry.isStaled(ticker.read())) {
logger.log(ChannelLogLevel.DEBUG, "Cache entry is stale");
dataEntry.maybeRefresh();
}
return CachedRouteLookupResponse.dataEntry((DataCacheEntry) cacheEntry);
}
logger.log(ChannelLogLevel.DEBUG, "Cache hit for a backup entry");
return CachedRouteLookupResponse.backoffEntry((BackoffCacheEntry) cacheEntry);
}
}
Expand Down Expand Up @@ -409,8 +404,8 @@ private DataCacheEntry createDataEntry(
RouteLookupRequest request, RouteLookupResponse routeLookupResponse) {
logger.log(
ChannelLogLevel.DEBUG,
"Transition to data cache: routeLookupResponse={0}",
routeLookupResponse);
"[RLS Entry {0}] Transition to data cache: routeLookupResponse={1}",
request, routeLookupResponse);
DataCacheEntry entry = new DataCacheEntry(request, routeLookupResponse);
// Constructor for DataCacheEntry causes updateBalancingState, but the picks can't happen until
// this cache update because the lock is held
Expand All @@ -421,18 +416,19 @@ private DataCacheEntry createDataEntry(
@GuardedBy("lock")
private BackoffCacheEntry createBackOffEntry(
RouteLookupRequest request, Status status, @Nullable BackoffPolicy backoffPolicy) {
logger.log(ChannelLogLevel.DEBUG, "Transition to back off: status={0}", status);
if (backoffPolicy == null) {
backoffPolicy = backoffProvider.get();
}
long delayNanos = backoffPolicy.nextBackoffNanos();
logger.log(
ChannelLogLevel.DEBUG,
"[RLS Entry {0}] Transition to back off: status={1}, delayNanos={2}",
request, status, delayNanos);
BackoffCacheEntry entry = new BackoffCacheEntry(request, status, backoffPolicy);
// Lock is held, so the task can't execute before the assignment
entry.scheduledFuture = scheduledExecutorService.schedule(
() -> refreshBackoffEntry(entry), delayNanos, TimeUnit.NANOSECONDS);
linkedHashLruCache.cacheAndClean(request, entry);
logger.log(ChannelLogLevel.DEBUG, "BackoffCacheEntry created with a delay of {0} nanos",
delayNanos);
return entry;
}

Expand All @@ -443,7 +439,8 @@ private void refreshBackoffEntry(BackoffCacheEntry entry) {
// Future was previously cancelled
return;
}
logger.log(ChannelLogLevel.DEBUG, "Calling RLS for transition to pending");
logger.log(ChannelLogLevel.DEBUG,
"[RLS Entry {0}] Calling RLS for transition to pending", entry.request);
linkedHashLruCache.invalidate(entry.request);
asyncRlsCall(entry.request, entry.backoffPolicy);
}
Expand Down Expand Up @@ -659,10 +656,10 @@ void maybeRefresh() {
synchronized (lock) { // Lock is already held, but ErrorProne can't tell
if (pendingCallCache.containsKey(request)) {
// pending already requested
logger.log(ChannelLogLevel.DEBUG,
"A pending refresh request already created, no need to proceed with refresh");
return;
}
logger.log(ChannelLogLevel.DEBUG,
"[RLS Entry {0}] Cache entry is stale, refreshing", request);
asyncRlsCall(request, /* backoffPolicy= */ null);
}
}
Expand Down Expand Up @@ -943,13 +940,10 @@ private final class BackoffRefreshListener implements ChildLbStatusListener {

@Override
public void onStatusChanged(ConnectivityState newState) {
logger.log(ChannelLogLevel.DEBUG, "LB status changed to: {0}", newState);
if (prevState == ConnectivityState.TRANSIENT_FAILURE
&& newState == ConnectivityState.READY) {
logger.log(ChannelLogLevel.DEBUG, "Transitioning from TRANSIENT_FAILURE to READY");
logger.log(ChannelLogLevel.DEBUG, "Acquiring lock force refresh backoff cache entries");
synchronized (lock) {
logger.log(ChannelLogLevel.DEBUG, "Lock acquired for refreshing backoff cache entries");
for (CacheEntry value : linkedHashLruCache.values()) {
if (value instanceof BackoffCacheEntry) {
refreshBackoffEntry((BackoffCacheEntry) value);
Expand Down Expand Up @@ -983,31 +977,22 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
RouteLookupRequest request =
requestFactory.create(serviceName, methodName, args.getHeaders());
final CachedRouteLookupResponse response = CachingRlsLbClient.this.get(request);
logger.log(ChannelLogLevel.DEBUG,
"Got route lookup cache entry for service={0}, method={1}, headers={2}:\n {3}",
new Object[]{serviceName, methodName, args.getHeaders(), response});

if (response.getHeaderData() != null && !response.getHeaderData().isEmpty()) {
logger.log(ChannelLogLevel.DEBUG, "Updating RLS metadata from the RLS response headers");
Metadata headers = args.getHeaders();
headers.discardAll(RLS_DATA_KEY);
headers.put(RLS_DATA_KEY, response.getHeaderData());
}
String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
logger.log(ChannelLogLevel.DEBUG, "defaultTarget = {0}", defaultTarget);
boolean hasFallback = defaultTarget != null && !defaultTarget.isEmpty();
if (response.hasData()) {
logger.log(ChannelLogLevel.DEBUG, "RLS response has data, proceed with selecting a picker");
ChildPolicyWrapper childPolicyWrapper = response.getChildPolicyWrapper();
SubchannelPicker picker =
(childPolicyWrapper != null) ? childPolicyWrapper.getPicker() : null;
if (picker == null) {
logger.log(ChannelLogLevel.DEBUG,
"Child policy wrapper didn't return a picker, returning PickResult with no results");
return PickResult.withNoResult();
}
// Happy path
logger.log(ChannelLogLevel.DEBUG, "Returning PickResult");
PickResult pickResult = picker.pickSubchannel(args);
if (pickResult.hasResult()) {
helper.getMetricRecorder().addLongCounter(TARGET_PICKS_COUNTER, 1,
Expand All @@ -1017,20 +1002,15 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
}
return pickResult;
} else if (response.hasError()) {
logger.log(ChannelLogLevel.DEBUG, "RLS response has errors");
if (hasFallback) {
logger.log(ChannelLogLevel.DEBUG, "Using RLS fallback");
return useFallback(args);
}
logger.log(ChannelLogLevel.DEBUG, "No RLS fallback, returning PickResult with an error");
helper.getMetricRecorder().addLongCounter(FAILED_PICKS_COUNTER, 1,
Arrays.asList(helper.getChannelTarget(), lookupService), Collections.emptyList());
return PickResult.withError(
convertRlsServerStatus(response.getStatus(),
lbPolicyConfig.getRouteLookupConfig().lookupService()));
} else {
logger.log(ChannelLogLevel.DEBUG,
"RLS response had no data, return a PickResult with no data");
return PickResult.withNoResult();
}
}
Expand Down Expand Up @@ -1067,21 +1047,18 @@ private String determineMetricsPickResult(PickResult pickResult) {

private void startFallbackChildPolicy() {
String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
logger.log(ChannelLogLevel.DEBUG, "starting fallback to {0}", defaultTarget);
logger.log(ChannelLogLevel.DEBUG, "Acquiring lock to start fallback child policy");
synchronized (lock) {
logger.log(ChannelLogLevel.DEBUG, "Acquired lock for starting fallback child policy");
if (fallbackChildPolicyWrapper != null) {
return;
}
logger.log(ChannelLogLevel.DEBUG, "starting fallback to {0}", defaultTarget);
fallbackChildPolicyWrapper = refCountedChildPolicyWrapperFactory.createOrGet(defaultTarget);
}
}

// GuardedBy CachingRlsLbClient.lock
void close() {
synchronized (lock) { // Lock is already held, but ErrorProne can't tell
logger.log(ChannelLogLevel.DEBUG, "Closing RLS picker");
if (fallbackChildPolicyWrapper != null) {
refCountedChildPolicyWrapperFactory.release(fallbackChildPolicyWrapper);
}
Expand Down
10 changes: 1 addition & 9 deletions rls/src/main/java/io/grpc/rls/RlsLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,11 @@ final class RlsLoadBalancer extends LoadBalancer {

@Override
public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
logger.log(ChannelLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
LbPolicyConfiguration lbPolicyConfiguration =
(LbPolicyConfiguration) resolvedAddresses.getLoadBalancingPolicyConfig();
checkNotNull(lbPolicyConfiguration, "Missing RLS LB config");
if (!lbPolicyConfiguration.equals(this.lbPolicyConfiguration)) {
logger.log(ChannelLogLevel.DEBUG, "A new RLS LB config received");
logger.log(ChannelLogLevel.DEBUG, "A new RLS LB config received: {0}", lbPolicyConfiguration);
boolean needToConnect = this.lbPolicyConfiguration == null
|| !this.lbPolicyConfiguration.getRouteLookupConfig().lookupService().equals(
lbPolicyConfiguration.getRouteLookupConfig().lookupService());
Expand All @@ -80,22 +79,18 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
// not required.
this.lbPolicyConfiguration = lbPolicyConfiguration;
}
logger.log(ChannelLogLevel.DEBUG, "RLS LB accepted resolved addresses successfully");
return Status.OK;
}

@Override
public void requestConnection() {
logger.log(ChannelLogLevel.DEBUG, "connection requested from RLS LB");
if (routeLookupClient != null) {
logger.log(ChannelLogLevel.DEBUG, "requesting a connection from the routeLookupClient");
routeLookupClient.requestConnection();
}
}

@Override
public void handleNameResolutionError(final Status error) {
logger.log(ChannelLogLevel.DEBUG, "Received resolution error: {0}", error);
class ErrorPicker extends SubchannelPicker {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
Expand All @@ -116,14 +111,11 @@ public String toString() {
routeLookupClient = null;
lbPolicyConfiguration = null;
}
logger.log(ChannelLogLevel.DEBUG,
"Updating balancing state to TRANSIENT_FAILURE with an error picker");
helper.updateBalancingState(ConnectivityState.TRANSIENT_FAILURE, new ErrorPicker());
}

@Override
public void shutdown() {
logger.log(ChannelLogLevel.DEBUG, "Rls lb shutdown");
if (routeLookupClient != null) {
logger.log(ChannelLogLevel.DEBUG, "closing the routeLookupClient because of RLS LB shutdown");
routeLookupClient.close();
Expand Down

0 comments on commit 7f9c1f3

Please sign in to comment.