Skip to content

Commit

Permalink
[3.0-Triple] Add health service cancel handler (#9004)
Browse files Browse the repository at this point in the history
* add health service cancel handler

* change log

* change indent style
  • Loading branch information
earthchen authored Oct 12, 2021
1 parent 1dc1822 commit 23e4456
Showing 1 changed file with 20 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

package org.apache.dubbo.rpc.protocol.tri.service;

import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;

import grpc.health.v1.Health;
Expand All @@ -28,14 +31,12 @@
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;

import static org.apache.dubbo.rpc.RpcException.METHOD_NOT_FOUND;

public class TriHealthImpl implements Health {

private static final Logger logger = Logger.getLogger(TriHealthImpl.class.getName());
private static final Logger logger = LoggerFactory.getLogger(TriHealthImpl.class);

// Due to the latency of rpc calls, synchronization of the map does not help with consistency.
// However, need use ConcurrentHashMap to allow concurrent reading by check().
Expand Down Expand Up @@ -82,13 +83,25 @@ public void watch(HealthCheckRequest request, StreamObserver<HealthCheckResponse
}
serviceWatchers.put(responseObserver, Boolean.TRUE);
}
// todo add client cancel listener
RpcContext.getCancellationContext()
.addListener(context -> {
synchronized (watchLock) {
IdentityHashMap<StreamObserver<HealthCheckResponse>, Boolean> serviceWatchers =
watchers.get(service);
if (serviceWatchers != null) {
serviceWatchers.remove(responseObserver);
if (serviceWatchers.isEmpty()) {
watchers.remove(service);
}
}
}
});
}

void setStatus(String service, HealthCheckResponse.ServingStatus status) {
synchronized (watchLock) {
if (terminal) {
logger.log(Level.FINE, "Ignoring status {} for {}", new Object[]{status, service});
logger.info("Ignoring status " + status + " for " + service);
return;
}
setStatusInternal(service, status);
Expand All @@ -105,7 +118,7 @@ private void setStatusInternal(String service, HealthCheckResponse.ServingStatus
void clearStatus(String service) {
synchronized (watchLock) {
if (terminal) {
logger.log(Level.FINE, "Ignoring status clearing for {}", new Object[]{service});
logger.info("Ignoring status clearing for " + service);
return;
}
HealthCheckResponse.ServingStatus prevStatus = statusMap.remove(service);
Expand All @@ -118,7 +131,7 @@ void clearStatus(String service) {
void enterTerminalState() {
synchronized (watchLock) {
if (terminal) {
logger.log(Level.WARNING, "Already terminating", new RuntimeException());
logger.warn("Already terminating", new RuntimeException());
return;
}
terminal = true;
Expand Down

0 comments on commit 23e4456

Please sign in to comment.