From 09eff654cf16dfa40581277c416337d9ff331051 Mon Sep 17 00:00:00 2001 From: fzsens Date: Wed, 7 Nov 2018 11:30:42 +0800 Subject: [PATCH] #1 Aysnc message degisn pattern \t #2 ExchangeHandlerAdapter closure mechanism --- .../exchange/support/DefaultFuture.java | 22 +++++++++++++++++++ .../support/header/HeaderExchangeHandler.java | 5 +++++ .../rpc/protocol/dubbo/DubboInvoker.java | 4 +++- .../rpc/protocol/dubbo/DubboProtocol.java | 9 ++++++++ 4 files changed, 39 insertions(+), 1 deletion(-) diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java index 5fa6cfe9a47..ad29f8b885f 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java @@ -50,6 +50,7 @@ public class DefaultFuture implements ResponseFuture { private static final Map CHANNELS = new ConcurrentHashMap<>(); + // trace relationship between request-id and response private static final Map FUTURES = new ConcurrentHashMap<>(); public static final Timer TIME_OUT_TIMER = new HashedWheelTimer( @@ -142,6 +143,22 @@ public static void closeChannel(Channel channel) { } } + /** + * async message design pattern. + * + * 1. channel is one way communication pattern + * 2. send and received message has not relationship with each other. + * 3. we create a request and sent it through channel, at the sometimes, + * we create a {@link DefaultFuture} and save it to {@link DefaultFuture#FUTURES}. + * see {@link org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeChannel#request(Object)} + * 4. channel will send message independent. see {@link org.apache.dubbo.remoting.transport.AbstractClient#send(Object)} + * 5. received message is independent too. see {@link org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#received(Channel, Object)} + * this method will call {@link DefaultFuture#received(Channel, Response)} + * 6. the response.getId equals with request.getId , so we can find the {@link DefaultFuture} that we create for the request. + * + * @param channel + * @param response + */ public static void received(Channel channel, Response response) { try { DefaultFuture future = FUTURES.remove(response.getId()); @@ -226,6 +243,11 @@ public void setCallback(ResponseCallback callback) { } } + /** + * timeout design pattern + * create a task check future's status + * if timeout create a timeout-exception response and return it. + */ private static class TimeoutCheckTask implements TimerTask { private DefaultFuture future; diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeHandler.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeHandler.java index 35ba609bea2..99f13285d79 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeHandler.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeHandler.java @@ -51,6 +51,10 @@ public class HeaderExchangeHandler implements ChannelHandlerDelegate { public static String KEY_WRITE_TIMESTAMP = HeartbeatHandler.KEY_WRITE_TIMESTAMP; + /** + * closure function , is a bridge between network and Invoker + * see {@link DubboProtocol#requestHandler} + */ private final ExchangeHandler handler; public HeaderExchangeHandler(ExchangeHandler handler) { @@ -216,6 +220,7 @@ public void received(Channel channel, Object message) throws RemotingException { } } } else if (message instanceof Response) { + // client side received response handle response handleResponse(channel, (Response) message); } else if (message instanceof String) { if (isClientSide(channel)) { diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java index 93343c0c26f..cb445f07336 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java @@ -90,6 +90,7 @@ protected Result doInvoke(final Invocation invocation) throws Throwable { RpcContext.getContext().setFuture(null); return new RpcResult(); } else if (isAsync) { + // deal with async call ResponseFuture future = currentClient.request(inv, timeout); // For compatibility FutureAdapter futureAdapter = new FutureAdapter<>(future); @@ -97,7 +98,7 @@ protected Result doInvoke(final Invocation invocation) throws Throwable { Result result; if (isAsyncFuture) { - // register resultCallback, sometimes we need the asyn result being processed by the filter chain. + // register resultCallback, sometimes we need the async result being processed by the filter chain. result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false); } else { result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false); @@ -105,6 +106,7 @@ protected Result doInvoke(final Invocation invocation) throws Throwable { return result; } else { RpcContext.getContext().setFuture(null); + // sync call current thread will block here until future.isDone or timeout exception throw. return (Result) currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java index 324e93deda9..3cd4f5c9135 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java @@ -74,6 +74,15 @@ public class DubboProtocol extends AbstractProtocol { //consumer side export a stub service for dispatching event //servicekey-stubmethods private final ConcurrentMap stubServiceMethodsMap = new ConcurrentHashMap(); + + /** + * this handler will wrap in HeaderExchangeHandler. + * + * HeaderExchangeHandler deal with network communication's details. but it didn't know how to find Invoker. + * + * requestHandler just like a closure or hook function , HeaderExchangeHandler can use it find and invoke the Invoker. + * + */ private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { @Override