Skip to content

Commit

Permalink
[ISSUE #378] downstream broadcast msg asynchronously (#379)
Browse files Browse the repository at this point in the history
* modify:optimize flow control in downstreaming msg

* modify:optimize stategy of selecting session in downstream msg

* modify:optimize msg downstream,msg store in session

* modify:fix bug:not a @sharable handler

* modify:downstream broadcast msg asynchronously

closed #378
  • Loading branch information
lrhkobe authored Jun 10, 2021
1 parent cfcb2a7 commit 7026d30
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ public class EventMeshTCPServer extends AbstractRemotingServer {

private ExecutorService taskHandleExecutorService;

private ExecutorService broadcastMsgDownstreamExecutorService;

public void setClientSessionGroupMapping(ClientSessionGroupMapping clientSessionGroupMapping) {
this.clientSessionGroupMapping = clientSessionGroupMapping;
}
Expand All @@ -88,6 +90,10 @@ public ExecutorService getTaskHandleExecutorService() {
return taskHandleExecutorService;
}

public ExecutorService getBroadcastMsgDownstreamExecutorService() {
return broadcastMsgDownstreamExecutorService;
}

public void setTaskHandleExecutorService(ExecutorService taskHandleExecutorService) {
this.taskHandleExecutorService = taskHandleExecutorService;
}
Expand Down Expand Up @@ -240,7 +246,8 @@ private void initThreadPool() throws Exception {
scheduler = ThreadPoolFactory.createScheduledExecutor(eventMeshTCPConfiguration.eventMeshTcpGlobalScheduler, new EventMeshThreadFactoryImpl("eventMesh-tcp-scheduler", true));

taskHandleExecutorService = ThreadPoolFactory.createThreadPoolExecutor(eventMeshTCPConfiguration.eventMeshTcpTaskHandleExecutorPoolSize, eventMeshTCPConfiguration.eventMeshTcpTaskHandleExecutorPoolSize, new LinkedBlockingQueue<Runnable>(10000), new EventMeshThreadFactoryImpl("eventMesh-tcp-task-handle", true));
;

broadcastMsgDownstreamExecutorService = ThreadPoolFactory.createThreadPoolExecutor(eventMeshTCPConfiguration.eventMeshTcpMsgDownStreamExecutorPoolSize, eventMeshTCPConfiguration.eventMeshTcpMsgDownStreamExecutorPoolSize, new LinkedBlockingQueue<Runnable>(10000), new EventMeshThreadFactoryImpl("eventMesh-tcp-msg-downstream", true));
}

private void shutdownThreadPool() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public class EventMeshTCPConfiguration extends CommonConfiguration {

public int eventMeshTcpTaskHandleExecutorPoolSize = Runtime.getRuntime().availableProcessors();

public int eventMeshTcpMsgDownStreamExecutorPoolSize = Runtime.getRuntime().availableProcessors() > 8 ? Runtime.getRuntime().availableProcessors() : 8;

public int eventMeshTcpSessionExpiredInMills = 60000;

public int eventMeshTcpSessionUpstreamBufferSize = 100;
Expand Down Expand Up @@ -128,6 +130,13 @@ public void init() {
eventMeshTcpTaskHandleExecutorPoolSize = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshTcpTaskHandleExecutorPoolSizeStr));
}

String eventMeshTcpMsgDownStreamExecutorPoolSizeStr = configurationWraper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_TCP_MSG_DOWNSTREAM_POOL_SIZE);
if(StringUtils.isNotEmpty(eventMeshTcpMsgDownStreamExecutorPoolSizeStr)){
Preconditions.checkState(StringUtils.isNumeric(eventMeshTcpMsgDownStreamExecutorPoolSizeStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_SERVER_TCP_MSG_DOWNSTREAM_POOL_SIZE));
eventMeshTcpMsgDownStreamExecutorPoolSize = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshTcpMsgDownStreamExecutorPoolSizeStr));
}

String eventMeshTcpSessionExpiredInMillsStr = configurationWraper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_SESSION_EXPIRED_TIME);
if (StringUtils.isNotEmpty(eventMeshTcpSessionExpiredInMillsStr)) {
Preconditions.checkState(StringUtils.isNumeric(eventMeshTcpSessionExpiredInMillsStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_SERVER_SESSION_EXPIRED_TIME));
Expand Down Expand Up @@ -205,6 +214,7 @@ static class ConfKeys {
public static String KEYS_EVENTMESH_SERVER_TCP_REBALANCE_INTERVAL = "eventMesh.server.tcp.RebalanceIntervalInMills";
public static String KEYS_EVENTMESH_SERVER_GLOBAL_SCHEDULER = "eventMesh.server.global.scheduler";
public static String KEYS_EVENTMESH_SERVER_TCP_TASK_HANDLE_POOL_SIZE = "eventMesh.server.tcp.taskHandleExecutorPoolSize";
public static String KEYS_EVENTMESH_SERVER_TCP_MSG_DOWNSTREAM_POOL_SIZE = "eventMesh.server.tcp.msgDownStreamExecutorPoolSize";
public static String KEYS_EVENTMESH_SERVER_SESSION_EXPIRED_TIME = "eventMesh.server.session.expiredInMills";
public static String KEYS_EVENTMESH_SERVER_SESSION_UPSTREAM_BUFFER_SIZE = "eventMesh.server.session.upstreamBufferSize";
public static String KEYS_EVENTMESH_SERVER_SESSION_DOWNSTREAM_UNACK_SIZE = "eventMesh.server.session.downstreamUnackSize";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -544,9 +544,16 @@ public void consume(Message message, AsyncConsumeContext context) {
}

downStreamMsgContext.session = session;
//msg put in eventmesh,waiting client ack
session.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
session.downstreamMsg(downStreamMsgContext);

//downstream broadcast msg asynchronously
eventMeshTCPServer.getBroadcastMsgDownstreamExecutorService().submit(new Runnable() {
@Override
public void run() {
//msg put in eventmesh,waiting client ack
session.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
session.downstreamMsg(downStreamMsgContext);
}
});
}

// context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
Expand Down

0 comments on commit 7026d30

Please sign in to comment.