Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #476] Biz Exceptions occured in EventMesh cause connection close of client #477

Merged
merged 20 commits into from
Aug 2, 2021
Merged
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
b81d393
modify:optimize flow control in downstreaming msg
lrhkobe May 15, 2021
3f2f8b5
modify:optimize stategy of selecting session in downstream msg
lrhkobe May 17, 2021
97c6a3f
Merge branch 'develop' of github.com:apache/incubator-eventmesh into …
lrhkobe May 26, 2021
fda8068
Merge branch 'develop' of github.com:apache/incubator-eventmesh into …
lrhkobe May 28, 2021
fc3686b
modify:optimize msg downstream,msg store in session
lrhkobe May 28, 2021
cb3e551
modify:fix bug:not a @Sharable handler
lrhkobe May 28, 2021
2568539
Merge branch 'develop' of github.com:apache/incubator-eventmesh into …
lrhkobe May 31, 2021
4218b4f
modify:downstream broadcast msg asynchronously
lrhkobe Jun 1, 2021
8c69ea5
Merge branch 'develop' of github.com:apache/incubator-eventmesh into …
lrhkobe Jun 16, 2021
23a7175
modify:remove unneccessary interface in eventmesh-connector-api
lrhkobe Jun 17, 2021
a478849
Merge branch 'develop' of github.com:apache/incubator-eventmesh into …
lrhkobe Jun 21, 2021
9c7fcbc
modify:fix conflict
lrhkobe Jun 21, 2021
4ee468b
modify:add license in EventMeshAction
lrhkobe Jun 21, 2021
8936bd2
Merge branch 'develop' of github.com:apache/incubator-eventmesh into …
lrhkobe Jun 24, 2021
fea8a57
Merge branch 'develop' of github.com:apache/incubator-eventmesh into …
lrhkobe Jul 23, 2021
368c478
modify:fix ack problem
lrhkobe Jul 23, 2021
20efba7
Merge branch 'develop' of github.com:apache/incubator-eventmesh into …
lrhkobe Jul 29, 2021
35a73b3
modify:fix exception handle when exception occured in EventMeshTcpMes…
lrhkobe Jul 29, 2021
7594503
Merge branch 'develop' of github.com:apache/incubator-eventmesh into …
lrhkobe Jul 29, 2021
5e3f88f
modify:fix log print
lrhkobe Aug 2, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import org.apache.eventmesh.common.protocol.tcp.Command;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.*;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.SessionState;
Expand Down Expand Up @@ -76,9 +75,46 @@ protected void channelRead0(ChannelHandlerContext ctx, Package pkg) throws Excep

dispatch(ctx, pkg, startTime, cmd);
} catch (Exception e) {
logger.error("exception occurred while pkg|cmd={}|pkg={}|errMsg={}", cmd, pkg, e);
//throw new RuntimeException(e);
throw e;
logger.error("exception occurred while pkg|cmd={}|pkg={}", cmd, pkg, e);
writeToClient(cmd, pkg, ctx, e);
}
}

private void writeToClient(Command cmd, Package pkg, ChannelHandlerContext ctx, Exception e){
try{
Package res = new Package();
res.setHeader(new Header(getReplyCommand(cmd), OPStatus.FAIL.getCode(), e.toString(), pkg.getHeader()
.getSeq()));
ctx.writeAndFlush(res);
}catch (Exception ex){
logger.warn("writeToClient failed", ex);
}
}

private Command getReplyCommand(Command cmd){
switch (cmd) {
case HELLO_REQUEST:
return Command.HELLO_RESPONSE;
case RECOMMEND_REQUEST:
return Command.RECOMMEND_RESPONSE;
case HEARTBEAT_REQUEST:
return Command.HEARTBEAT_RESPONSE;
case SUBSCRIBE_REQUEST:
return Command.SUBSCRIBE_RESPONSE;
case UNSUBSCRIBE_REQUEST:
return Command.UNSUBSCRIBE_RESPONSE;
case LISTEN_REQUEST:
return Command.LISTEN_RESPONSE;
case CLIENT_GOODBYE_REQUEST:
return Command.CLIENT_GOODBYE_RESPONSE;
case REQUEST_TO_SERVER:
return Command.RESPONSE_TO_CLIENT;
case ASYNC_MESSAGE_TO_SERVER:
return Command.ASYNC_MESSAGE_TO_SERVER_ACK;
case BROADCAST_MESSAGE_TO_SERVER:
return Command.BROADCAST_MESSAGE_TO_SERVER_ACK;
default:
return cmd;
}
}

Expand Down