Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #10374] Select clusters using the selector #10995

Merged
merged 3 commits into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions api/src/main/java/com/alibaba/nacos/api/naming/NamingService.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ListView;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.selector.NamingSelector;
import com.alibaba.nacos.api.selector.AbstractSelector;

import java.util.List;
Expand Down Expand Up @@ -490,6 +491,18 @@ Instance selectOneHealthyInstance(String serviceName, String groupName, List<Str
*/
void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
throws NacosException;

/**
* Subscribe service to receive events of instances alteration.
*
* @param serviceName name of service
* @param groupName group of service
* @param selector selector of instances
* @param listener event listener
* @throws NacosException nacos exception
*/
void subscribe(String serviceName, String groupName, NamingSelector selector, EventListener listener)
throws NacosException;

/**
* Unsubscribe event listener of service.
Expand Down Expand Up @@ -531,6 +544,18 @@ void subscribe(String serviceName, String groupName, List<String> clusters, Even
*/
void unsubscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
throws NacosException;

/**
* Unsubscribe event listener of service.
*
* @param serviceName name of service
* @param groupName group of service
* @param selector selector of instances
* @param listener event listener
* @throws NacosException nacos exception
*/
void unsubscribe(String serviceName, String groupName, NamingSelector selector, EventListener listener)
throws NacosException;

/**
* Get all service names from server.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,22 @@
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ListView;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.selector.NamingSelector;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.api.selector.AbstractSelector;
import com.alibaba.nacos.client.env.NacosClientProperties;
import com.alibaba.nacos.client.naming.cache.ServiceInfoHolder;
import com.alibaba.nacos.client.naming.core.Balancer;
import com.alibaba.nacos.client.naming.event.InstancesChangeEvent;
import com.alibaba.nacos.client.naming.event.InstancesChangeNotifier;
import com.alibaba.nacos.client.naming.event.InstancesDiff;
import com.alibaba.nacos.client.naming.remote.NamingClientProxy;
import com.alibaba.nacos.client.naming.remote.NamingClientProxyDelegate;
import com.alibaba.nacos.client.naming.selector.NamingSelectorWrapper;
import com.alibaba.nacos.client.naming.utils.CollectionUtils;
import com.alibaba.nacos.client.naming.utils.InitUtils;
import com.alibaba.nacos.client.naming.utils.UtilAndComs;
import com.alibaba.nacos.client.selector.SelectorFactory;
import com.alibaba.nacos.client.utils.ValidatorUtils;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.utils.StringUtils;
Expand All @@ -46,6 +50,8 @@
import java.util.Properties;
import java.util.UUID;

import static com.alibaba.nacos.client.selector.SelectorFactory.getUniqueClusterString;

/**
* Nacos Naming Service.
*
Expand All @@ -54,7 +60,7 @@
@SuppressWarnings("PMD.ServiceOrDaoClassShouldEndWithImplRule")
public class NacosNamingService implements NamingService {

private static final String DEFAULT_NAMING_LOG_FILE_PATH = "naming.log";
private static final String DEFAULT_NAMING_LOG_FILE_PATH = "naming.log";

private static final String UP = "UP";

Expand Down Expand Up @@ -93,13 +99,14 @@ private void init(Properties properties) throws NacosException {
InitUtils.initSerialization();
InitUtils.initWebRootContext(nacosClientProperties);
initLogName(nacosClientProperties);

this.notifierEventScope = UUID.randomUUID().toString();
this.changeNotifier = new InstancesChangeNotifier(this.notifierEventScope);
NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
NotifyCenter.registerSubscriber(changeNotifier);
this.serviceInfoHolder = new ServiceInfoHolder(namespace, this.notifierEventScope, nacosClientProperties);
this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, nacosClientProperties, changeNotifier);
this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, nacosClientProperties,
changeNotifier);
}

private void initLogName(NacosClientProperties properties) {
Expand Down Expand Up @@ -373,8 +380,8 @@ public Instance selectOneHealthyInstance(String serviceName, String groupName, L
}
return Balancer.RandomByWeight.selectHost(serviceInfo);
} else {
ServiceInfo serviceInfo = clientProxy
.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
ServiceInfo serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0,
false);
return Balancer.RandomByWeight.selectHost(serviceInfo);
}
}
Expand All @@ -397,12 +404,25 @@ public void subscribe(String serviceName, List<String> clusters, EventListener l
@Override
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
throws NacosException {
if (null == listener) {
NamingSelector clusterSelector = SelectorFactory.newClusterSelector(clusters);
doSubscribe(serviceName, groupName, getUniqueClusterString(clusters), clusterSelector, listener);
}

@Override
public void subscribe(String serviceName, String groupName, NamingSelector selector, EventListener listener)
throws NacosException {
doSubscribe(serviceName, groupName, Constants.NULL, selector, listener);
}

private void doSubscribe(String serviceName, String groupName, String clusters, NamingSelector selector,
EventListener listener) throws NacosException {
if (selector == null || listener == null) {
return;
}
String clusterString = StringUtils.join(clusters, ",");
changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
clientProxy.subscribe(serviceName, groupName, clusterString);
NamingSelectorWrapper wrapper = new NamingSelectorWrapper(serviceName, groupName, clusters, selector, listener);
notifyIfSubscribed(serviceName, groupName, wrapper);
changeNotifier.registerListener(groupName, serviceName, wrapper);
clientProxy.subscribe(serviceName, groupName, Constants.NULL);
}

@Override
Expand All @@ -423,10 +443,25 @@ public void unsubscribe(String serviceName, List<String> clusters, EventListener
@Override
public void unsubscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
throws NacosException {
String clustersString = StringUtils.join(clusters, ",");
changeNotifier.deregisterListener(groupName, serviceName, clustersString, listener);
if (!changeNotifier.isSubscribed(groupName, serviceName, clustersString)) {
clientProxy.unsubscribe(serviceName, groupName, clustersString);
NamingSelector clusterSelector = SelectorFactory.newClusterSelector(clusters);
unsubscribe(serviceName, groupName, clusterSelector, listener);
}

@Override
public void unsubscribe(String serviceName, String groupName, NamingSelector selector, EventListener listener)
throws NacosException {
doUnsubscribe(serviceName, groupName, selector, listener);
}

private void doUnsubscribe(String serviceName, String groupName, NamingSelector selector, EventListener listener)
throws NacosException {
if (selector == null || listener == null) {
return;
}
NamingSelectorWrapper wrapper = new NamingSelectorWrapper(selector, listener);
changeNotifier.deregisterListener(groupName, serviceName, wrapper);
if (!changeNotifier.isSubscribed(groupName, serviceName)) {
clientProxy.unsubscribe(serviceName, groupName, Constants.NULL);
}
}

Expand Down Expand Up @@ -467,6 +502,23 @@ public void shutDown() throws NacosException {
serviceInfoHolder.shutdown();
clientProxy.shutdown();
NotifyCenter.deregisterSubscriber(changeNotifier);
}

private void notifyIfSubscribed(String serviceName, String groupName, NamingSelectorWrapper wrapper) {
if (changeNotifier.isSubscribed(groupName, serviceName)) {
ServiceInfo serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, Constants.NULL);
InstancesChangeEvent event = transferToEvent(serviceInfo);
wrapper.notifyListener(event);
}
}

private InstancesChangeEvent transferToEvent(ServiceInfo serviceInfo) {
if (serviceInfo == null) {
return null;
}
InstancesDiff diff = new InstancesDiff();
diff.setAddedInstances(serviceInfo.getHosts());
return new InstancesChangeEvent(notifierEventScope, serviceInfo.getName(), serviceInfo.getGroupName(),
serviceInfo.getClusters(), serviceInfo.getHosts(), diff);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public void run() {
long delayTime = DEFAULT_DELAY;

try {
if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(
if (!changeNotifier.isSubscribed(groupName, serviceName) && !futureMap.containsKey(
serviceKey)) {
NAMING_LOGGER.info("update task is stopped, service:{}, clusters:{}", groupedServiceName, clusters);
isCancel = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,18 @@

package com.alibaba.nacos.client.naming.event;

import com.alibaba.nacos.api.naming.listener.AbstractEventListener;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.client.naming.listener.NamingChangeEvent;
import com.alibaba.nacos.client.naming.selector.NamingSelectorWrapper;
import com.alibaba.nacos.client.selector.SelectorManager;
import com.alibaba.nacos.common.JustForTest;
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.common.utils.CollectionUtils;
import com.alibaba.nacos.common.utils.ConcurrentHashSet;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
* A subscriber to notify eventListener callback.
Expand All @@ -43,7 +39,7 @@ public class InstancesChangeNotifier extends Subscriber<InstancesChangeEvent> {

private final String eventScope;

private final Map<String, ConcurrentHashSet<EventListener>> listenerMap = new ConcurrentHashMap<>();
private final SelectorManager<NamingSelectorWrapper> selectorManager = new SelectorManager<>();

@JustForTest
public InstancesChangeNotifier() {
Expand All @@ -59,79 +55,58 @@ public InstancesChangeNotifier(String eventScope) {
*
* @param groupName group name
* @param serviceName serviceName
* @param clusters clusters, concat by ','. such as 'xxx,yyy'
* @param listener custom listener
* @param wrapper selectorWrapper
*/
public void registerListener(String groupName, String serviceName, String clusters, EventListener listener) {
String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
ConcurrentHashSet<EventListener> eventListeners = listenerMap.computeIfAbsent(key, keyInner -> new ConcurrentHashSet<>());
eventListeners.add(listener);
public void registerListener(String groupName, String serviceName, NamingSelectorWrapper wrapper) {
if (wrapper == null) {
return;
}
String subId = NamingUtils.getGroupedName(serviceName, groupName);
selectorManager.addSelectorWrapper(subId, wrapper);
}

/**
* deregister listener.
*
* @param groupName group name
* @param serviceName serviceName
* @param clusters clusters, concat by ','. such as 'xxx,yyy'
* @param listener custom listener
* @param wrapper selectorWrapper
*/
public void deregisterListener(String groupName, String serviceName, String clusters, EventListener listener) {
String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
if (eventListeners == null) {
public void deregisterListener(String groupName, String serviceName, NamingSelectorWrapper wrapper) {
if (wrapper == null) {
return;
}
eventListeners.remove(listener);
if (CollectionUtils.isEmpty(eventListeners)) {
listenerMap.remove(key);
}
String subId = NamingUtils.getGroupedName(serviceName, groupName);
selectorManager.removeSelectorWrapper(subId, wrapper);
}

/**
* check serviceName,clusters is subscribed.
* check serviceName,groupName is subscribed.
*
* @param groupName group name
* @param serviceName serviceName
* @param clusters clusters, concat by ','. such as 'xxx,yyy'
* @return is serviceName,clusters subscribed
*/
public boolean isSubscribed(String groupName, String serviceName, String clusters) {
String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
return CollectionUtils.isNotEmpty(eventListeners);
public boolean isSubscribed(String groupName, String serviceName) {
String subId = NamingUtils.getGroupedName(serviceName, groupName);
return selectorManager.isSubscribed(subId);
}

public List<ServiceInfo> getSubscribeServices() {
List<ServiceInfo> serviceInfos = new ArrayList<>();
for (String key : listenerMap.keySet()) {
for (String key : selectorManager.getSubscriptions()) {
serviceInfos.add(ServiceInfo.fromKey(key));
}
return serviceInfos;
}

@Override
public void onEvent(InstancesChangeEvent event) {
String key = ServiceInfo
.getKey(NamingUtils.getGroupedName(event.getServiceName(), event.getGroupName()), event.getClusters());
ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
if (CollectionUtils.isEmpty(eventListeners)) {
return;
String subId = NamingUtils.getGroupedName(event.getServiceName(), event.getGroupName());
Collection<NamingSelectorWrapper> selectorWrappers = selectorManager.getSelectorWrappers(subId);
for (NamingSelectorWrapper selectorWrapper : selectorWrappers) {
selectorWrapper.notifyListener(event);
}
for (final EventListener listener : eventListeners) {
final com.alibaba.nacos.api.naming.listener.Event namingEvent = transferToNamingEvent(event);
if (listener instanceof AbstractEventListener && ((AbstractEventListener) listener).getExecutor() != null) {
((AbstractEventListener) listener).getExecutor().execute(() -> listener.onEvent(namingEvent));
} else {
listener.onEvent(namingEvent);
}
}
}

private com.alibaba.nacos.api.naming.listener.Event transferToNamingEvent(
InstancesChangeEvent instancesChangeEvent) {
return new NamingChangeEvent(instancesChangeEvent.getServiceName(), instancesChangeEvent.getGroupName(),
instancesChangeEvent.getClusters(), instancesChangeEvent.getHosts(), instancesChangeEvent.getInstancesDiff());
}

@Override
Expand Down
Loading