From 741cdfb16c280d28a9f872088a1a80aa56fbfa8e Mon Sep 17 00:00:00 2001 From: mike_xwm Date: Wed, 23 Jun 2021 11:25:31 +0800 Subject: [PATCH] support unsubscribe topics while delconsumer in http mode (#396) * [ISSUE #325]Update gradle configuration for publishing package to maven repository * update build.gradle * update build.gradle and gradle.properties * update build.gradle and gradle.properties for publish to maven repository * * update gradle version for instructions * fix: dist task exception * [ISSUE #329]Missing Log4j dependency * update eventmesh-runtime.png * support unsubscribe topics while delconsumer in http mode --- .../protocol/http/consumer/ConsumerGroupManager.java | 12 ++++++++++++ .../core/protocol/http/consumer/ConsumerManager.java | 4 ++++ 2 files changed, 16 insertions(+) diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerGroupManager.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerGroupManager.java index c73523bdb1..93377ac2ea 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerGroupManager.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerGroupManager.java @@ -18,8 +18,10 @@ package org.apache.eventmesh.runtime.core.protocol.http.consumer; import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer; import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf; import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupTopicConf; @@ -82,4 +84,14 @@ public synchronized void refresh(ConsumerGroupConf consumerGroupConfig) throws E public ConsumerGroupConf getConsumerGroupConfig() { return consumerGroupConfig; } + + public void unsubscribe(String consumerGroup) throws Exception { + if(StringUtils.equals(consumerGroupConfig.getConsumerGroup(), consumerGroup)){ + Set topics = consumerGroupConfig.getConsumerGroupTopicConf().keySet(); + for (String topic : topics){ + ConsumerGroupTopicConf consumerGroupTopicConf = consumerGroupConfig.getConsumerGroupTopicConf().get(topic); + eventMeshConsumer.unsubscribe(topic, consumerGroupTopicConf.getSubscriptionItem().getMode()); + } + } + } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerManager.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerManager.java index 14486bba19..51ede0bdd8 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerManager.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerManager.java @@ -238,10 +238,14 @@ public ConsumerGroupManager getConsumer(String consumerGroup) throws Exception { * @param consumerGroup */ public synchronized void delConsumer(String consumerGroup) throws Exception { + logger.info("start delConsumer with consumerGroup {}", consumerGroup); if(consumerTable.containsKey(consumerGroup)) { ConsumerGroupManager cgm = consumerTable.remove(consumerGroup); + logger.info("start unsubscribe topic with consumer group manager {}", JSONObject.toJSONString(cgm)); + cgm.unsubscribe(consumerGroup); cgm.shutdown(); } + logger.info("end delConsumer with consumerGroup {}", consumerGroup); } @Subscribe