From d638ec44b83f0f6b79e74077b5c24b63c57f30bf Mon Sep 17 00:00:00 2001 From: j00441484 Date: Mon, 10 May 2021 17:30:47 -0400 Subject: [PATCH 1/8] [Issue #337] Fix HttpSubscriber startup issue --- .../eventmesh/http/demo/sub/service/SubService.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java index e20fea98be..84432ca003 100644 --- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java +++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java @@ -75,12 +75,15 @@ public void afterPropertiesSet() throws Exception { })); Thread stopThread = new Thread(() -> { + try { + Thread.sleep(5 * 60 * 1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } logger.info("stopThread start...."); System.exit(0); }); - Thread.sleep(5 * 60 * 1000); - -// stopThread.start(); + stopThread.start(); } } From 5ebfb54ece6b2020fb4d1b5fe2be26a3a2a6bbd8 Mon Sep 17 00:00:00 2001 From: jinrongluo Date: Mon, 10 May 2021 17:46:45 -0400 Subject: [PATCH 2/8] [Issue #337] test commit --- .../org/apache/eventmesh/http/demo/sub/service/SubService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java index 84432ca003..c75cef7c51 100644 --- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java +++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java @@ -84,6 +84,6 @@ public void afterPropertiesSet() throws Exception { System.exit(0); }); - stopThread.start(); + // stopThread.start(); } } From a3afff3582a3a0e9baf600276116f34d9e95e191 Mon Sep 17 00:00:00 2001 From: jinrongluo Date: Mon, 10 May 2021 17:49:04 -0400 Subject: [PATCH 3/8] [Issue #337] revert test commit --- .../org/apache/eventmesh/http/demo/sub/service/SubService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java index c75cef7c51..84432ca003 100644 --- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java +++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java @@ -84,6 +84,6 @@ public void afterPropertiesSet() throws Exception { System.exit(0); }); - // stopThread.start(); + stopThread.start(); } } From 50f959d6c7416da01d51da0b33e8c9b47cee0611 Mon Sep 17 00:00:00 2001 From: jinrongluo Date: Tue, 11 May 2021 11:40:24 -0400 Subject: [PATCH 4/8] [Issue #337] Enhance Http Demo Subscriber by using ExecutorService, CountDownLatch and PreDestroy hook --- .../http/demo/AsyncPublishInstance.java | 5 +- .../demo/sub/controller/SubController.java | 9 ++- .../http/demo/sub/service/SubService.java | 57 ++++++++++++------- 3 files changed, 50 insertions(+), 21 deletions(-) diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/AsyncPublishInstance.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/AsyncPublishInstance.java index b718bcc2e0..558773fc56 100644 --- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/AsyncPublishInstance.java +++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/AsyncPublishInstance.java @@ -35,6 +35,9 @@ public class AsyncPublishInstance { public static Logger logger = LoggerFactory.getLogger(AsyncPublishInstance.class); + // This messageSize is also used in SubService.java (Subscriber) + public static int messageSize = 5; + public static void main(String[] args) throws Exception { Properties properties = Utils.readPropertiesFile("application.properties"); final String eventMeshIp = properties.getProperty("eventmesh.ip"); @@ -62,7 +65,7 @@ public static void main(String[] args) throws Exception { liteProducer = new LiteProducer(eventMeshClientConfig); liteProducer.start(); - for (int i = 0; i < 5; i++) { + for (int i = 0; i < messageSize; i++) { LiteMessage liteMessage = new LiteMessage(); liteMessage.setBizSeqNo(RandomStringUtils.randomNumeric(30)) // .setContent("contentStr with special protocal") diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java index 8f8a7a7f9b..a3b9f4ede4 100644 --- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java +++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java @@ -19,8 +19,10 @@ import com.alibaba.fastjson.JSONObject; +import org.apache.eventmesh.http.demo.sub.service.SubService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; @@ -33,12 +35,17 @@ public class SubController { public static Logger logger = LoggerFactory.getLogger(SubController.class); + @Autowired + private SubService subService; + @RequestMapping(value = "/test", method = RequestMethod.POST) public String subTest(@RequestBody String message) { logger.info("=======receive message======= {}", JSONObject.toJSONString(message)); JSONObject result = new JSONObject(); result.put("retCode", 1); - return result.toJSONString(); + String strResult = result.toJSONString(); + subService.consumeMessage(strResult); + return strResult; } } diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java index 84432ca003..8fb4746cef 100644 --- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java +++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java @@ -3,18 +3,22 @@ import java.util.Arrays; import java.util.List; import java.util.Properties; - +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.client.http.conf.LiteClientConfig; import org.apache.eventmesh.client.http.consumer.LiteConsumer; import org.apache.eventmesh.common.EventMeshException; import org.apache.eventmesh.common.IPUtil; import org.apache.eventmesh.common.ThreadUtil; +import org.apache.eventmesh.http.demo.AsyncPublishInstance; import org.apache.eventmesh.util.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Component; +import javax.annotation.PreDestroy; @Component public class SubService implements InitializingBean { @@ -38,6 +42,11 @@ public class SubService implements InitializingBean { final String dcn = "FT0"; final String subsys = "1234"; + // CountDownLatch size is the same as messageSize in AsyncPublishInstance.java (Publisher) + private CountDownLatch countDownLatch = new CountDownLatch(AsyncPublishInstance.messageSize); + + private ExecutorService executorService = Executors.newFixedThreadPool(5); + @Override public void afterPropertiesSet() throws Exception { @@ -59,31 +68,41 @@ public void afterPropertiesSet() throws Exception { liteConsumer.heartBeat(topicList, url); liteConsumer.subscribe(topicList, url); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - logger.info("start destory ...."); - try { - liteConsumer.unsubscribe(topicList, url); - } catch (EventMeshException e) { - e.printStackTrace(); - } + // Wait for all messaged to be consumed + executorService.submit(() ->{ try { - liteConsumer.shutdown(); - } catch (Exception e) { - e.printStackTrace(); - } - logger.info("end destory."); - })); - - Thread stopThread = new Thread(() -> { - try { - Thread.sleep(5 * 60 * 1000); + countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } logger.info("stopThread start...."); System.exit(0); }); + } + + @PreDestroy + public void cleanup() { + logger.info("start destory ...."); + try { + liteConsumer.unsubscribe(topicList, url); + } catch (EventMeshException e) { + e.printStackTrace(); + } + try { + liteConsumer.shutdown(); + } catch (Exception e) { + e.printStackTrace(); + } + executorService.shutdown(); + logger.info("end destory."); + } - stopThread.start(); + /** + * Count the message already consumed + */ + public void consumeMessage(String msg) { + logger.info("consume message {}", msg); + countDownLatch.countDown(); + logger.info("remaining number of messages to be consumed {}", countDownLatch.getCount()); } } From d48ead5aac61d602880a6d8715e7bc131c908bf8 Mon Sep 17 00:00:00 2001 From: jinrongluo Date: Tue, 11 May 2021 12:11:20 -0400 Subject: [PATCH 5/8] [Issue #337] Enhance Http Demo Subscriber by using ExecutorService, CountDownLatch and PreDestroy hook --- .../http/demo/sub/service/SubService.java | 57 +++++++++++++------ 1 file changed, 39 insertions(+), 18 deletions(-) diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java index 84432ca003..213ea40078 100644 --- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java +++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java @@ -3,6 +3,9 @@ import java.util.Arrays; import java.util.List; import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.client.http.conf.LiteClientConfig; @@ -10,12 +13,15 @@ import org.apache.eventmesh.common.EventMeshException; import org.apache.eventmesh.common.IPUtil; import org.apache.eventmesh.common.ThreadUtil; +import org.apache.eventmesh.http.demo.AsyncPublishInstance; import org.apache.eventmesh.util.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Component; +import javax.annotation.PreDestroy; + @Component public class SubService implements InitializingBean { @@ -38,6 +44,11 @@ public class SubService implements InitializingBean { final String dcn = "FT0"; final String subsys = "1234"; + // CountDownLatch size is the same as messageSize in AsyncPublishInstance.java (Publisher) + private CountDownLatch countDownLatch = new CountDownLatch(AsyncPublishInstance.messageSize); + + private ExecutorService executorService = Executors.newFixedThreadPool(5); + @Override public void afterPropertiesSet() throws Exception { @@ -59,31 +70,41 @@ public void afterPropertiesSet() throws Exception { liteConsumer.heartBeat(topicList, url); liteConsumer.subscribe(topicList, url); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - logger.info("start destory ...."); - try { - liteConsumer.unsubscribe(topicList, url); - } catch (EventMeshException e) { - e.printStackTrace(); - } + // Wait for all messaged to be consumed + executorService.submit(() ->{ try { - liteConsumer.shutdown(); - } catch (Exception e) { - e.printStackTrace(); - } - logger.info("end destory."); - })); - - Thread stopThread = new Thread(() -> { - try { - Thread.sleep(5 * 60 * 1000); + countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } logger.info("stopThread start...."); System.exit(0); }); + } + + @PreDestroy + public void cleanup() { + logger.info("start destory ...."); + try { + liteConsumer.unsubscribe(topicList, url); + } catch (EventMeshException e) { + e.printStackTrace(); + } + try { + liteConsumer.shutdown(); + } catch (Exception e) { + e.printStackTrace(); + } + executorService.shutdown(); + logger.info("end destory."); + } - stopThread.start(); + /** + * Count the message already consumed + */ + public void consumeMessage(String msg) { + logger.info("consume message {}", msg); + countDownLatch.countDown(); + logger.info("remaining number of messages to be consumed {}", countDownLatch.getCount()); } } From c9021fea94148c118ab27608fc8d34404a0c9a3f Mon Sep 17 00:00:00 2001 From: jinrongluo Date: Wed, 12 May 2021 11:25:11 -0400 Subject: [PATCH 6/8] [Issue #337] Address code review comment for Subscriber Demo App --- .../http/demo/sub/controller/SubController.java | 6 +++--- .../eventmesh/http/demo/sub/service/SubService.java | 10 ++-------- 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java index a3b9f4ede4..92ca09d700 100644 --- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java +++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java @@ -41,11 +41,11 @@ public class SubController { @RequestMapping(value = "/test", method = RequestMethod.POST) public String subTest(@RequestBody String message) { logger.info("=======receive message======= {}", JSONObject.toJSONString(message)); + subService.consumeMessage(message); + JSONObject result = new JSONObject(); result.put("retCode", 1); - String strResult = result.toJSONString(); - subService.consumeMessage(strResult); - return strResult; + return result.toJSONString(); } } diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java index 213ea40078..9a51e4d2fa 100644 --- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java +++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java @@ -4,9 +4,6 @@ import java.util.List; import java.util.Properties; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.client.http.conf.LiteClientConfig; import org.apache.eventmesh.client.http.consumer.LiteConsumer; @@ -19,7 +16,6 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Component; - import javax.annotation.PreDestroy; @Component @@ -47,8 +43,6 @@ public class SubService implements InitializingBean { // CountDownLatch size is the same as messageSize in AsyncPublishInstance.java (Publisher) private CountDownLatch countDownLatch = new CountDownLatch(AsyncPublishInstance.messageSize); - private ExecutorService executorService = Executors.newFixedThreadPool(5); - @Override public void afterPropertiesSet() throws Exception { @@ -71,7 +65,7 @@ public void afterPropertiesSet() throws Exception { liteConsumer.subscribe(topicList, url); // Wait for all messaged to be consumed - executorService.submit(() ->{ + Thread stopThread = new Thread(() -> { try { countDownLatch.await(); } catch (InterruptedException e) { @@ -80,6 +74,7 @@ public void afterPropertiesSet() throws Exception { logger.info("stopThread start...."); System.exit(0); }); + stopThread.start(); } @PreDestroy @@ -95,7 +90,6 @@ public void cleanup() { } catch (Exception e) { e.printStackTrace(); } - executorService.shutdown(); logger.info("end destory."); } From 498327d21b8a3cb0e68fa6c5d3078c753193353a Mon Sep 17 00:00:00 2001 From: jinrongluo Date: Fri, 17 Sep 2021 09:52:35 -0400 Subject: [PATCH 7/8] Issue #523 adding FlowControl Ratelimiter support for Http message send processors --- .../http/common/EventMeshRetCode.java | 3 +- eventmesh-runtime/conf/eventmesh.properties | 1 + .../runtime/boot/EventMeshHTTPServer.java | 39 +++++++++++-------- .../EventMeshHTTPConfiguration.java | 22 +++++++---- .../processor/BatchSendMessageProcessor.java | 15 ++++--- .../BatchSendMessageV2Processor.java | 7 ++-- .../http/processor/ReplyMessageProcessor.java | 19 +++++++-- .../processor/SendAsyncMessageProcessor.java | 15 ++++++- .../processor/SendSyncMessageProcessor.java | 19 ++++++--- 9 files changed, 93 insertions(+), 47 deletions(-) diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/EventMeshRetCode.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/EventMeshRetCode.java index 7795507cc8..11f036dfbb 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/EventMeshRetCode.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/EventMeshRetCode.java @@ -39,7 +39,8 @@ public enum EventMeshRetCode { EVENTMESH_SUBSCRIBE_ERR(17, "eventMesh subscribe err"), EVENTMESH_UNSUBSCRIBE_ERR(18, "eventMesh unsubscribe err"), EVENTMESH_HEARTBEAT_ERR(19, "eventMesh heartbeat err"), - EVENTMESH_ACL_ERR(20, "eventMesh acl err"); + EVENTMESH_ACL_ERR(20, "eventMesh acl err"), + EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR(20, "eventMesh http msg send over the limit, "); private Integer retCode; diff --git a/eventmesh-runtime/conf/eventmesh.properties b/eventmesh-runtime/conf/eventmesh.properties index cf09fb7ca9..0e42575b5c 100644 --- a/eventmesh-runtime/conf/eventmesh.properties +++ b/eventmesh-runtime/conf/eventmesh.properties @@ -36,6 +36,7 @@ eventMesh.server.tcp.RebalanceIntervalInMills=30000 eventMesh.server.session.expiredInMills=60000 # flow control, include the global level and session level eventMesh.server.tcp.msgReqnumPerSecond=15000 +eventMesh.server.http.msgReqnumPerSecond=15000 eventMesh.server.session.upstreamBufferSize=20 # thread number about global scheduler diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java index ba1c170485..f5055ba4af 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java @@ -17,35 +17,27 @@ package org.apache.eventmesh.runtime.boot; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; - import com.google.common.eventbus.EventBus; - +import com.google.common.util.concurrent.RateLimiter; import org.apache.eventmesh.common.ThreadPoolFactory; import org.apache.eventmesh.common.protocol.http.common.RequestCode; import org.apache.eventmesh.runtime.common.ServiceState; import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration; import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf; import org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerManager; -import org.apache.eventmesh.runtime.core.protocol.http.processor.AdminMetricsProcessor; -import org.apache.eventmesh.runtime.core.protocol.http.processor.BatchSendMessageProcessor; -import org.apache.eventmesh.runtime.core.protocol.http.processor.BatchSendMessageV2Processor; -import org.apache.eventmesh.runtime.core.protocol.http.processor.HeartBeatProcessor; -import org.apache.eventmesh.runtime.core.protocol.http.processor.ReplyMessageProcessor; -import org.apache.eventmesh.runtime.core.protocol.http.processor.SendAsyncMessageProcessor; -import org.apache.eventmesh.runtime.core.protocol.http.processor.SendSyncMessageProcessor; -import org.apache.eventmesh.runtime.core.protocol.http.processor.SubscribeProcessor; -import org.apache.eventmesh.runtime.core.protocol.http.processor.UnSubscribeProcessor; +import org.apache.eventmesh.runtime.core.protocol.http.processor.*; import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.Client; import org.apache.eventmesh.runtime.core.protocol.http.producer.ProducerManager; import org.apache.eventmesh.runtime.core.protocol.http.push.AbstractHTTPPushRequest; import org.apache.eventmesh.runtime.core.protocol.http.retry.HttpRetryer; import org.apache.eventmesh.runtime.metrics.http.HTTPMetricsServer; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; + public class EventMeshHTTPServer extends AbstractHTTPServer { private EventMeshServer eventMeshServer; @@ -89,6 +81,10 @@ public EventMeshServer getEventMeshServer() { public ThreadPoolExecutor adminExecutor; + private RateLimiter msgRateLimiter; + + private RateLimiter batchRateLimiter; + public void shutdownThreadPool() throws Exception { batchMsgExecutor.shutdown(); adminExecutor.shutdown(); @@ -149,12 +145,23 @@ public ThreadPoolExecutor getAdminExecutor() { return adminExecutor; } + public RateLimiter getMsgRateLimiter() { + return msgRateLimiter; + } + + public RateLimiter getBatchRateLimiter() { + return batchRateLimiter; + } + public void init() throws Exception { logger.info("==================EventMeshHTTPServer Initialing=================="); super.init("eventMesh-http"); initThreadPool(); + msgRateLimiter = RateLimiter.create(eventMeshHttpConfiguration.eventMeshHttpMsgReqNumPerSecond); + batchRateLimiter = RateLimiter.create(eventMeshHttpConfiguration.eventMeshBatchMsgRequestNumPerSecond); + metrics = new HTTPMetricsServer(this); metrics.init(); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java index ffa6cbd9b2..05511b0368 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java @@ -18,8 +18,6 @@ package org.apache.eventmesh.runtime.configuration; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.RateLimiter; - import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.common.config.CommonConfiguration; import org.apache.eventmesh.common.config.ConfigurationWrapper; @@ -28,8 +26,6 @@ public class EventMeshHTTPConfiguration extends CommonConfiguration { public int httpServerPort = 10105; - public RateLimiter eventMeshServerBatchMsgNumLimiter = RateLimiter.create(20000); - public boolean eventMeshServerBatchMsgBatchEnabled = Boolean.TRUE; public int eventMeshServerBatchMsgThreadNum = 10; @@ -68,6 +64,10 @@ public class EventMeshHTTPConfiguration extends CommonConfiguration { public boolean eventMeshServerUseTls = false; + public int eventMeshHttpMsgReqNumPerSecond = 15000; + + public int eventMeshBatchMsgRequestNumPerSecond = 20000; + public EventMeshHTTPConfiguration(ConfigurationWrapper configurationWrapper) { super(configurationWrapper); } @@ -86,9 +86,9 @@ public void init() { eventMeshServerBatchMsgThreadNum = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshServerBatchMsgThreadNumStr)); } - String eventMeshServerBatchMsgNumLimiterStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_BATCHMSG_RATELIMITER); - if (StringUtils.isNotEmpty(eventMeshServerBatchMsgNumLimiterStr) && StringUtils.isNumeric(eventMeshServerBatchMsgNumLimiterStr)) { - eventMeshServerBatchMsgNumLimiter = RateLimiter.create(Double.valueOf(StringUtils.deleteWhitespace(eventMeshServerBatchMsgNumLimiterStr))); + String eventMeshServerBatchMsgReqNumPerSecondStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_BATCHMSG_REQ_NUM_PER_SECOND); + if (StringUtils.isNotEmpty(eventMeshServerBatchMsgReqNumPerSecondStr) && StringUtils.isNumeric(eventMeshServerBatchMsgReqNumPerSecondStr)) { + eventMeshBatchMsgRequestNumPerSecond = Integer.valueOf(eventMeshServerBatchMsgReqNumPerSecondStr); } String eventMeshServerBatchMsgBatchEnableStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_BATCHMSG_BATCH_ENABLED); @@ -180,6 +180,11 @@ public void init() { if (StringUtils.isNotEmpty(eventMeshServerUseTlsStr)) { eventMeshServerUseTls = Boolean.valueOf(StringUtils.deleteWhitespace(eventMeshServerUseTlsStr)); } + + String eventMeshHttpMsgReqNumPerSecondStr = configurationWrapper.getProp(ConfKeys.KEY_EVENTMESH_SERVER_MSG_REQ_NUM_PER_SECOND); + if (StringUtils.isNotEmpty(eventMeshHttpMsgReqNumPerSecondStr) && StringUtils.isNumeric(eventMeshHttpMsgReqNumPerSecondStr)) { + eventMeshHttpMsgReqNumPerSecond = Integer.valueOf(eventMeshHttpMsgReqNumPerSecondStr); + } } } @@ -189,7 +194,7 @@ static class ConfKeys { public static String KEYS_EVENTMESH_BATCHMSG_THREAD_NUM = "eventMesh.server.batchmsg.threads.num"; - public static String KEYS_EVENTMESH_BATCHMSG_RATELIMITER = "eventMesh.server.batchmsg.speed.ratelimiter"; + public static String KEYS_EVENTMESH_BATCHMSG_REQ_NUM_PER_SECOND = "eventMesh.server.batchmsg.reqNumPerSecond"; public static String KEYS_EVENTMESH_BATCHMSG_BATCH_ENABLED = "eventMesh.server.batchmsg.batch.enabled"; @@ -227,5 +232,6 @@ static class ConfKeys { public static String KEY_EVENTMESH_HTTPS_ENABLED = "eventMesh.server.useTls.enabled"; + public static String KEY_EVENTMESH_SERVER_MSG_REQ_NUM_PER_SECOND = "eventMesh.server.http.msgReqnumPerSecond"; } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java index cc7ea48af9..a8b7197816 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java @@ -17,18 +17,11 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; - import io.netty.channel.ChannelHandlerContext; import io.openmessaging.api.Message; import io.openmessaging.api.OnExceptionContext; import io.openmessaging.api.SendCallback; import io.openmessaging.api.SendResult; - import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.common.Constants; @@ -52,6 +45,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + public class BatchSendMessageProcessor implements HttpRequestProcessor { public Logger cmdLogger = LoggerFactory.getLogger("cmd"); @@ -104,7 +103,7 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext return; } - if (!eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerBatchMsgNumLimiter + if (!eventMeshHTTPServer.getBatchRateLimiter() .tryAcquire(Integer.valueOf(sendMessageBatchRequestBody.getSize()), EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS)) { responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse( sendMessageBatchResponseHeader, diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java index 07a2dbeecd..10950ec7ec 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java @@ -17,14 +17,11 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; -import java.util.concurrent.TimeUnit; - import io.netty.channel.ChannelHandlerContext; import io.openmessaging.api.Message; import io.openmessaging.api.OnExceptionContext; import io.openmessaging.api.SendCallback; import io.openmessaging.api.SendResult; - import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.IPUtil; @@ -48,6 +45,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.TimeUnit; + public class BatchSendMessageV2Processor implements HttpRequestProcessor { public Logger cmdLogger = LoggerFactory.getLogger("cmd"); @@ -122,7 +121,7 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext } } - if (!eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerBatchMsgNumLimiter + if (!eventMeshHTTPServer.getBatchRateLimiter() .tryAcquire(EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS)) { responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse( sendMessageBatchV2ResponseHeader, diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java index 6b86b2ecd4..fb780dba97 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java @@ -17,19 +17,15 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; -import java.util.Map; - import io.netty.channel.ChannelHandlerContext; import io.openmessaging.api.Message; import io.openmessaging.api.OnExceptionContext; import io.openmessaging.api.SendCallback; import io.openmessaging.api.SendResult; - import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.IPUtil; -import org.apache.eventmesh.common.LiteMessage; import org.apache.eventmesh.common.command.HttpCommand; import org.apache.eventmesh.common.protocol.http.body.message.ReplyMessageRequestBody; import org.apache.eventmesh.common.protocol.http.body.message.ReplyMessageResponseBody; @@ -50,6 +46,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Map; +import java.util.concurrent.TimeUnit; + public class ReplyMessageProcessor implements HttpRequestProcessor { public Logger messageLogger = LoggerFactory.getLogger("message"); @@ -104,6 +103,18 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext return; } + // control flow rate limit + if (!eventMeshHTTPServer.getMsgRateLimiter() + .tryAcquire(EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS)) { + responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse( + replyMessageResponseHeader, + ReplyMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getRetCode(), + EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getErrMsg())); + eventMeshHTTPServer.metrics.summaryMetrics.recordHTTPDiscard(); + asyncContext.onComplete(responseEventMeshCommand); + return; + } + String producerGroup = replyMessageRequestBody.getProducerGroup(); EventMeshProducer eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java index b79b587a23..d64ef6bcec 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java @@ -22,7 +22,6 @@ import io.openmessaging.api.OnExceptionContext; import io.openmessaging.api.SendCallback; import io.openmessaging.api.SendResult; - import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.IPUtil; @@ -47,6 +46,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.TimeUnit; + public class SendAsyncMessageProcessor implements HttpRequestProcessor { public Logger messageLogger = LoggerFactory.getLogger("message"); @@ -129,6 +130,18 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext } } + // control flow rate limit + if (!eventMeshHTTPServer.getMsgRateLimiter() + .tryAcquire(EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS)) { + responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse( + sendMessageResponseHeader, + SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getRetCode(), + EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getErrMsg())); + eventMeshHTTPServer.metrics.summaryMetrics.recordHTTPDiscard(); + asyncContext.onComplete(responseEventMeshCommand); + return; + } + String producerGroup = sendMessageRequestBody.getProducerGroup(); EventMeshProducer eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java index edb1a5837b..5aa623d125 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java @@ -18,13 +18,8 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; import com.alibaba.fastjson.JSON; - import io.netty.channel.ChannelHandlerContext; import io.openmessaging.api.Message; -import io.openmessaging.api.OnExceptionContext; -import io.openmessaging.api.SendCallback; -import io.openmessaging.api.SendResult; - import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.api.RRCallback; import org.apache.eventmesh.common.Constants; @@ -51,6 +46,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.TimeUnit; + public class SendSyncMessageProcessor implements HttpRequestProcessor { public Logger messageLogger = LoggerFactory.getLogger("message"); @@ -130,6 +127,18 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext } } + // control flow rate limit + if (!eventMeshHTTPServer.getMsgRateLimiter() + .tryAcquire(EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS)) { + responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse( + sendMessageResponseHeader, + SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getRetCode(), + EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getErrMsg())); + eventMeshHTTPServer.metrics.summaryMetrics.recordHTTPDiscard(); + asyncContext.onComplete(responseEventMeshCommand); + return; + } + String producerGroup = sendMessageRequestBody.getProducerGroup(); EventMeshProducer eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup); From 14459b00aba486af8a7999e9f4ae521eb167f2f2 Mon Sep 17 00:00:00 2001 From: jinrongluo Date: Fri, 24 Sep 2021 09:46:11 -0400 Subject: [PATCH 8/8] [Issue #523] Fixing the Eventmesh ratelimit error return code. --- .../eventmesh/common/protocol/http/common/EventMeshRetCode.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/EventMeshRetCode.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/EventMeshRetCode.java index 11f036dfbb..116b0d1670 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/EventMeshRetCode.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/EventMeshRetCode.java @@ -40,7 +40,7 @@ public enum EventMeshRetCode { EVENTMESH_UNSUBSCRIBE_ERR(18, "eventMesh unsubscribe err"), EVENTMESH_HEARTBEAT_ERR(19, "eventMesh heartbeat err"), EVENTMESH_ACL_ERR(20, "eventMesh acl err"), - EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR(20, "eventMesh http msg send over the limit, "); + EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR(21, "eventMesh http msg send over the limit, "); private Integer retCode;