From a3949da845c8a000f225d9871f0488ce603a05ef Mon Sep 17 00:00:00 2001 From: jinrongluo Date: Thu, 13 May 2021 23:29:22 -0400 Subject: [PATCH] [Issue #337] Enhance Http Demo Subscriber by using ExecutorService, CountDownLatch and PreDestroy hook (#343) * [Issue #337] Fix HttpSubscriber startup issue * [Issue #337] test commit * [Issue #337] revert test commit * [Issue #337] Enhance Http Demo Subscriber by using ExecutorService, CountDownLatch and PreDestroy hook * [Issue #337] Enhance Http Demo Subscriber by using ExecutorService, CountDownLatch and PreDestroy hook * [Issue #337] Address code review comment for Subscriber Demo App Co-authored-by: j00441484 --- .../http/demo/AsyncPublishInstance.java | 5 +- .../demo/sub/controller/SubController.java | 7 +++ .../http/demo/sub/service/SubService.java | 51 ++++++++++++------- 3 files changed, 44 insertions(+), 19 deletions(-) diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/AsyncPublishInstance.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/AsyncPublishInstance.java index b718bcc2e0..558773fc56 100644 --- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/AsyncPublishInstance.java +++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/AsyncPublishInstance.java @@ -35,6 +35,9 @@ public class AsyncPublishInstance { public static Logger logger = LoggerFactory.getLogger(AsyncPublishInstance.class); + // This messageSize is also used in SubService.java (Subscriber) + public static int messageSize = 5; + public static void main(String[] args) throws Exception { Properties properties = Utils.readPropertiesFile("application.properties"); final String eventMeshIp = properties.getProperty("eventmesh.ip"); @@ -62,7 +65,7 @@ public static void main(String[] args) throws Exception { liteProducer = new LiteProducer(eventMeshClientConfig); liteProducer.start(); - for (int i = 0; i < 5; i++) { + for (int i = 0; i < messageSize; i++) { LiteMessage liteMessage = new LiteMessage(); liteMessage.setBizSeqNo(RandomStringUtils.randomNumeric(30)) // .setContent("contentStr with special protocal") diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java index 8f8a7a7f9b..92ca09d700 100644 --- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java +++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java @@ -19,8 +19,10 @@ import com.alibaba.fastjson.JSONObject; +import org.apache.eventmesh.http.demo.sub.service.SubService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; @@ -33,9 +35,14 @@ public class SubController { public static Logger logger = LoggerFactory.getLogger(SubController.class); + @Autowired + private SubService subService; + @RequestMapping(value = "/test", method = RequestMethod.POST) public String subTest(@RequestBody String message) { logger.info("=======receive message======= {}", JSONObject.toJSONString(message)); + subService.consumeMessage(message); + JSONObject result = new JSONObject(); result.put("retCode", 1); return result.toJSONString(); diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java index 84432ca003..9a51e4d2fa 100644 --- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java +++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java @@ -3,18 +3,20 @@ import java.util.Arrays; import java.util.List; import java.util.Properties; - +import java.util.concurrent.CountDownLatch; import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.client.http.conf.LiteClientConfig; import org.apache.eventmesh.client.http.consumer.LiteConsumer; import org.apache.eventmesh.common.EventMeshException; import org.apache.eventmesh.common.IPUtil; import org.apache.eventmesh.common.ThreadUtil; +import org.apache.eventmesh.http.demo.AsyncPublishInstance; import org.apache.eventmesh.util.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Component; +import javax.annotation.PreDestroy; @Component public class SubService implements InitializingBean { @@ -38,6 +40,9 @@ public class SubService implements InitializingBean { final String dcn = "FT0"; final String subsys = "1234"; + // CountDownLatch size is the same as messageSize in AsyncPublishInstance.java (Publisher) + private CountDownLatch countDownLatch = new CountDownLatch(AsyncPublishInstance.messageSize); + @Override public void afterPropertiesSet() throws Exception { @@ -59,31 +64,41 @@ public void afterPropertiesSet() throws Exception { liteConsumer.heartBeat(topicList, url); liteConsumer.subscribe(topicList, url); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - logger.info("start destory ...."); - try { - liteConsumer.unsubscribe(topicList, url); - } catch (EventMeshException e) { - e.printStackTrace(); - } - try { - liteConsumer.shutdown(); - } catch (Exception e) { - e.printStackTrace(); - } - logger.info("end destory."); - })); - + // Wait for all messaged to be consumed Thread stopThread = new Thread(() -> { try { - Thread.sleep(5 * 60 * 1000); + countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } logger.info("stopThread start...."); System.exit(0); }); + stopThread.start(); + } + + @PreDestroy + public void cleanup() { + logger.info("start destory ...."); + try { + liteConsumer.unsubscribe(topicList, url); + } catch (EventMeshException e) { + e.printStackTrace(); + } + try { + liteConsumer.shutdown(); + } catch (Exception e) { + e.printStackTrace(); + } + logger.info("end destory."); + } - stopThread.start(); + /** + * Count the message already consumed + */ + public void consumeMessage(String msg) { + logger.info("consume message {}", msg); + countDownLatch.countDown(); + logger.info("remaining number of messages to be consumed {}", countDownLatch.getCount()); } }