Skip to content

Commit

Permalink
Merge 5e3f88f into 0e4def2
Browse files Browse the repository at this point in the history
  • Loading branch information
lrhkobe authored Aug 2, 2021
2 parents 0e4def2 + 5e3f88f commit bab7c3c
Showing 1 changed file with 41 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import org.apache.eventmesh.common.protocol.tcp.Command;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.*;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.SessionState;
Expand Down Expand Up @@ -76,9 +75,46 @@ protected void channelRead0(ChannelHandlerContext ctx, Package pkg) throws Excep

dispatch(ctx, pkg, startTime, cmd);
} catch (Exception e) {
logger.error("exception occurred while pkg|cmd={}|pkg={}|errMsg={}", cmd, pkg, e);
//throw new RuntimeException(e);
throw e;
logger.error("exception occurred while pkg|cmd={}|pkg={}", cmd, pkg, e);
writeToClient(cmd, pkg, ctx, e);
}
}

private void writeToClient(Command cmd, Package pkg, ChannelHandlerContext ctx, Exception e){
try{
Package res = new Package();
res.setHeader(new Header(getReplyCommand(cmd), OPStatus.FAIL.getCode(), e.toString(), pkg.getHeader()
.getSeq()));
ctx.writeAndFlush(res);
}catch (Exception ex){
logger.warn("writeToClient failed", ex);
}
}

private Command getReplyCommand(Command cmd){
switch (cmd) {
case HELLO_REQUEST:
return Command.HELLO_RESPONSE;
case RECOMMEND_REQUEST:
return Command.RECOMMEND_RESPONSE;
case HEARTBEAT_REQUEST:
return Command.HEARTBEAT_RESPONSE;
case SUBSCRIBE_REQUEST:
return Command.SUBSCRIBE_RESPONSE;
case UNSUBSCRIBE_REQUEST:
return Command.UNSUBSCRIBE_RESPONSE;
case LISTEN_REQUEST:
return Command.LISTEN_RESPONSE;
case CLIENT_GOODBYE_REQUEST:
return Command.CLIENT_GOODBYE_RESPONSE;
case REQUEST_TO_SERVER:
return Command.RESPONSE_TO_CLIENT;
case ASYNC_MESSAGE_TO_SERVER:
return Command.ASYNC_MESSAGE_TO_SERVER_ACK;
case BROADCAST_MESSAGE_TO_SERVER:
return Command.BROADCAST_MESSAGE_TO_SERVER_ACK;
default:
return cmd;
}
}

Expand Down

0 comments on commit bab7c3c

Please sign in to comment.