Skip to content

Commit

Permalink
[ISSUE #10374] Add default selectors (#11142)
Browse files Browse the repository at this point in the history
* Add default selectors

* add tests

* Update SubscribeSelector_ITCase

* add unsubscribe test

* Removes some methods for NamingSelectorFactor

* Update SelectorManager
  • Loading branch information
ldyedu committed Sep 22, 2023
1 parent 4ad98d8 commit 263e223
Show file tree
Hide file tree
Showing 11 changed files with 739 additions and 225 deletions.
24 changes: 22 additions & 2 deletions api/src/main/java/com/alibaba/nacos/api/naming/NamingService.java
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,17 @@ Instance selectOneHealthyInstance(String serviceName, String groupName, List<Str
*/
void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
throws NacosException;


/**
* Subscribe service to receive events of instances alteration.
*
* @param serviceName name of service
* @param selector selector of instances
* @param listener event listener
* @throws NacosException nacos exception
*/
void subscribe(String serviceName, NamingSelector selector, EventListener listener) throws NacosException;

/**
* Subscribe service to receive events of instances alteration.
*
Expand Down Expand Up @@ -544,7 +554,17 @@ void subscribe(String serviceName, String groupName, NamingSelector selector, Ev
*/
void unsubscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
throws NacosException;


/**
* Unsubscribe event listener of service.
*
* @param serviceName name of service
* @param selector selector of instances
* @param listener event listener
* @throws NacosException nacos exception
*/
void unsubscribe(String serviceName, NamingSelector selector, EventListener listener) throws NacosException;

/**
* Unsubscribe event listener of service.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import com.alibaba.nacos.client.naming.utils.CollectionUtils;
import com.alibaba.nacos.client.naming.utils.InitUtils;
import com.alibaba.nacos.client.naming.utils.UtilAndComs;
import com.alibaba.nacos.client.selector.SelectorFactory;
import com.alibaba.nacos.client.naming.selector.NamingSelectorFactory;
import com.alibaba.nacos.client.utils.ValidatorUtils;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.utils.StringUtils;
Expand All @@ -50,7 +50,7 @@
import java.util.Properties;
import java.util.UUID;

import static com.alibaba.nacos.client.selector.SelectorFactory.getUniqueClusterString;
import static com.alibaba.nacos.client.naming.selector.NamingSelectorFactory.getUniqueClusterString;

/**
* Nacos Naming Service.
Expand Down Expand Up @@ -404,10 +404,15 @@ public void subscribe(String serviceName, List<String> clusters, EventListener l
@Override
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
throws NacosException {
NamingSelector clusterSelector = SelectorFactory.newClusterSelector(clusters);
NamingSelector clusterSelector = NamingSelectorFactory.newClusterSelector(clusters);
doSubscribe(serviceName, groupName, getUniqueClusterString(clusters), clusterSelector, listener);
}

@Override
public void subscribe(String serviceName, NamingSelector selector, EventListener listener) throws NacosException {
subscribe(serviceName, Constants.DEFAULT_GROUP, selector, listener);
}

@Override
public void subscribe(String serviceName, String groupName, NamingSelector selector, EventListener listener)
throws NacosException {
Expand Down Expand Up @@ -443,10 +448,15 @@ public void unsubscribe(String serviceName, List<String> clusters, EventListener
@Override
public void unsubscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
throws NacosException {
NamingSelector clusterSelector = SelectorFactory.newClusterSelector(clusters);
NamingSelector clusterSelector = NamingSelectorFactory.newClusterSelector(clusters);
unsubscribe(serviceName, groupName, clusterSelector, listener);
}

@Override
public void unsubscribe(String serviceName, NamingSelector selector, EventListener listener) throws NacosException {
unsubscribe(serviceName, Constants.DEFAULT_GROUP, selector, listener);
}

@Override
public void unsubscribe(String serviceName, String groupName, NamingSelector selector, EventListener listener)
throws NacosException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,40 +14,45 @@
* limitations under the License.
*/

package com.alibaba.nacos.client.selector;
package com.alibaba.nacos.client.naming.selector;

import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.selector.NamingSelector;
import com.alibaba.nacos.client.naming.selector.DefaultNamingSelector;
import com.alibaba.nacos.common.utils.CollectionUtils;
import com.alibaba.nacos.common.utils.StringUtils;

import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Predicate;
import java.util.regex.Pattern;

/**
* Selectors factory.
*
* @author lideyou
*/
public final class SelectorFactory {
private static final NamingSelector EMPTY_SELECTOR = context -> context::getInstances;

public final class NamingSelectorFactory {

public static final NamingSelector EMPTY_SELECTOR = context -> context::getInstances;

public static final NamingSelector HEALTHY_SELECTOR = new DefaultNamingSelector(Instance::isHealthy);

/**
* Cluster selector.
*/
private static class ClusterSelector extends DefaultNamingSelector {

private final String clusterString;

public ClusterSelector(Predicate<Instance> filter, String clusterString) {
super(filter);
this.clusterString = clusterString;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -59,16 +64,16 @@ public boolean equals(Object o) {
ClusterSelector that = (ClusterSelector) o;
return Objects.equals(this.clusterString, that.clusterString);
}

@Override
public int hashCode() {
return Objects.hashCode(this.clusterString);
}
}

private SelectorFactory() {
private NamingSelectorFactory() {
}

/**
* Create a cluster selector.
*
Expand All @@ -85,9 +90,61 @@ public static NamingSelector newClusterSelector(Collection<String> clusters) {
return EMPTY_SELECTOR;
}
}


/**
* Create a IP selector.
*
* @param regex regular expression of IP
* @return IP selector
*/
public static NamingSelector newIpSelector(String regex) {
if (regex == null) {
throw new IllegalArgumentException("The parameter 'regex' cannot be null.");
}
return new DefaultNamingSelector(instance -> Pattern.matches(regex, instance.getIp()));
}

/**
* Create a metadata selector.
*
* @param metadata metadata that needs to be matched
* @return metadata selector
*/
public static NamingSelector newMetadataSelector(Map<String, String> metadata) {
return newMetadataSelector(metadata, false);
}

/**
* Create a metadata selector.
*
* @param metadata target metadata
* @param isAny true if any of the metadata needs to be matched, false if all the metadata need to be matched.
* @return metadata selector
*/
public static NamingSelector newMetadataSelector(Map<String, String> metadata, boolean isAny) {
if (metadata == null) {
throw new IllegalArgumentException("The parameter 'metadata' cannot be null.");
}

Predicate<Instance> filter = instance -> instance.getMetadata().size() >= metadata.size();

for (Map.Entry<String, String> entry : metadata.entrySet()) {
Predicate<Instance> nextFilter = instance -> {
Map<String, String> map = instance.getMetadata();
return Objects.equals(map.get(entry.getKey()), entry.getValue());
};
if (isAny) {
filter = filter.or(nextFilter);
} else {
filter = filter.and(nextFilter);
}
}
return new DefaultNamingSelector(filter);
}

public static String getUniqueClusterString(Collection<String> cluster) {
TreeSet<String> treeSet = new TreeSet<>(cluster);
return StringUtils.join(treeSet, ",");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,25 @@
* @author lideyou
*/
public class SelectorManager<S extends AbstractSelectorWrapper<?, ?, ?>> {

Map<String, Set<S>> selectorMap = new ConcurrentHashMap<>();

/**
* Add a selectorWrapper to subId.
*
* @param subId subscription id
* @param selector selector wrapper
* @param subId subscription id
* @param wrapper selector wrapper
*/
public void addSelectorWrapper(String subId, S selector) {
Set<S> selectors = selectorMap.computeIfAbsent(subId, key -> new ConcurrentHashSet<>());
selectors.add(selector);
public void addSelectorWrapper(String subId, S wrapper) {
selectorMap.compute(subId, (k, v) -> {
if (v == null) {
v = new ConcurrentHashSet<>();
}
v.add(wrapper);
return v;
});
}

/**
* Get all SelectorWrappers by id.
*
Expand All @@ -52,24 +58,20 @@ public void addSelectorWrapper(String subId, S selector) {
public Set<S> getSelectorWrappers(String subId) {
return selectorMap.get(subId);
}

/**
* Remove a SelectorWrapper by id.
*
* @param subId subscription id
* @param selector selector wrapper
* @param subId subscription id
* @param wrapper selector wrapper
*/
public void removeSelectorWrapper(String subId, S selector) {
Set<S> selectors = selectorMap.get(subId);
if (selectors == null) {
return;
}
selectors.remove(selector);
if (CollectionUtils.isEmpty(selectors)) {
selectorMap.remove(subId);
}
public void removeSelectorWrapper(String subId, S wrapper) {
selectorMap.computeIfPresent(subId, (k, v) -> {
v.remove(wrapper);
return v.isEmpty() ? null : v;
});
}

/**
* Remove a subscription by id.
*
Expand All @@ -78,7 +80,7 @@ public void removeSelectorWrapper(String subId, S selector) {
public void removeSubscription(String subId) {
selectorMap.remove(subId);
}

/**
* Get all subscriptions.
*
Expand All @@ -87,7 +89,7 @@ public void removeSubscription(String subId) {
public Set<String> getSubscriptions() {
return selectorMap.keySet();
}

/**
* Determine whether subId is subscribed.
*
Expand Down
Loading

0 comments on commit 263e223

Please sign in to comment.