Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #378] downstream broadcast msg asynchronously #379

Merged
merged 8 commits into from
Jun 10, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ public class EventMeshTCPServer extends AbstractRemotingServer {

private ExecutorService taskHandleExecutorService;

private ExecutorService broadcastMsgDownstreamExecutorService;

public void setClientSessionGroupMapping(ClientSessionGroupMapping clientSessionGroupMapping) {
this.clientSessionGroupMapping = clientSessionGroupMapping;
}
Expand All @@ -88,6 +90,10 @@ public ExecutorService getTaskHandleExecutorService() {
return taskHandleExecutorService;
}

public ExecutorService getBroadcastMsgDownstreamExecutorService() {
return broadcastMsgDownstreamExecutorService;
}

public void setTaskHandleExecutorService(ExecutorService taskHandleExecutorService) {
this.taskHandleExecutorService = taskHandleExecutorService;
}
Expand Down Expand Up @@ -240,7 +246,8 @@ private void initThreadPool() throws Exception {
scheduler = ThreadPoolFactory.createScheduledExecutor(eventMeshTCPConfiguration.eventMeshTcpGlobalScheduler, new EventMeshThreadFactoryImpl("eventMesh-tcp-scheduler", true));

taskHandleExecutorService = ThreadPoolFactory.createThreadPoolExecutor(eventMeshTCPConfiguration.eventMeshTcpTaskHandleExecutorPoolSize, eventMeshTCPConfiguration.eventMeshTcpTaskHandleExecutorPoolSize, new LinkedBlockingQueue<Runnable>(10000), new EventMeshThreadFactoryImpl("eventMesh-tcp-task-handle", true));
;

broadcastMsgDownstreamExecutorService = ThreadPoolFactory.createThreadPoolExecutor(eventMeshTCPConfiguration.eventMeshTcpMsgDownStreamExecutorPoolSize, eventMeshTCPConfiguration.eventMeshTcpMsgDownStreamExecutorPoolSize, new LinkedBlockingQueue<Runnable>(10000), new EventMeshThreadFactoryImpl("eventMesh-tcp-msg-downstream", true));
}

private void shutdownThreadPool() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public class EventMeshTCPConfiguration extends CommonConfiguration {

public int eventMeshTcpTaskHandleExecutorPoolSize = Runtime.getRuntime().availableProcessors();

public int eventMeshTcpMsgDownStreamExecutorPoolSize = Runtime.getRuntime().availableProcessors() > 8 ? Runtime.getRuntime().availableProcessors() : 8;

public int eventMeshTcpSessionExpiredInMills = 60000;

public int eventMeshTcpSessionUpstreamBufferSize = 100;
Expand Down Expand Up @@ -128,6 +130,13 @@ public void init() {
eventMeshTcpTaskHandleExecutorPoolSize = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshTcpTaskHandleExecutorPoolSizeStr));
}

String eventMeshTcpMsgDownStreamExecutorPoolSizeStr = configurationWraper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_TCP_MSG_DOWNSTREAM_POOL_SIZE);
if(StringUtils.isNotEmpty(eventMeshTcpMsgDownStreamExecutorPoolSizeStr)){
Preconditions.checkState(StringUtils.isNumeric(eventMeshTcpMsgDownStreamExecutorPoolSizeStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_SERVER_TCP_MSG_DOWNSTREAM_POOL_SIZE));
eventMeshTcpMsgDownStreamExecutorPoolSize = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshTcpMsgDownStreamExecutorPoolSizeStr));
}

String eventMeshTcpSessionExpiredInMillsStr = configurationWraper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_SESSION_EXPIRED_TIME);
if (StringUtils.isNotEmpty(eventMeshTcpSessionExpiredInMillsStr)) {
Preconditions.checkState(StringUtils.isNumeric(eventMeshTcpSessionExpiredInMillsStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_SERVER_SESSION_EXPIRED_TIME));
Expand Down Expand Up @@ -205,6 +214,7 @@ static class ConfKeys {
public static String KEYS_EVENTMESH_SERVER_TCP_REBALANCE_INTERVAL = "eventMesh.server.tcp.RebalanceIntervalInMills";
public static String KEYS_EVENTMESH_SERVER_GLOBAL_SCHEDULER = "eventMesh.server.global.scheduler";
public static String KEYS_EVENTMESH_SERVER_TCP_TASK_HANDLE_POOL_SIZE = "eventMesh.server.tcp.taskHandleExecutorPoolSize";
public static String KEYS_EVENTMESH_SERVER_TCP_MSG_DOWNSTREAM_POOL_SIZE = "eventMesh.server.tcp.msgDownStreamExecutorPoolSize";
public static String KEYS_EVENTMESH_SERVER_SESSION_EXPIRED_TIME = "eventMesh.server.session.expiredInMills";
public static String KEYS_EVENTMESH_SERVER_SESSION_UPSTREAM_BUFFER_SIZE = "eventMesh.server.session.upstreamBufferSize";
public static String KEYS_EVENTMESH_SERVER_SESSION_DOWNSTREAM_UNACK_SIZE = "eventMesh.server.session.downstreamUnackSize";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -544,9 +544,16 @@ public void consume(Message message, AsyncConsumeContext context) {
}

downStreamMsgContext.session = session;
//msg put in eventmesh,waiting client ack
session.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
session.downstreamMsg(downStreamMsgContext);

//downstream broadcast msg asynchronously
eventMeshTCPServer.getBroadcastMsgDownstreamExecutorService().submit(new Runnable() {
@Override
public void run() {
//msg put in eventmesh,waiting client ack
session.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
session.downstreamMsg(downStreamMsgContext);
}
});
}

// context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
Expand Down