Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.0] Fix concurrency issues of service discovery #9684

Merged
merged 16 commits into from
Feb 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,17 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_RECONNECT_TASK_PERIOD;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_RECONNECT_TASK_TRY_COUNT;
import static org.apache.dubbo.common.constants.CommonConstants.DUBBO;
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.MONITOR_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.RECONNECT_TASK_PERIOD;
import static org.apache.dubbo.common.constants.CommonConstants.RECONNECT_TASK_TRY_COUNT;
import static org.apache.dubbo.common.constants.CommonConstants.REGISTER_IP_KEY;
import static org.apache.dubbo.common.utils.StringUtils.isNotEmpty;
import static org.apache.dubbo.rpc.cluster.Constants.CONSUMER_URL_KEY;
import static org.apache.dubbo.rpc.cluster.Constants.REFER_KEY;

Expand Down Expand Up @@ -152,9 +153,9 @@ public AbstractDirectory(URL url, RouterChain<T> routerChain, boolean isUrlFromR
this.queryMap = applicationModel.getBeanFactory().getBean(ClusterUtils.class).mergeLocalParams(queryMap);

if (consumerUrl == null) {
String host = StringUtils.isNotEmpty(queryMap.get(REGISTER_IP_KEY)) ? queryMap.get(REGISTER_IP_KEY) : this.url.getHost();
String path = StringUtils.isNotEmpty(queryMap.get(PATH_KEY)) ? queryMap.get(PATH_KEY) : queryMap.get(INTERFACE_KEY);
String consumedProtocol = StringUtils.isNotEmpty(queryMap.get(PROTOCOL_KEY)) ? queryMap.get(PROTOCOL_KEY) : DUBBO;
String host = isNotEmpty(queryMap.get(REGISTER_IP_KEY)) ? queryMap.get(REGISTER_IP_KEY) : this.url.getHost();
String path = isNotEmpty(queryMap.get(PATH_KEY)) ? queryMap.get(PATH_KEY) : queryMap.get(INTERFACE_KEY);
String consumedProtocol = isNotEmpty(queryMap.get(PROTOCOL_KEY)) ? queryMap.get(PROTOCOL_KEY) : CONSUMER;

URL consumerUrlFrom = this.url
.setHost(host)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.dubbo.common.constants.CommonConstants.DUBBO;
import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER;
import static org.apache.dubbo.common.constants.CommonConstants.ENABLE_CONNECTIVITY_VALIDATION;
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.MONITOR_KEY;
Expand Down Expand Up @@ -262,7 +262,7 @@ public void testCloseAvailablecheck() {
private URL turnRegistryUrlToConsumerUrl(URL url, Map<String, String> queryMap) {
String host = StringUtils.isNotEmpty(queryMap.get("register.ip")) ? queryMap.get("register.ip") : this.url.getHost();
String path = queryMap.get(PATH_KEY);
String consumedProtocol = queryMap.get(PROTOCOL_KEY) == null ? DUBBO : queryMap.get(PROTOCOL_KEY);
String consumedProtocol = queryMap.get(PROTOCOL_KEY) == null ? CONSUMER : queryMap.get(PROTOCOL_KEY);

URL consumerUrlFrom = this.url
.setHost(host)
Expand Down
11 changes: 10 additions & 1 deletion dubbo-common/src/main/java/org/apache/dubbo/common/URL.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
import static org.apache.dubbo.common.constants.CommonConstants.ANYHOST_VALUE;
import static org.apache.dubbo.common.constants.CommonConstants.APPLICATION_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN;
import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_CHAR_SEPARATOR;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.HOST_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
Expand Down Expand Up @@ -1398,7 +1400,14 @@ public String getProtocolServiceKey() {
if (protocolServiceKey != null) {
return protocolServiceKey;
}
this.protocolServiceKey = getServiceKey() + ":" + getProtocol();
this.protocolServiceKey = getServiceKey();
/*
Special treatment if this is a consumer subscription url instance with no protocol specified - starts with 'consumer://'
If the specific protocol is specified on the consumer side, then this method will return as normal.
*/
if (!CONSUMER.equals(getProtocol())) {
this.protocolServiceKey += (GROUP_CHAR_SEPARATOR + getProtocol());
}
return protocolServiceKey;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@ public class FileCacheStoreFactory {

public static FileCacheStore getInstance(String basePath, String cacheName) {
if (basePath == null) {
basePath = System.getProperty("user.home") + "/.dubbo/";
basePath = System.getProperty("user.home") + File.separator + ".dubbo";
}
if (basePath.endsWith(File.separator)) {
basePath = basePath.substring(0, basePath.length() - 1);
}

File candidate = new File(basePath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ public class DefaultExecutorRepository implements ExecutorRepository, ExtensionA

private ScheduledExecutorService connectivityScheduledExecutor;

private ScheduledExecutorService cacheRefreshingScheduledExecutor;

private ExecutorService mappingRefreshingExecutor;

public Ring<ScheduledExecutorService> registryNotificationExecutorRing = new Ring<>();

private Ring<ScheduledExecutorService> serviceDiscoveryAddressNotificationExecutorRing = new Ring<>();
Expand Down Expand Up @@ -103,6 +107,8 @@ public DefaultExecutorRepository() {
}

connectivityScheduledExecutor = Executors.newScheduledThreadPool(DEFAULT_SCHEDULER_SIZE, new NamedThreadFactory("Dubbo-connectivity-scheduler", true));
cacheRefreshingScheduledExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Dubbo-cache-refreshing-scheduler", true));
mappingRefreshingExecutor = Executors.newFixedThreadPool(DEFAULT_SCHEDULER_SIZE, new NamedThreadFactory("Dubbo-mapping-refreshing-scheduler", true));
poolRouterExecutor = new ThreadPoolExecutor(1, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1024),
new NamedInternalThreadFactory("Dubbo-state-router-pool-router", true), new ThreadPoolExecutor.AbortPolicy());

Expand Down Expand Up @@ -365,6 +371,15 @@ public ScheduledExecutorService getConnectivityScheduledExecutor() {
return connectivityScheduledExecutor;
}

@Override
public ScheduledExecutorService getCacheRefreshingScheduledExecutor() {
return cacheRefreshingScheduledExecutor;
}

public ExecutorService getMappingRefreshingExecutor() {
return mappingRefreshingExecutor;
}

@Override
public void destroyAll() {
logger.info("destroying executor repository ..");
Expand Down Expand Up @@ -398,6 +413,7 @@ public void destroyAll() {

// connectivityScheduledExecutor
shutdownExecutorService(connectivityScheduledExecutor, "connectivityScheduledExecutor");
shutdownExecutorService(cacheRefreshingScheduledExecutor, "cacheRefreshingScheduledExecutor");

// shutdown share executor
shutdownExecutorService(sharedExecutor, "sharedExecutor");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,20 @@ public interface ExecutorRepository {
*/
ScheduledExecutorService getConnectivityScheduledExecutor();

/**
* Scheduler used to refresh file based caches from memory to disk.
*
* @return
*/
ScheduledExecutorService getCacheRefreshingScheduledExecutor();

/**
* Executor used to run async mapping tasks
*
* @return
*/
ExecutorService getMappingRefreshingExecutor();

/**
* Destroy all executors that are not in shutdown state
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

import static java.util.Collections.emptySet;
Expand Down Expand Up @@ -395,4 +396,14 @@ public static <T> T first(Collection<T> values) {
}
}

public static <T> Set<T> toTreeSet(Set<T> set) {
if (isEmpty(set)) {
return set;
}
if (!(set instanceof TreeSet)) {
set = new TreeSet<>(set);
}
return set;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ public class FileCacheStoreFactoryTest {
@Test
public void testSafeName() throws URISyntaxException {
FileCacheStore store1 = FileCacheStoreFactory.getInstance(getDirectoryOfClassPath(), "../../../dubbo");
Assertions.assertEquals(getDirectoryOfClassPath() + File.separator + "..%002f..%002f..%002fdubbo.dubbo.cache", store1.getCacheFilePath());
Assertions.assertEquals(getDirectoryOfClassPath() + "..%002f..%002f..%002fdubbo.dubbo.cache", store1.getCacheFilePath());
store1.destroy();

FileCacheStore store2 = FileCacheStoreFactory.getInstance(getDirectoryOfClassPath(), "../../../中文");
Assertions.assertEquals(getDirectoryOfClassPath() + File.separator + "..%002f..%002f..%002f%4e2d%6587.dubbo.cache", store2.getCacheFilePath());
Assertions.assertEquals(getDirectoryOfClassPath() + "..%002f..%002f..%002f%4e2d%6587.dubbo.cache", store2.getCacheFilePath());
store2.destroy();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ protected synchronized void init() {
private void initServiceAppsMapping(Map<String, String> referenceParameters) {
ServiceNameMapping serviceNameMapping = ServiceNameMapping.getDefaultExtension(getScopeModel());
URL url = new ServiceConfigURL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceName, referenceParameters);
serviceNameMapping.getServices(url);
serviceNameMapping.initInterfaceAppMapping(url);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -605,9 +605,9 @@ private URL exportRemote(URL url, List<URL> registryURLs) {

if (logger.isInfoEnabled()) {
if (url.getParameter(REGISTER_KEY, true)) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url.getServiceKey() + " to registry " + registryURL.getAddress());
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL.getAddress());
} else {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url.getServiceKey());
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.dubbo.config.mock;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.metadata.MetadataInfo;
import org.apache.dubbo.registry.client.AbstractServiceDiscovery;
import org.apache.dubbo.registry.client.ServiceInstance;
import org.apache.dubbo.rpc.model.ApplicationModel;
Expand Down Expand Up @@ -49,7 +48,7 @@ public void doRegister(ServiceInstance serviceInstance) throws RuntimeException
}

@Override
public void doUpdate(ServiceInstance serviceInstance, MetadataInfo metadataInfo) throws RuntimeException {
public void doUpdate(ServiceInstance serviceInstance) throws RuntimeException {
this.serviceInstance = serviceInstance;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.dubbo.config.spring.registry;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.metadata.MetadataInfo;
import org.apache.dubbo.registry.client.AbstractServiceDiscovery;
import org.apache.dubbo.registry.client.ServiceInstance;
import org.apache.dubbo.rpc.model.ApplicationModel;
Expand Down Expand Up @@ -48,7 +47,7 @@ public void doRegister(ServiceInstance serviceInstance) throws RuntimeException
}

@Override
public void doUpdate(ServiceInstance serviceInstance, MetadataInfo metadataInfo) throws RuntimeException {
public void doUpdate(ServiceInstance serviceInstance) throws RuntimeException {
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,16 @@

<dubbo:registry id="demo1" address="zookeeper://127.0.0.1:2181?registry-type=service"/>

<dubbo:protocol name="dubbo" port="-1"/>
<dubbo:protocol name="rest" port="-1"/>
<dubbo:protocol name="tri" port="-1"/>

<dubbo:reference id="demoService" check="false"
interface="org.apache.dubbo.demo.DemoService"/>

<dubbo:reference version="1.0.0" group="greeting" id="greetingService" check="false"
interface="org.apache.dubbo.demo.GreetingService"/>

<dubbo:reference protocol="rest" version="1.0.0" id="restDemoService" check="false"
<dubbo:reference version="1.0.0" id="restDemoService" check="false"
interface="org.apache.dubbo.demo.RestDemoService"/>

<dubbo:reference protocol="tri" version="1.0.0" id="tripleService" check="false"
<dubbo:reference version="1.0.0" id="tripleService" check="false"
interface="org.apache.dubbo.demo.TripleService"/>

</beans>
Loading