Skip to content

Commit

Permalink
add javadoc for registry and some code (apache#3140)
Browse files Browse the repository at this point in the history
add javadoc for registry and optimize code
  • Loading branch information
Leishunyu authored and khanimteyaz committed Jan 18, 2019
1 parent 04abe1d commit ab145f3
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public Registry getRegistry(URL url) {
if (registry != null) {
return registry;
}
//create registry by spi/ioc
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,12 @@ public DubboRegistry(Invoker<RegistryService> registryInvoker, RegistryService r
this.registryService = registryService;
// Start reconnection timer
this.reconnectPeriod = registryInvoker.getUrl().getParameter(Constants.REGISTRY_RECONNECT_PERIOD_KEY, RECONNECT_PERIOD_DEFAULT);
reconnectFuture = reconnectTimer.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
// Check and connect to the registry
try {
connect();
} catch (Throwable t) { // Defensive fault tolerance
logger.error("Unexpected error occur at reconnect, cause: " + t.getMessage(), t);
}
reconnectFuture = reconnectTimer.scheduleWithFixedDelay(() -> {
// Check and connect to the registry
try {
connect();
} catch (Throwable t) { // Defensive fault tolerance
logger.error("Unexpected error occur at reconnect, cause: " + t.getMessage(), t);
}
}, reconnectPeriod, reconnectPeriod, TimeUnit.MILLISECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ private static URL getRegistryURL(URL url) {
.addParameterIfAbsent(Constants.TIMEOUT_KEY, "10000")
.addParameterIfAbsent(Constants.CALLBACK_INSTANCES_LIMIT_KEY, "10000")
.addParameterIfAbsent(Constants.CONNECT_TIMEOUT_KEY, "10000")
.addParameter(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(Wrapper.getWrapper(RegistryService.class).getDeclaredMethodNames())), ","))
.addParameter(Constants.METHODS_KEY, StringUtils.join(new HashSet<>(Arrays.asList(Wrapper.getWrapper(RegistryService.class).getDeclaredMethodNames())), ","))
//.addParameter(Constants.STUB_KEY, RegistryServiceStub.class.getName())
//.addParameter(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString()) //for event dispatch
//.addParameter(Constants.ON_DISCONNECT_KEY, "disconnect")
Expand All @@ -79,7 +79,7 @@ public void setCluster(Cluster cluster) {
@Override
public Registry createRegistry(URL url) {
url = getRegistryURL(url);
List<URL> urls = new ArrayList<URL>();
List<URL> urls = new ArrayList<>();
urls.add(url.removeParameter(Constants.BACKUP_KEY));
String backup = url.getParameter(Constants.BACKUP_KEY);
if (backup != null && backup.length() > 0) {
Expand All @@ -88,7 +88,7 @@ public Registry createRegistry(URL url) {
urls.add(url.setAddress(address));
}
}
RegistryDirectory<RegistryService> directory = new RegistryDirectory<RegistryService>(RegistryService.class, url.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName()).addParameterAndEncoded(Constants.REFER_KEY, url.toParameterString()));
RegistryDirectory<RegistryService> directory = new RegistryDirectory<>(RegistryService.class, url.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName()).addParameterAndEncoded(Constants.REFER_KEY, url.toParameterString()));
Invoker<RegistryService> registryInvoker = cluster.join(directory);
RegistryService registryService = proxyFactory.getProxy(registryInvoker);
DubboRegistry registry = new DubboRegistry(registryInvoker, registryService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public class RedisRegistry extends FailbackRegistry {

private final String root;

private final Map<String, JedisPool> jedisPools = new ConcurrentHashMap<String, JedisPool>();
private final Map<String, JedisPool> jedisPools = new ConcurrentHashMap<>();

private final ConcurrentMap<String, Notifier> notifiers = new ConcurrentHashMap<String, Notifier>();

Expand Down Expand Up @@ -360,12 +360,12 @@ public void doSubscribe(final URL url, final NotifyListener listener) {
admin = true;
Set<String> keys = jedis.keys(service);
if (keys != null && !keys.isEmpty()) {
Map<String, Set<String>> serviceKeys = new HashMap<String, Set<String>>();
Map<String, Set<String>> serviceKeys = new HashMap<>();
for (String key : keys) {
String serviceKey = toServicePath(key);
Set<String> sk = serviceKeys.get(serviceKey);
if (sk == null) {
sk = new HashSet<String>();
sk = new HashSet<>();
serviceKeys.put(serviceKey, sk);
}
sk.add(key);
Expand Down Expand Up @@ -400,8 +400,8 @@ public void doUnsubscribe(URL url, NotifyListener listener) {
}

private void doNotify(Jedis jedis, String key) {
for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>(getSubscribed()).entrySet()) {
doNotify(jedis, Arrays.asList(key), entry.getKey(), new HashSet<NotifyListener>(entry.getValue()));
for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<>(getSubscribed()).entrySet()) {
doNotify(jedis, Arrays.asList(key), entry.getKey(), new HashSet<>(entry.getValue()));
}
}

Expand All @@ -411,7 +411,7 @@ private void doNotify(Jedis jedis, Collection<String> keys, URL url, Collection<
return;
}
long now = System.currentTimeMillis();
List<URL> result = new ArrayList<URL>();
List<URL> result = new ArrayList<>();
List<String> categories = Arrays.asList(url.getParameter(Constants.CATEGORY_KEY, new String[0]));
String consumerService = url.getServiceInterface();
for (String key : keys) {
Expand All @@ -425,7 +425,7 @@ private void doNotify(Jedis jedis, Collection<String> keys, URL url, Collection<
if (!categories.contains(Constants.ANY_VALUE) && !categories.contains(category)) {
continue;
}
List<URL> urls = new ArrayList<URL>();
List<URL> urls = new ArrayList<>();
Map<String, String> values = jedis.hgetAll(key);
if (values != null && values.size() > 0) {
for (Map.Entry<String, String> entry : values.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class ZookeeperRegistry extends FailbackRegistry {

private final String root;

private final Set<String> anyServices = new ConcurrentHashSet<String>();
private final Set<String> anyServices = new ConcurrentHashSet<>();

private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<URL, ConcurrentMap<NotifyListener, ChildListener>>();

Expand All @@ -67,15 +67,12 @@ public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
}
this.root = group;
zkClient = zookeeperTransporter.connect(url);
zkClient.addStateListener(new StateListener() {
@Override
public void stateChanged(int state) {
if (state == RECONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
zkClient.addStateListener(state -> {
if (state == StateListener.RECONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
});
Expand Down Expand Up @@ -133,21 +130,18 @@ public void doSubscribe(final URL url, final NotifyListener listener) {
String root = toRootPath();
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
@Override
public void childChanged(String parentPath, List<String> currentChilds) {
for (String child : currentChilds) {
child = URL.decode(child);
if (!anyServices.contains(child)) {
anyServices.add(child);
subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
listeners.putIfAbsent(listener, (parentPath, currentChilds) -> {
for (String child : currentChilds) {
child = URL.decode(child);
if (!anyServices.contains(child)) {
anyServices.add(child);
subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
});
Expand All @@ -164,21 +158,16 @@ public void childChanged(String parentPath, List<String> currentChilds) {
}
}
} else {
List<URL> urls = new ArrayList<URL>();
List<URL> urls = new ArrayList<>();
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
@Override
public void childChanged(String parentPath, List<String> currentChilds) {
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
zkListener = listeners.get(listener);
}
zkClient.create(path, false);
Expand Down Expand Up @@ -218,7 +207,7 @@ public List<URL> lookup(URL url) {
throw new IllegalArgumentException("lookup url == null");
}
try {
List<String> providers = new ArrayList<String>();
List<String> providers = new ArrayList<>();
for (String path : toCategoriesPath(url)) {
List<String> children = zkClient.getChildren(path);
if (children != null) {
Expand Down Expand Up @@ -274,7 +263,7 @@ private String toUrlPath(URL url) {
}

private List<URL> toUrlsWithoutEmpty(URL consumer, List<String> providers) {
List<URL> urls = new ArrayList<URL>();
List<URL> urls = new ArrayList<>();
if (providers != null && !providers.isEmpty()) {
for (String provider : providers) {
provider = URL.decode(provider);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ public class ZookeeperRegistryFactory extends AbstractRegistryFactory {

private ZookeeperTransporter zookeeperTransporter;

/**
* Invisible injection of zookeeper client via IOC/SPI
* @param zookeeperTransporter
*/
public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
this.zookeeperTransporter = zookeeperTransporter;
}
Expand Down

0 comments on commit ab145f3

Please sign in to comment.