Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
panxiaojun233 committed Dec 16, 2020
1 parent 59abd04 commit 4ad4373
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,20 @@
package org.apache.dubbo.rpc.cluster;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.Version;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadlocal.NamedInternalThreadFactory;
import org.apache.dubbo.common.threadpool.ThreadPool;
import org.apache.dubbo.common.threadpool.support.fixed.FixedThreadPool;
import org.apache.dubbo.common.utils.BitList;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.directory.StaticDirectory;
import org.apache.dubbo.rpc.cluster.router.state.AddrCache;
import org.apache.dubbo.rpc.cluster.router.state.RouterCache;
import org.apache.dubbo.rpc.cluster.router.state.StateRouter;
Expand Down Expand Up @@ -75,10 +81,12 @@ public class RouterChain<T> {
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024), new NamedInternalThreadFactory("dubbo-state-router-loop-",true), new ThreadPoolExecutor.AbortPolicy());

private final static ExecutorService poolRouterThreadPool = new ThreadPoolExecutor(1, 1,
private final static ExecutorService poolRouterThreadPool = new ThreadPoolExecutor(1, 10,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024), new NamedInternalThreadFactory("dubbo-state-router-pool-",true), new ThreadPoolExecutor.AbortPolicy());

private static final Logger logger = LoggerFactory.getLogger(StaticDirectory.class);

public static <T> RouterChain<T> buildChain(URL url) {
return new RouterChain<>(url);
}
Expand Down Expand Up @@ -176,9 +184,14 @@ private void sort() {
public List<Invoker<T>> route(URL url, Invocation invocation) {

AddrCache cache = this.cache.get();
//if (cache == null) {
// buildCache();
//}
if (cache == null) {
throw new RpcException(RpcException.ROUTER_CACHE_NOT_BUILD, "Failed to invoke the method "
+ invocation.getMethodName() + " in the service " + url.getServiceInterface()
+ ". address cache not build "
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion()
+ ".");
}
BitList<Invoker<T>> finalBitListInvokers = new BitList<Invoker<T>>(invokers, false);
for (StateRouter stateRouter : stateRouters) {
if (stateRouter.isEnable()) {
Expand Down Expand Up @@ -225,11 +238,11 @@ private void buildCache() {
public void run() {
RouterCache routerCache = null;
try {
routerCache = poolRouter(stateRouter, origin, new ArrayList<>(copyInvokers));
routerCache = poolRouter(stateRouter, origin, copyInvokers);
//file cache
newCache.getCache().put(stateRouter.getName(), routerCache);
} catch (Exception e) {
e.printStackTrace();
} catch (Throwable t) {
logger.error("Failed to pool router: " + stateRouter.getUrl() + ", cause: " + t.getMessage(), t);
} finally {
cdl.countDown();
}
Expand All @@ -248,13 +261,16 @@ public void run() {

private RouterCache poolRouter(StateRouter router, AddrCache orign, List<Invoker<T>> invokers) {
String routerName = router.getName();

RouterCache routerCache = null;
if (isCacheMiss(orign, routerName) || router.shouldRePool()) {
return router.pool(invokers);

} else {
return orign.getCache().get(routerName);
routerCache = orign.getCache().get(routerName);
}
if (routerCache == null) {
return new RouterCache();
}
return routerCache;
}

private boolean isCacheMiss(AddrCache cache, String routerName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
public static final int NO_INVOKER_AVAILABLE_AFTER_FILTER = 6;
public static final int LIMIT_EXCEEDED_EXCEPTION = 7;
public static final int TIMEOUT_TERMINATE = 8;
public static final int ROUTER_CACHE_NOT_BUILD = 9;
private static final long serialVersionUID = 7815426752583648734L;
/**
* RpcException cannot be extended, use error code for exception type to keep compatibility
Expand Down

0 comments on commit 4ad4373

Please sign in to comment.