diff --git a/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/support/AbstractClusterInvoker.java b/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/support/AbstractClusterInvoker.java index 9bd3497e143..d1b58ac0a05 100644 --- a/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/support/AbstractClusterInvoker.java +++ b/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/support/AbstractClusterInvoker.java @@ -25,6 +25,8 @@ import com.alibaba.dubbo.common.utils.NetUtils; import com.alibaba.dubbo.rpc.Invocation; import com.alibaba.dubbo.rpc.Invoker; +import com.alibaba.dubbo.rpc.RpcContext; +import com.alibaba.dubbo.rpc.RpcInvocation; import com.alibaba.dubbo.rpc.Result; import com.alibaba.dubbo.rpc.RpcException; import com.alibaba.dubbo.rpc.cluster.Directory; @@ -33,6 +35,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -225,6 +228,13 @@ private Invoker reselect(LoadBalance loadbalance, Invocation invocation, public Result invoke(final Invocation invocation) throws RpcException { checkWhetherDestroyed(); LoadBalance loadbalance = null; + + // binding attachments into invocation. + Map contextAttachments = RpcContext.getContext().getAttachments(); + if (contextAttachments != null && contextAttachments.size() != 0) { + ((RpcInvocation) invocation).addAttachments(contextAttachments); + } + List> invokers = list(invocation); if (invokers != null && !invokers.isEmpty()) { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() diff --git a/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/support/ForkingClusterInvoker.java b/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/support/ForkingClusterInvoker.java index 7d9a58d824b..ae652f704bf 100644 --- a/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/support/ForkingClusterInvoker.java +++ b/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/support/ForkingClusterInvoker.java @@ -57,50 +57,55 @@ public ForkingClusterInvoker(Directory directory) { @Override @SuppressWarnings({"unchecked", "rawtypes"}) public Result doInvoke(final Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException { - checkInvokers(invokers, invocation); - final List> selected; - final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS); - final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); - if (forks <= 0 || forks >= invokers.size()) { - selected = invokers; - } else { - selected = new ArrayList>(); - for (int i = 0; i < forks; i++) { - // TODO. Add some comment here, refer chinese version for more details. - Invoker invoker = select(loadbalance, invocation, invokers, selected); - if (!selected.contains(invoker)) {//Avoid add the same invoker several times. - selected.add(invoker); + try { + checkInvokers(invokers, invocation); + final List> selected; + final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS); + final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); + if (forks <= 0 || forks >= invokers.size()) { + selected = invokers; + } else { + selected = new ArrayList>(); + for (int i = 0; i < forks; i++) { + // TODO. Add some comment here, refer chinese version for more details. + Invoker invoker = select(loadbalance, invocation, invokers, selected); + if (!selected.contains(invoker)) {//Avoid add the same invoker several times. + selected.add(invoker); + } } } - } - RpcContext.getContext().setInvokers((List) selected); - final AtomicInteger count = new AtomicInteger(); - final BlockingQueue ref = new LinkedBlockingQueue(); - for (final Invoker invoker : selected) { - executor.execute(new Runnable() { - @Override - public void run() { - try { - Result result = invoker.invoke(invocation); - ref.offer(result); - } catch (Throwable e) { - int value = count.incrementAndGet(); - if (value >= selected.size()) { - ref.offer(e); + RpcContext.getContext().setInvokers((List) selected); + final AtomicInteger count = new AtomicInteger(); + final BlockingQueue ref = new LinkedBlockingQueue(); + for (final Invoker invoker : selected) { + executor.execute(new Runnable() { + @Override + public void run() { + try { + Result result = invoker.invoke(invocation); + ref.offer(result); + } catch (Throwable e) { + int value = count.incrementAndGet(); + if (value >= selected.size()) { + ref.offer(e); + } } } + }); + } + try { + Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS); + if (ret instanceof Throwable) { + Throwable e = (Throwable) ret; + throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e); } - }); - } - try { - Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS); - if (ret instanceof Throwable) { - Throwable e = (Throwable) ret; - throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e); + return (Result) ret; + } catch (InterruptedException e) { + throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e); } - return (Result) ret; - } catch (InterruptedException e) { - throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e); + } finally { + // clear attachments which is binding to current thread. + RpcContext.getContext().clearAttachments(); } } } diff --git a/dubbo-cluster/src/test/java/com/alibaba/dubbo/rpc/cluster/support/AbstractClusterInvokerTest.java b/dubbo-cluster/src/test/java/com/alibaba/dubbo/rpc/cluster/support/AbstractClusterInvokerTest.java index 611683ce000..cac83e93ea5 100644 --- a/dubbo-cluster/src/test/java/com/alibaba/dubbo/rpc/cluster/support/AbstractClusterInvokerTest.java +++ b/dubbo-cluster/src/test/java/com/alibaba/dubbo/rpc/cluster/support/AbstractClusterInvokerTest.java @@ -23,6 +23,7 @@ import com.alibaba.dubbo.rpc.Invocation; import com.alibaba.dubbo.rpc.Invoker; import com.alibaba.dubbo.rpc.Result; +import com.alibaba.dubbo.rpc.RpcContext; import com.alibaba.dubbo.rpc.RpcException; import com.alibaba.dubbo.rpc.RpcInvocation; import com.alibaba.dubbo.rpc.cluster.Directory; @@ -132,6 +133,31 @@ protected Result doInvoke(Invocation invocation, List invokers, LoadBalance load } + @Test + public void testBindingAttachment() { + final String attachKey = "attach"; + final String attachValue = "value"; + + // setup attachment + RpcContext.getContext().setAttachment(attachKey, attachValue); + Map attachments = RpcContext.getContext().getAttachments(); + Assert.assertTrue("set attachment failed!", attachments != null && attachments.size() == 1); + + cluster = new AbstractClusterInvoker(dic) { + @Override + protected Result doInvoke(Invocation invocation, List invokers, LoadBalance loadbalance) + throws RpcException { + // attachment will be bind to invocation + String value = invocation.getAttachment(attachKey); + Assert.assertTrue("binding attachment failed!", value != null && value.equals(attachValue)); + return null; + } + }; + + // invoke + cluster.invoke(invocation); + } + @Test public void testSelect_Invokersize0() throws Exception { { diff --git a/dubbo-cluster/src/test/java/com/alibaba/dubbo/rpc/cluster/support/FailoverClusterInvokerTest.java b/dubbo-cluster/src/test/java/com/alibaba/dubbo/rpc/cluster/support/FailoverClusterInvokerTest.java index f41864ffb39..00a1de41e2d 100644 --- a/dubbo-cluster/src/test/java/com/alibaba/dubbo/rpc/cluster/support/FailoverClusterInvokerTest.java +++ b/dubbo-cluster/src/test/java/com/alibaba/dubbo/rpc/cluster/support/FailoverClusterInvokerTest.java @@ -166,7 +166,7 @@ public void testNoInvoke() { * then we should reselect from the latest invokers before retry. */ @Test - public void testInvokerDestoryAndReList() { + public void testInvokerDestroyAndReList() { final URL url = URL.valueOf("test://localhost/" + Demo.class.getName() + "?loadbalance=roundrobin&retries=" + retries); RpcException exception = new RpcException(RpcException.TIMEOUT_EXCEPTION); MockInvoker invoker1 = new MockInvoker(Demo.class, url); diff --git a/dubbo-cluster/src/test/java/com/alibaba/dubbo/rpc/cluster/support/ForkingClusterInvokerTest.java b/dubbo-cluster/src/test/java/com/alibaba/dubbo/rpc/cluster/support/ForkingClusterInvokerTest.java index 5a6886c0f4e..066c01aeac0 100644 --- a/dubbo-cluster/src/test/java/com/alibaba/dubbo/rpc/cluster/support/ForkingClusterInvokerTest.java +++ b/dubbo-cluster/src/test/java/com/alibaba/dubbo/rpc/cluster/support/ForkingClusterInvokerTest.java @@ -19,25 +19,29 @@ import com.alibaba.dubbo.common.URL; import com.alibaba.dubbo.rpc.Invoker; import com.alibaba.dubbo.rpc.Result; +import com.alibaba.dubbo.rpc.RpcContext; import com.alibaba.dubbo.rpc.RpcException; import com.alibaba.dubbo.rpc.RpcInvocation; import com.alibaba.dubbo.rpc.RpcResult; import com.alibaba.dubbo.rpc.cluster.Directory; -import junit.framework.Assert; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThat; import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.mock; /** * ForkingClusterInvokerTest - * */ @SuppressWarnings("unchecked") public class ForkingClusterInvokerTest { @@ -71,6 +75,7 @@ public void setUp() throws Exception { invokers.add(invoker3); } + private void resetInvokerToException() { given(invoker1.invoke(invocation)).willThrow(new RuntimeException()); given(invoker1.getUrl()).willReturn(url); @@ -106,7 +111,7 @@ private void resetInvokerToNoException() { } @Test - public void testInvokeExceptoin() { + public void testInvokeException() { resetInvokerToException(); ForkingClusterInvoker invoker = new ForkingClusterInvoker( dic); @@ -115,20 +120,44 @@ public void testInvokeExceptoin() { invoker.invoke(invocation); Assert.fail(); } catch (RpcException expected) { - Assert.assertTrue(expected.getMessage().contains("Failed to forking invoke provider")); + assertThat(expected.getMessage().contains("Failed to forking invoke provider"), is(true)); + assertFalse(expected.getCause() instanceof RpcException); + } + } + + @Test + public void testClearRpcContext() { + resetInvokerToException(); + ForkingClusterInvoker invoker = new ForkingClusterInvoker( + dic); + + String attachKey = "attach"; + String attachValue = "value"; + + RpcContext.getContext().setAttachment(attachKey, attachValue); + + Map attachments = RpcContext.getContext().getAttachments(); + assertThat("set attachment failed!", attachments != null && attachments.size() == 1, is(true)); + try { + invoker.invoke(invocation); + Assert.fail(); + } catch (RpcException expected) { + assertThat(expected.getMessage().contains("Failed to forking invoke provider"), is(true)); assertFalse(expected.getCause() instanceof RpcException); } + Map afterInvoke = RpcContext.getContext().getAttachments(); + assertThat(afterInvoke != null && afterInvoke.size() == 0, is(true)); } @Test() - public void testInvokeNoExceptoin() { + public void testInvokeNoException() { resetInvokerToNoException(); ForkingClusterInvoker invoker = new ForkingClusterInvoker( dic); Result ret = invoker.invoke(invocation); - Assert.assertSame(result, ret); + assertSame(result, ret); } } \ No newline at end of file diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/protocol/AbstractInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/protocol/AbstractInvoker.java index dd5c158e332..2e4980b72d1 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/protocol/AbstractInvoker.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/protocol/AbstractInvoker.java @@ -135,7 +135,7 @@ public Result invoke(Invocation inv) throws RpcException { invocation.addAttachmentsIfAbsent(attachment); } Map contextAttachments = RpcContext.getContext().getAttachments(); - if (contextAttachments != null) { + if (contextAttachments != null && contextAttachments.size() != 0) { /** * invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here, * because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered