Skip to content

Commit

Permalink
apache#1 Aysnc message degisn pattern \t apache#2 ExchangeHandlerAda…
Browse files Browse the repository at this point in the history
…pter closure mechanism
  • Loading branch information
fzsens committed Nov 7, 2018
1 parent 2b4c1b6 commit 09eff65
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class DefaultFuture implements ResponseFuture {

private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<>();

// trace relationship between request-id and response
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();

public static final Timer TIME_OUT_TIMER = new HashedWheelTimer(
Expand Down Expand Up @@ -142,6 +143,22 @@ public static void closeChannel(Channel channel) {
}
}

/**
* async message design pattern.
*
* 1. channel is one way communication pattern
* 2. send and received message has not relationship with each other.
* 3. we create a request and sent it through channel, at the sometimes,
* we create a {@link DefaultFuture} and save it to {@link DefaultFuture#FUTURES}.
* see {@link org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeChannel#request(Object)}
* 4. channel will send message independent. see {@link org.apache.dubbo.remoting.transport.AbstractClient#send(Object)}
* 5. received message is independent too. see {@link org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#received(Channel, Object)}
* this method will call {@link DefaultFuture#received(Channel, Response)}
* 6. the response.getId equals with request.getId , so we can find the {@link DefaultFuture} that we create for the request.
*
* @param channel
* @param response
*/
public static void received(Channel channel, Response response) {
try {
DefaultFuture future = FUTURES.remove(response.getId());
Expand Down Expand Up @@ -226,6 +243,11 @@ public void setCallback(ResponseCallback callback) {
}
}

/**
* timeout design pattern
* create a task check future's status
* if timeout create a timeout-exception response and return it.
*/
private static class TimeoutCheckTask implements TimerTask {

private DefaultFuture future;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ public class HeaderExchangeHandler implements ChannelHandlerDelegate {

public static String KEY_WRITE_TIMESTAMP = HeartbeatHandler.KEY_WRITE_TIMESTAMP;

/**
* closure function , is a bridge between network and Invoker
* see {@link DubboProtocol#requestHandler}
*/
private final ExchangeHandler handler;

public HeaderExchangeHandler(ExchangeHandler handler) {
Expand Down Expand Up @@ -216,6 +220,7 @@ public void received(Channel channel, Object message) throws RemotingException {
}
}
} else if (message instanceof Response) {
// client side received response handle response
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
if (isClientSide(channel)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,21 +90,23 @@ protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
// deal with async call
ResponseFuture future = currentClient.request(inv, timeout);
// For compatibility
FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future);
RpcContext.getContext().setFuture(futureAdapter);

Result result;
if (isAsyncFuture) {
// register resultCallback, sometimes we need the asyn result being processed by the filter chain.
// register resultCallback, sometimes we need the async result being processed by the filter chain.
result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
} else {
result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
}
return result;
} else {
RpcContext.getContext().setFuture(null);
// sync call current thread will block here until future.isDone or timeout exception throw.
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,15 @@ public class DubboProtocol extends AbstractProtocol {
//consumer side export a stub service for dispatching event
//servicekey-stubmethods
private final ConcurrentMap<String, String> stubServiceMethodsMap = new ConcurrentHashMap<String, String>();

/**
* this handler will wrap in HeaderExchangeHandler.
*
* HeaderExchangeHandler deal with network communication's details. but it didn't know how to find Invoker.
*
* requestHandler just like a closure or hook function , HeaderExchangeHandler can use it find and invoke the Invoker.
*
*/
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

@Override
Expand Down

0 comments on commit 09eff65

Please sign in to comment.