Skip to content

Commit

Permalink
Async optimization (apache#3738)
Browse files Browse the repository at this point in the history
* Result implement CF

* Result implement CF

* Result implement CF

* Add AsyncRpcResult

* Fix bugs and refactor Filter

* Try to add onSend onError for Filter

* invoke different filter method according to result status.

*  make generic work with async call, including add $invokeAsync

* refactor legacy Filter implementation to work with onResponse.

* demo changes

* Fixes apache#3620, provider attachment lose on consumer side, fix this by reverting RpcContext copy

* AsyncRpcResult should always holds an Invocation instance

* refactor filter signature

* reimplement embedded Filters

* use ProviderModel modification in 3.x

* Fix address notification processing workflow after merging 3.x branch

* Fix UT

* Fix UT

* Unit test of JValidator; Clean code of JValidator (apache#3723)

* Fixes apache#3625 (apache#3730)

use constant to replace magic number

* Fix conflict when merging master and 3.x

* Fix conflict when merging master and 3.x

* Result interface itself has Future status.

* Fix DefaultFuture UT

* Wrap all protocol Invoker with AsyncToSyncInvoker & Fix UT

* Add license

* fix UT

* Fix ut in MonitorFilterTest

* avoid duplicate async to sync wrapper

* return async result in CacheFilter.

* fix UT in CacheFilterTest

* Add generic condition check to GenericFilter callback.

* Fix UT

* Get generic from RpcContext if the value in Invocation is empty.

* Fix RSocketProtocol to meet AbstractProtocol adjustment

* rename RpcResult to AppResponse to help avoid confusion with AsyncRpcResult.

* RSocket module switch to AsyncRpcResult
  • Loading branch information
chickenlj committed May 8, 2019
1 parent f2bed88 commit 1f52668
Show file tree
Hide file tree
Showing 132 changed files with 1,751 additions and 1,571 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
import org.apache.dubbo.common.timer.Timer;
import org.apache.dubbo.common.timer.TimerTask;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.rpc.AsyncRpcResult;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.LoadBalance;

Expand Down Expand Up @@ -99,7 +99,7 @@ protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, Load
logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
+ e.getMessage() + ", ", e);
addFailed(loadbalance, invocation, invokers, invoker);
return new RpcResult(); // ignore
return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@

import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.rpc.AsyncRpcResult;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.LoadBalance;

Expand Down Expand Up @@ -50,7 +50,7 @@ public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBal
return invoker.invoke(invocation);
} catch (Throwable e) {
logger.error("Failsafe ignore exception: " + e.getMessage(), e);
return new RpcResult(); // ignore
return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import java.util.concurrent.atomic.AtomicInteger;

/**
* NOTICE! This implementation does not work well with async call.
*
* Invoke a specific number of invokers concurrently, usually used for demanding real-time operations, but need to waste more service resources.
*
* <a href="http://en.wikipedia.org/wiki/Fork_(topology)">Fork</a>
Expand Down Expand Up @@ -66,7 +68,6 @@ public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, L
} else {
selected = new ArrayList<>();
for (int i = 0; i < forks; i++) {
// TODO. Add some comment here, refer chinese version for more details.
Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
if (!selected.contains(invoker)) {
//Avoid add the same invoker several times.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.ConfigUtils;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.rpc.AsyncRpcResult;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.LoadBalance;
import org.apache.dubbo.rpc.cluster.Merger;
Expand All @@ -41,12 +41,13 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
* NOTICE! Does not work with async call.
* @param <T>
*/
@SuppressWarnings("unchecked")
public class MergeableClusterInvoker<T> extends AbstractClusterInvoker<T> {

Expand Down Expand Up @@ -86,26 +87,19 @@ protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, Load
returnType = null;
}

Map<String, Future<Result>> results = new HashMap<String, Future<Result>>();
Map<String, Result> results = new HashMap<>();
for (final Invoker<T> invoker : invokers) {
Future<Result> future = executor.submit(new Callable<Result>() {
@Override
public Result call() throws Exception {
return invoker.invoke(new RpcInvocation(invocation, invoker));
}
});
results.put(invoker.getUrl().getServiceKey(), future);
results.put(invoker.getUrl().getServiceKey(), invoker.invoke(new RpcInvocation(invocation, invoker)));
}

Object result = null;

List<Result> resultList = new ArrayList<Result>(results.size());

int timeout = getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
for (Map.Entry<String, Future<Result>> entry : results.entrySet()) {
Future<Result> future = entry.getValue();
for (Map.Entry<String, Result> entry : results.entrySet()) {
Result asyncResult = entry.getValue();
try {
Result r = future.get(timeout, TimeUnit.MILLISECONDS);
Result r = asyncResult.get();
if (r.hasException()) {
log.error("Invoke " + getGroupDescFromServiceKey(entry.getKey()) +
" failed: " + r.getException().getMessage(),
Expand All @@ -119,13 +113,13 @@ public Result call() throws Exception {
}

if (resultList.isEmpty()) {
return new RpcResult((Object) null);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else if (resultList.size() == 1) {
return resultList.iterator().next();
}

if (returnType == void.class) {
return new RpcResult((Object) null);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
}

if (merger.startsWith(".")) {
Expand Down Expand Up @@ -173,7 +167,7 @@ public Result call() throws Exception {
throw new RpcException("There is no merger to merge result.");
}
}
return new RpcResult(result);
return AsyncRpcResult.newDefaultAsyncResult(result, invocation);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.rpc.AsyncRpcResult;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.support.MockInvoker;

Expand Down Expand Up @@ -113,7 +113,7 @@ private Result doMockInvoke(Invocation invocation, RpcException e) {
result = minvoker.invoke(invocation);
} catch (RpcException me) {
if (me.isBiz()) {
result = new RpcResult(me.getCause());
result = AsyncRpcResult.newDefaultAsyncResult(me.getCause(), invocation);
} else {
throw new RpcException(me.getCode(), getMockExceptionMessage(e, me), me.getCause());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker;

import org.junit.jupiter.api.Assertions;
Expand All @@ -48,7 +48,7 @@ public class StickyTest {
private Invoker<StickyTest> invoker2 = mock(Invoker.class);
private RpcInvocation invocation;
private Directory<StickyTest> dic;
private Result result = new RpcResult();
private Result result = new AppResponse();
private StickyClusterInvoker<StickyTest> clusterinvoker = null;
private URL url = URL.valueOf("test://test:11/test?"
+ "&loadbalance=roundrobin"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ public Map<String, String> getAttachments() {
return attachments;
}

@Override
public void setAttachment(String key, String value) {

}

@Override
public void setAttachmentIfAbsent(String key, String value) {

}

public Invoker<?> getInvoker() {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.LoadBalance;
import org.apache.dubbo.rpc.cluster.RouterFactory;
Expand Down Expand Up @@ -52,7 +52,7 @@ public class FileRouterEngineTest {
Invoker<FileRouterEngineTest> invoker2 = mock(Invoker.class);
Invocation invocation;
StaticDirectory<FileRouterEngineTest> dic;
Result result = new RpcResult();
Result result = new AppResponse();
private RouterFactory routerFactory = ExtensionLoader.getExtensionLoader(RouterFactory.class).getAdaptiveExtension();

@BeforeAll
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.LogUtil;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.filter.DemoService;

Expand All @@ -48,7 +48,7 @@ public class FailSafeClusterInvokerTest {
Invoker<DemoService> invoker = mock(Invoker.class);
RpcInvocation invocation = new RpcInvocation();
Directory<DemoService> dic;
Result result = new RpcResult();
Result result = new AppResponse();

/**
* @throws java.lang.Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.DubboAppender;
import org.apache.dubbo.common.utils.LogUtil;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.Directory;

import org.apache.log4j.Level;
Expand Down Expand Up @@ -59,7 +59,7 @@ public class FailbackClusterInvokerTest {
Invoker<FailbackClusterInvokerTest> invoker = mock(Invoker.class);
RpcInvocation invocation = new RpcInvocation();
Directory<FailbackClusterInvokerTest> dic;
Result result = new RpcResult();
Result result = new AppResponse();

/**
* @throws java.lang.Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
package org.apache.dubbo.rpc.cluster.support;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.Directory;

import org.junit.jupiter.api.Assertions;
Expand All @@ -47,7 +47,7 @@ public class FailfastClusterInvokerTest {
Invoker<FailfastClusterInvokerTest> invoker1 = mock(Invoker.class);
RpcInvocation invocation = new RpcInvocation();
Directory<FailfastClusterInvokerTest> dic;
Result result = new RpcResult();
Result result = new AppResponse();

/**
* @throws java.lang.Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
package org.apache.dubbo.rpc.cluster.support;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.directory.StaticDirectory;
import org.apache.dubbo.rpc.protocol.AbstractInvoker;
Expand Down Expand Up @@ -55,7 +55,7 @@ public class FailoverClusterInvokerTest {
private Invoker<FailoverClusterInvokerTest> invoker2 = mock(Invoker.class);
private RpcInvocation invocation = new RpcInvocation();
private Directory<FailoverClusterInvokerTest> dic;
private Result result = new RpcResult();
private Result result = new AppResponse();

/**
* @throws java.lang.Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
package org.apache.dubbo.rpc.cluster.support;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.cluster.Directory;

import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -50,7 +50,7 @@ public class ForkingClusterInvokerTest {
private Invoker<ForkingClusterInvokerTest> invoker3 = mock(Invoker.class);
private RpcInvocation invocation = new RpcInvocation();
private Directory<ForkingClusterInvokerTest> dic;
private Result result = new RpcResult();
private Result result = new AppResponse();

@BeforeEach
public void setUp() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@

import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.AsyncRpcResult;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.Directory;

import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -119,7 +120,7 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
return MenuService.class;
}
if ("invoke".equals(method.getName())) {
return new RpcResult(firstMenu);
return AsyncRpcResult.newDefaultAsyncResult(firstMenu, invocation);
}
return null;
}
Expand All @@ -135,7 +136,7 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
return MenuService.class;
}
if ("invoke".equals(method.getName())) {
return new RpcResult(secondMenu);
return AsyncRpcResult.newDefaultAsyncResult(secondMenu, invocation);
}
return null;
}
Expand Down Expand Up @@ -195,14 +196,14 @@ public void testAddMenu() throws Exception {
given(firstInvoker.getUrl()).willReturn(
url.addParameter(Constants.GROUP_KEY, "first"));
given(firstInvoker.getInterface()).willReturn(MenuService.class);
given(firstInvoker.invoke(invocation)).willReturn(new RpcResult())
given(firstInvoker.invoke(invocation)).willReturn(new AppResponse())
;
given(firstInvoker.isAvailable()).willReturn(true);

given(secondInvoker.getUrl()).willReturn(
url.addParameter(Constants.GROUP_KEY, "second"));
given(secondInvoker.getInterface()).willReturn(MenuService.class);
given(secondInvoker.invoke(invocation)).willReturn(new RpcResult())
given(secondInvoker.invoke(invocation)).willReturn(new AppResponse())
;
given(secondInvoker.isAvailable()).willReturn(true);

Expand Down
Loading

0 comments on commit 1f52668

Please sign in to comment.