diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java index 8b1b6dc695..12f37d3300 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java @@ -64,6 +64,8 @@ public class EventMeshTCPServer extends AbstractRemotingServer { private ExecutorService taskHandleExecutorService; + private ExecutorService broadcastMsgDownstreamExecutorService; + public void setClientSessionGroupMapping(ClientSessionGroupMapping clientSessionGroupMapping) { this.clientSessionGroupMapping = clientSessionGroupMapping; } @@ -88,6 +90,10 @@ public ExecutorService getTaskHandleExecutorService() { return taskHandleExecutorService; } + public ExecutorService getBroadcastMsgDownstreamExecutorService() { + return broadcastMsgDownstreamExecutorService; + } + public void setTaskHandleExecutorService(ExecutorService taskHandleExecutorService) { this.taskHandleExecutorService = taskHandleExecutorService; } @@ -240,7 +246,8 @@ private void initThreadPool() throws Exception { scheduler = ThreadPoolFactory.createScheduledExecutor(eventMeshTCPConfiguration.eventMeshTcpGlobalScheduler, new EventMeshThreadFactoryImpl("eventMesh-tcp-scheduler", true)); taskHandleExecutorService = ThreadPoolFactory.createThreadPoolExecutor(eventMeshTCPConfiguration.eventMeshTcpTaskHandleExecutorPoolSize, eventMeshTCPConfiguration.eventMeshTcpTaskHandleExecutorPoolSize, new LinkedBlockingQueue(10000), new EventMeshThreadFactoryImpl("eventMesh-tcp-task-handle", true)); - ; + + broadcastMsgDownstreamExecutorService = ThreadPoolFactory.createThreadPoolExecutor(eventMeshTCPConfiguration.eventMeshTcpMsgDownStreamExecutorPoolSize, eventMeshTCPConfiguration.eventMeshTcpMsgDownStreamExecutorPoolSize, new LinkedBlockingQueue(10000), new EventMeshThreadFactoryImpl("eventMesh-tcp-msg-downstream", true)); } private void shutdownThreadPool() { diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java index 7ecd6e5510..975edf8af9 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java @@ -49,6 +49,8 @@ public class EventMeshTCPConfiguration extends CommonConfiguration { public int eventMeshTcpTaskHandleExecutorPoolSize = Runtime.getRuntime().availableProcessors(); + public int eventMeshTcpMsgDownStreamExecutorPoolSize = Runtime.getRuntime().availableProcessors() > 8 ? Runtime.getRuntime().availableProcessors() : 8; + public int eventMeshTcpSessionExpiredInMills = 60000; public int eventMeshTcpSessionUpstreamBufferSize = 100; @@ -128,6 +130,13 @@ public void init() { eventMeshTcpTaskHandleExecutorPoolSize = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshTcpTaskHandleExecutorPoolSizeStr)); } + String eventMeshTcpMsgDownStreamExecutorPoolSizeStr = configurationWraper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_TCP_MSG_DOWNSTREAM_POOL_SIZE); + if(StringUtils.isNotEmpty(eventMeshTcpMsgDownStreamExecutorPoolSizeStr)){ + Preconditions.checkState(StringUtils.isNumeric(eventMeshTcpMsgDownStreamExecutorPoolSizeStr), + String.format("%s error", ConfKeys.KEYS_EVENTMESH_SERVER_TCP_MSG_DOWNSTREAM_POOL_SIZE)); + eventMeshTcpMsgDownStreamExecutorPoolSize = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshTcpMsgDownStreamExecutorPoolSizeStr)); + } + String eventMeshTcpSessionExpiredInMillsStr = configurationWraper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_SESSION_EXPIRED_TIME); if (StringUtils.isNotEmpty(eventMeshTcpSessionExpiredInMillsStr)) { Preconditions.checkState(StringUtils.isNumeric(eventMeshTcpSessionExpiredInMillsStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_SERVER_SESSION_EXPIRED_TIME)); @@ -205,6 +214,7 @@ static class ConfKeys { public static String KEYS_EVENTMESH_SERVER_TCP_REBALANCE_INTERVAL = "eventMesh.server.tcp.RebalanceIntervalInMills"; public static String KEYS_EVENTMESH_SERVER_GLOBAL_SCHEDULER = "eventMesh.server.global.scheduler"; public static String KEYS_EVENTMESH_SERVER_TCP_TASK_HANDLE_POOL_SIZE = "eventMesh.server.tcp.taskHandleExecutorPoolSize"; + public static String KEYS_EVENTMESH_SERVER_TCP_MSG_DOWNSTREAM_POOL_SIZE = "eventMesh.server.tcp.msgDownStreamExecutorPoolSize"; public static String KEYS_EVENTMESH_SERVER_SESSION_EXPIRED_TIME = "eventMesh.server.session.expiredInMills"; public static String KEYS_EVENTMESH_SERVER_SESSION_UPSTREAM_BUFFER_SIZE = "eventMesh.server.session.upstreamBufferSize"; public static String KEYS_EVENTMESH_SERVER_SESSION_DOWNSTREAM_UNACK_SIZE = "eventMesh.server.session.downstreamUnackSize"; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java index 633dd570dd..07a58ac7fd 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java @@ -544,9 +544,16 @@ public void consume(Message message, AsyncConsumeContext context) { } downStreamMsgContext.session = session; - //msg put in eventmesh,waiting client ack - session.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext); - session.downstreamMsg(downStreamMsgContext); + + //downstream broadcast msg asynchronously + eventMeshTCPServer.getBroadcastMsgDownstreamExecutorService().submit(new Runnable() { + @Override + public void run() { + //msg put in eventmesh,waiting client ack + session.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext); + session.downstreamMsg(downStreamMsgContext); + } + }); } // context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());