From f79c0ea005d0abb7bfcae6ff8341618c74c62ade Mon Sep 17 00:00:00 2001 From: lrhkobe <34571087+lrhkobe@users.noreply.github.com> Date: Wed, 23 Feb 2022 15:59:36 +0800 Subject: [PATCH] [Issue #780] Modify the define level of EventListener from Topic to Consumer (#781) * modify: add group field in UserAgent, delete ProducerGroup and ConsumerGroup field * modify: fix checksyle error * modify: fix checksyle error in ClientGroupWrapper.java * modify: move EventListner in the level of Consumer instead of binding with topic in EventMesh * modify: fix the eventListener level problem in grpc protocal * modify: fix the eventListener problem in test case close #780 --- .../eventmesh/api/consumer/Consumer.java | 6 +- .../rocketmq/consumer/PushConsumerImpl.java | 23 +- .../consumer/RocketMQConsumerImpl.java | 9 +- .../consumer/PushConsumerImplTest.java | 10 +- .../consumer/StandaloneConsumer.java | 13 +- .../consumer/StandaloneConsumerAdaptor.java | 99 +----- .../core/plugin/MQConsumerWrapper.java | 10 +- .../grpc/consumer/EventMeshConsumer.java | 8 +- .../http/consumer/EventMeshConsumer.java | 254 ++++++++-------- .../tcp/client/group/ClientGroupWrapper.java | 286 ++++++++++-------- .../group/ClientSessionGroupMapping.java | 2 +- .../protocol/tcp/client/session/Session.java | 4 +- 12 files changed, 334 insertions(+), 390 deletions(-) diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/Consumer.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/Consumer.java index 87b3ac7bba..52b1a93d64 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/Consumer.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/Consumer.java @@ -38,7 +38,11 @@ public interface Consumer extends LifeCycle { void updateOffset(List cloudEvents, AbstractContext context); - void subscribe(String topic, final EventListener listener) throws Exception; + //void subscribe(String topic, final EventListener listener) throws Exception; + + void subscribe(String topic) throws Exception; void unsubscribe(String topic); + + void registerEventListener(EventListener listener); } diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java index f6d6083343..71f921fddb 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java @@ -59,7 +59,7 @@ public class PushConsumerImpl { private final DefaultMQPushConsumer rocketmqPushConsumer; private final Properties properties; private AtomicBoolean started = new AtomicBoolean(false); - private final Map subscribeTable = new ConcurrentHashMap<>(); + private EventListener eventListener; private final ClientConfig clientConfig; public PushConsumerImpl(final Properties properties) { @@ -134,9 +134,7 @@ public DefaultMQPushConsumer getRocketmqPushConsumer() { return rocketmqPushConsumer; } - - public void subscribe(String topic, String subExpression, EventListener listener) { - this.subscribeTable.put(topic, listener); + public void subscribe(String topic, String subExpression) { try { this.rocketmqPushConsumer.subscribe(topic, subExpression); } catch (MQClientException e) { @@ -146,7 +144,6 @@ public void subscribe(String topic, String subExpression, EventListener listener public void unsubscribe(String topic) { - this.subscribeTable.remove(topic); try { this.rocketmqPushConsumer.unsubscribe(topic); } catch (Exception e) { @@ -197,9 +194,7 @@ public EventMeshConsumeConcurrentlyStatus handleMessage(MessageExt msg, cloudEvent = cloudEventBuilder.build(); } - EventListener listener = PushConsumerImpl.this.subscribeTable.get(msg.getTopic()); - - if (listener == null) { + if (eventListener == null) { throw new ConnectorRuntimeException(String.format("The topic/queue %s isn't attached to this consumer", msg.getTopic())); } @@ -231,7 +226,7 @@ public void commit(EventMeshAction action) { eventMeshAsyncConsumeContext.setAbstractContext(context); - listener.consume(cloudEvent, eventMeshAsyncConsumeContext); + eventListener.consume(cloudEvent, eventMeshAsyncConsumeContext); return EventMeshConsumeConcurrentlyStatus.valueOf( contextProperties.getProperty(NonStandardKeys.MESSAGE_CONSUME_STATUS)); @@ -270,9 +265,7 @@ public EventMeshConsumeConcurrentlyStatus handleMessage(MessageExt msg, cloudEvent = cloudEventBuilder.build(); } - EventListener listener = PushConsumerImpl.this.subscribeTable.get(msg.getTopic()); - - if (listener == null) { + if (eventListener == null) { throw new ConnectorRuntimeException(String.format("The topic/queue %s isn't attached to this consumer", msg.getTopic())); } @@ -306,12 +299,14 @@ public void commit(EventMeshAction action) { eventMeshAsyncConsumeContext.setAbstractContext(context); - listener.consume(cloudEvent, eventMeshAsyncConsumeContext); + eventListener.consume(cloudEvent, eventMeshAsyncConsumeContext); return EventMeshConsumeConcurrentlyStatus.valueOf( contextProperties.getProperty(NonStandardKeys.MESSAGE_CONSUME_STATUS)); } } - + public void registerEventListener(EventListener listener) { + this.eventListener = listener; + } } diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java index 12fc214475..eaf2a42a2d 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java @@ -70,8 +70,8 @@ public synchronized void init(Properties keyValue) throws Exception { } @Override - public void subscribe(String topic, EventListener listener) throws Exception { - pushConsumer.subscribe(topic, "*", listener); + public void subscribe(String topic) throws Exception { + pushConsumer.subscribe(topic, "*"); } @Override @@ -99,6 +99,11 @@ public void unsubscribe(String topic) { pushConsumer.unsubscribe(topic); } + @Override + public void registerEventListener(EventListener listener) { + pushConsumer.registerEventListener(listener); + } + @Override public synchronized void shutdown() { pushConsumer.shutdown(); diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/consumer/PushConsumerImplTest.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/consumer/PushConsumerImplTest.java index 31bcd5f8e1..683183bc21 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/consumer/PushConsumerImplTest.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/consumer/PushConsumerImplTest.java @@ -89,15 +89,7 @@ public void testConsumeMessage() { consumedMsg.setBody(testBody); consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC"); consumedMsg.setTopic("HELLO_QUEUE"); - consumer.subscribe("HELLO_QUEUE", "*", new EventListener() { - - @Override - public void consume(CloudEvent cloudEvent, org.apache.eventmesh.api.AsyncConsumeContext context) { - assertThat(cloudEvent.getExtension("MESSAGE_ID")).isEqualTo("NewMsgId"); - assertThat(cloudEvent.getData()).isEqualTo(testBody); - context.commit(EventMeshAction.CommitMessage); - } - }); + consumer.subscribe("HELLO_QUEUE", "*"); ((MessageListenerConcurrently) rocketmqPushConsumer .getMessageListener()).consumeMessage(Collections.singletonList(consumedMsg), null); diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/consumer/StandaloneConsumer.java b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/consumer/StandaloneConsumer.java index 225b208309..f56fc2b20d 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/consumer/StandaloneConsumer.java +++ b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/consumer/StandaloneConsumer.java @@ -37,6 +37,8 @@ public class StandaloneConsumer implements Consumer { private StandaloneBroker standaloneBroker; + private EventListener listener; + private AtomicBoolean isStarted; private final ConcurrentHashMap subscribeTaskTable; @@ -90,10 +92,8 @@ public void updateOffset(List cloudEvents, AbstractContext context) } @Override - public void subscribe(String topic, EventListener listener) throws Exception { - if (listener == null) { - throw new IllegalArgumentException("listener cannot be null"); - } + public void subscribe(String topic) throws Exception { + if (subscribeTaskTable.containsKey(topic)) { return; } @@ -116,4 +116,9 @@ public void unsubscribe(String topic) { subscribeTaskTable.remove(topic); } } + + @Override + public void registerEventListener(EventListener listener) { + this.listener = listener; + } } \ No newline at end of file diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/consumer/StandaloneConsumerAdaptor.java b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/consumer/StandaloneConsumerAdaptor.java index c09d422ffc..d7d72578ab 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/consumer/StandaloneConsumerAdaptor.java +++ b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/consumer/StandaloneConsumerAdaptor.java @@ -69,8 +69,8 @@ public void updateOffset(List cloudEvents, AbstractContext context) } @Override - public void subscribe(String topic, EventListener listener) throws Exception { - consumer.subscribe(topic, listener); + public void subscribe(String topic) throws Exception { + consumer.subscribe(topic); } @Override @@ -78,95 +78,8 @@ public void unsubscribe(String topic) { consumer.unsubscribe(topic); } - //@Override - //public void init(Properties keyValue) throws Exception { - // String producerGroup = keyValue.getProperty("producerGroup"); - // - // MessagingAccessPointImpl messagingAccessPoint = new MessagingAccessPointImpl(keyValue); - // consumer = (StandaloneConsumer) messagingAccessPoint.createConsumer(keyValue); - // - //} - // - //@Override - //public void updateOffset(List msgs, AbstractContext context) { - // for(Message message : msgs) { - // consumer.updateOffset(message); - // } - //} - // - //@Override - //public void subscribe(String topic, AsyncMessageListener listener) throws Exception { - // // todo: support subExpression - // consumer.subscribe(topic, "*", listener); - //} - // - //@Override - //public void unsubscribe(String topic) { - // consumer.unsubscribe(topic); - //} - // - //@Override - //public void subscribe(String topic, String subExpression, MessageListener listener) { - // throw new UnsupportedOperationException("not supported yet"); - //} - // - //@Override - //public void subscribe(String topic, MessageSelector selector, MessageListener listener) { - // throw new UnsupportedOperationException("not supported yet"); - //} - // - //@Override - //public void subscribe(String topic, String subExpression, GenericMessageListener listener) { - // throw new UnsupportedOperationException("not supported yet"); - //} - // - //@Override - //public void subscribe(String topic, MessageSelector selector, GenericMessageListener listener) { - // throw new UnsupportedOperationException("not supported yet"); - //} - // - //@Override - //public void subscribe(String topic, String subExpression, AsyncMessageListener listener) { - // throw new UnsupportedOperationException("not supported yet"); - //} - // - //@Override - //public void subscribe(String topic, MessageSelector selector, AsyncMessageListener listener) { - // throw new UnsupportedOperationException("not supported yet"); - //} - // - //@Override - //public void subscribe(String topic, String subExpression, AsyncGenericMessageListener listener) { - // throw new UnsupportedOperationException("not supported yet"); - //} - // - //@Override - //public void subscribe(String topic, MessageSelector selector, AsyncGenericMessageListener listener) { - // throw new UnsupportedOperationException("not supported yet"); - //} - // - //@Override - //public void updateCredential(Properties credentialProperties) { - // - //} - // - //@Override - //public boolean isStarted() { - // return consumer.isStarted(); - //} - // - //@Override - //public boolean isClosed() { - // return consumer.isClosed(); - //} - // - //@Override - //public void start() { - // consumer.start(); - //} - // - //@Override - //public void shutdown() { - // consumer.shutdown(); - //} + @Override + public void registerEventListener(EventListener listener) { + consumer.registerEventListener(listener); + } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java index edce54a88f..6bd8875f89 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java @@ -44,8 +44,8 @@ public MQConsumerWrapper(String connectorPluginType) { } } - public void subscribe(String topic, EventListener listener) throws Exception { - meshMQPushConsumer.subscribe(topic, listener); + public void subscribe(String topic) throws Exception { + meshMQPushConsumer.subscribe(topic); } public void unsubscribe(String topic) throws Exception { @@ -69,9 +69,9 @@ public synchronized void shutdown() throws Exception { started.compareAndSet(false, true); } - //public void registerMessageListener(MessageListenerConcurrently messageListenerConcurrently) { - // meshMQPushConsumer.registerMessageListener(messageListenerConcurrently); - //} + public void registerEventListener(EventListener listener) { + meshMQPushConsumer.registerEventListener(listener); + } public void updateOffset(List events, AbstractContext eventMeshConsumeConcurrentlyContext) { meshMQPushConsumer.updateOffset(events, eventMeshConsumeConcurrentlyContext); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/EventMeshConsumer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/EventMeshConsumer.java index 6bf69627c3..6324619e9a 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/EventMeshConsumer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/EventMeshConsumer.java @@ -140,6 +140,8 @@ public synchronized void init() throws Exception { keyValue.put("instanceName", EventMeshUtil.buildMeshClientID(consumerGroup, eventMeshGrpcConfiguration.eventMeshCluster)); persistentMqConsumer.init(keyValue); + EventListener clusterEventListner = createEventListener(SubscriptionMode.CLUSTERING); + persistentMqConsumer.registerEventListener(clusterEventListner); Properties broadcastKeyValue = new Properties(); broadcastKeyValue.put("isBroadcast", "true"); @@ -148,6 +150,8 @@ public synchronized void init() throws Exception { broadcastKeyValue.put("instanceName", EventMeshUtil.buildMeshClientID(consumerGroup, eventMeshGrpcConfiguration.eventMeshCluster)); broadcastMqConsumer.init(broadcastKeyValue); + EventListener broadcastEventListner = createEventListener(SubscriptionMode.BROADCASTING); + broadcastMqConsumer.registerEventListener(broadcastEventListner); serviceState = ServiceState.INITED; logger.info("EventMeshConsumer [{}] initialized.............", consumerGroup); @@ -184,9 +188,9 @@ public ServiceState getStatus() { public void subscribe(String topic, SubscriptionMode subscriptionMode) throws Exception { if (SubscriptionMode.CLUSTERING.equals(subscriptionMode)) { - persistentMqConsumer.subscribe(topic, createEventListener(subscriptionMode)); + persistentMqConsumer.subscribe(topic); } else if (SubscriptionMode.BROADCASTING.equals(subscriptionMode)) { - broadcastMqConsumer.subscribe(topic, createEventListener(subscriptionMode)); + broadcastMqConsumer.subscribe(topic); } else { logger.error("Subscribe Failed. Incorrect Subscription Mode"); throw new Exception("Subscribe Failed. Incorrect Subscription Mode"); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java index 96398d46fe..b4e5386604 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java @@ -92,7 +92,69 @@ public synchronized void init() throws Exception { eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster)); persistentMqConsumer.init(keyValue); - // + EventListener cluserEventListener = new EventListener() { + @Override + public void consume(CloudEvent event, AsyncConsumeContext context) { + String topic = event.getSubject(); + //String topic = message.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION); + String bizSeqNo = (String) event.getExtension(Constants.PROPERTY_MESSAGE_SEARCH_KEYS); + String uniqueId = (String) event.getExtension(Constants.RMB_UNIQ_ID); + + event = CloudEventBuilder.from(event) + .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis())) + .build(); + //message.getUserProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis())); + if (messageLogger.isDebugEnabled()) { + messageLogger.debug("message|mq2eventMesh|topic={}|event={}", topic, event); + } else { + messageLogger.info("message|mq2eventMesh|topic={}|bizSeqNo={}|uniqueId={}", topic, bizSeqNo, uniqueId); + } + + ConsumerGroupTopicConf currentTopicConfig = MapUtils.getObject(consumerGroupConf.getConsumerGroupTopicConf(), + topic, null); + EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) context; + + if (currentTopicConfig == null) { + logger.error("no topicConfig found, consumerGroup:{} topic:{}", consumerGroupConf.getConsumerGroup(), topic); + try { + sendMessageBack(event, uniqueId, bizSeqNo); + //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, + // EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name()); + //context.ack(); + eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage); + return; + } catch (Exception ex) { + //ignore + } + } + + SubscriptionItem subscriptionItem = consumerGroupConf.getConsumerGroupTopicConf().get(topic).getSubscriptionItem(); + HandleMsgContext handleMsgContext = new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(), + consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this, + topic, event, subscriptionItem, eventMeshAsyncConsumeContext.getAbstractContext(), + consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig); + + if (httpMessageHandler.handle(handleMsgContext)) { + //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, + // EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name()); + //context.ack(); + eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck); + } else { + try { + sendMessageBack(event, uniqueId, bizSeqNo); + } catch (Exception e) { + //ignore + } + //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, + // EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name()); + //context.ack(); + eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage); + } + } + }; + persistentMqConsumer.registerEventListener(cluserEventListener); + + //broacast consumer Properties broadcastKeyValue = new Properties(); broadcastKeyValue.put("isBroadcast", "true"); broadcastKeyValue.put("consumerGroup", consumerGroupConf.getConsumerGroup()); @@ -100,6 +162,72 @@ public synchronized void init() throws Exception { broadcastKeyValue.put("instanceName", EventMeshUtil.buildMeshClientID(consumerGroupConf.getConsumerGroup(), eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster)); broadcastMqConsumer.init(broadcastKeyValue); + + EventListener broadcastEventListener = new EventListener() { + @Override + public void consume(CloudEvent event, AsyncConsumeContext context) { + + event = CloudEventBuilder.from(event) + .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, + String.valueOf(System.currentTimeMillis())) + .build(); + + String topic = event.getSubject(); + String bizSeqNo = event.getExtension(Constants.PROPERTY_MESSAGE_SEARCH_KEYS).toString(); + String uniqueId = event.getExtension(Constants.RMB_UNIQ_ID).toString(); + + if (messageLogger.isDebugEnabled()) { + messageLogger.debug("message|mq2eventMesh|topic={}|msg={}", topic, event); + } else { + messageLogger.info("message|mq2eventMesh|topic={}|bizSeqNo={}|uniqueId={}", topic, bizSeqNo, + uniqueId); + } + + ConsumerGroupTopicConf currentTopicConfig = MapUtils.getObject( + consumerGroupConf.getConsumerGroupTopicConf(), topic, null); + EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) context; + + if (currentTopicConfig == null) { + logger.error("no topicConfig found, consumerGroup:{} topic:{}", + consumerGroupConf.getConsumerGroup(), topic); + try { + sendMessageBack(event, uniqueId, bizSeqNo); + //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, + // EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name()); + //context.ack(); + eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage); + return; + } catch (Exception ex) { + //ignore + } + } + + SubscriptionItem subscriptionItem = consumerGroupConf.getConsumerGroupTopicConf().get(topic).getSubscriptionItem(); + HandleMsgContext handleMsgContext = new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(), + consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this, + topic, event, subscriptionItem, eventMeshAsyncConsumeContext.getAbstractContext(), + consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig); + + if (httpMessageHandler.handle(handleMsgContext)) { + //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, + // EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name()); + //context.ack(); + eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck); + } else { + try { + sendMessageBack(event, uniqueId, bizSeqNo); + } catch (Exception e) { + //ignore + } + //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, + // EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name()); + //context.ack(); + eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage); + } + } + }; + broadcastMqConsumer.registerEventListener(broadcastEventListener); + inited4Persistent.compareAndSet(false, true); inited4Broadcast.compareAndSet(false, true); logger.info("EventMeshConsumer [{}] inited.............", consumerGroupConf.getConsumerGroup()); @@ -113,130 +241,10 @@ public synchronized void start() throws Exception { } public void subscribe(String topic, SubscriptionItem subscriptionItem) throws Exception { - EventListener listener = null; if (!SubscriptionMode.BROADCASTING.equals(subscriptionItem.getMode())) { - listener = new EventListener() { - @Override - public void consume(CloudEvent event, AsyncConsumeContext context) { - String topic = event.getSubject(); - //String topic = message.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION); - String bizSeqNo = (String) event.getExtension(Constants.PROPERTY_MESSAGE_SEARCH_KEYS); - String uniqueId = (String) event.getExtension(Constants.RMB_UNIQ_ID); - - event = CloudEventBuilder.from(event) - .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis())) - .build(); - //message.getUserProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis())); - if (messageLogger.isDebugEnabled()) { - messageLogger.debug("message|mq2eventMesh|topic={}|event={}", topic, event); - } else { - messageLogger.info("message|mq2eventMesh|topic={}|bizSeqNo={}|uniqueId={}", topic, bizSeqNo, uniqueId); - } - - ConsumerGroupTopicConf currentTopicConfig = MapUtils.getObject(consumerGroupConf.getConsumerGroupTopicConf(), - topic, null); - EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) context; - - if (currentTopicConfig == null) { - logger.error("no topicConfig found, consumerGroup:{} topic:{}", consumerGroupConf.getConsumerGroup(), topic); - try { - sendMessageBack(event, uniqueId, bizSeqNo); - //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, - // EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name()); - //context.ack(); - eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage); - return; - } catch (Exception ex) { - //ignore - } - } - HandleMsgContext handleMsgContext = new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(), - consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this, - topic, event, subscriptionItem, eventMeshAsyncConsumeContext.getAbstractContext(), - consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig); - - if (httpMessageHandler.handle(handleMsgContext)) { - //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, - // EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name()); - //context.ack(); - eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck); - } else { - try { - sendMessageBack(event, uniqueId, bizSeqNo); - } catch (Exception e) { - //ignore - } - //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, - // EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name()); - //context.ack(); - eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage); - } - } - }; - persistentMqConsumer.subscribe(topic, listener); + persistentMqConsumer.subscribe(topic); } else { - listener = new EventListener() { - @Override - public void consume(CloudEvent event, AsyncConsumeContext context) { - - event = CloudEventBuilder.from(event) - .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, - String.valueOf(System.currentTimeMillis())) - .build(); - - String topic = event.getSubject(); - String bizSeqNo = event.getExtension(Constants.PROPERTY_MESSAGE_SEARCH_KEYS).toString(); - String uniqueId = event.getExtension(Constants.RMB_UNIQ_ID).toString(); - - if (messageLogger.isDebugEnabled()) { - messageLogger.debug("message|mq2eventMesh|topic={}|msg={}", topic, event); - } else { - messageLogger.info("message|mq2eventMesh|topic={}|bizSeqNo={}|uniqueId={}", topic, bizSeqNo, - uniqueId); - } - - ConsumerGroupTopicConf currentTopicConfig = MapUtils.getObject( - consumerGroupConf.getConsumerGroupTopicConf(), topic, null); - EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) context; - - if (currentTopicConfig == null) { - logger.error("no topicConfig found, consumerGroup:{} topic:{}", - consumerGroupConf.getConsumerGroup(), topic); - try { - sendMessageBack(event, uniqueId, bizSeqNo); - //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, - // EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name()); - //context.ack(); - eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage); - return; - } catch (Exception ex) { - //ignore - } - } - HandleMsgContext handleMsgContext = new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(), - consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this, - topic, event, subscriptionItem, eventMeshAsyncConsumeContext.getAbstractContext(), - consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig); - - if (httpMessageHandler.handle(handleMsgContext)) { - //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, - // EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name()); - //context.ack(); - eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck); - } else { - try { - sendMessageBack(event, uniqueId, bizSeqNo); - } catch (Exception e) { - //ignore - } - //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, - // EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name()); - //context.ack(); - eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage); - } - } - }; - broadcastMqConsumer.subscribe(topic, listener); + broadcastMqConsumer.subscribe(topic); } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java index 7d45219268..e6937f14b7 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java @@ -17,7 +17,6 @@ package org.apache.eventmesh.runtime.core.protocol.tcp.client.group; -import org.apache.eventmesh.api.AsyncConsumeContext; import org.apache.eventmesh.api.EventListener; import org.apache.eventmesh.api.EventMeshAction; import org.apache.eventmesh.api.EventMeshAsyncConsumeContext; @@ -104,6 +103,8 @@ public class ClientGroupWrapper { private ConcurrentHashMap> topic2sessionInGroupMapping = new ConcurrentHashMap>(); + private ConcurrentHashMap subscriptions = new ConcurrentHashMap<>(); + public AtomicBoolean producerStarted = new AtomicBoolean(Boolean.FALSE); private MQProducerWrapper mqProducerWrapper; @@ -180,7 +181,12 @@ public MQProducerWrapper getMqProducerWrapper() { return mqProducerWrapper; } - public boolean addSubscription(String topic, Session session) throws Exception { + public boolean addSubscription(SubscriptionItem subscriptionItem, Session session) throws Exception { + if (subscriptionItem == null) { + logger.error("addSubscription param error,subscriptionItem is null", session); + return false; + } + String topic = subscriptionItem.getTopic(); if (session == null || !StringUtils.equalsIgnoreCase(group, EventMeshUtil.buildClientGroup(session.getClient().getGroup()))) { logger.error("addSubscription param error,topic:{},session:{}", topic, session); @@ -203,6 +209,8 @@ public boolean addSubscription(String topic, Session session) throws Exception { .warn("addSubscription fail, group:{} topic:{} client:{}", group, topic, session.getClient()); } + + subscriptions.putIfAbsent(topic, subscriptionItem); } catch (Exception e) { logger .error("addSubscription error! topic:{} client:{}", topic, session.getClient(), e); @@ -213,7 +221,12 @@ public boolean addSubscription(String topic, Session session) throws Exception { return r; } - public boolean removeSubscription(String topic, Session session) { + public boolean removeSubscription(SubscriptionItem subscriptionItem, Session session) { + if (subscriptionItem == null) { + logger.error("addSubscription param error,subscriptionItem is null", session); + return false; + } + String topic = subscriptionItem.getTopic(); if (session == null || !StringUtils.equalsIgnoreCase(group, EventMeshUtil.buildClientGroup(session.getClient().getGroup()))) { @@ -238,6 +251,7 @@ public boolean removeSubscription(String topic, Session session) { } if (CollectionUtils.size(topic2sessionInGroupMapping.get(topic)) == 0) { topic2sessionInGroupMapping.remove(topic); + subscriptions.remove(topic); logger.info("removeSubscription remove topic success, group:{} topic:{}", group, topic); } @@ -401,6 +415,79 @@ public synchronized void initClientGroupPersistentConsumer() throws Exception { persistentMsgConsumer.init(keyValue); + EventListener listener = (event, context) -> { + eventMeshTcpMonitor.getTcpSummaryMetrics().getMq2eventMeshMsgNum() + .incrementAndGet(); + event = CloudEventBuilder.from(event) + .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, + String.valueOf(System.currentTimeMillis())) + .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP, + eventMeshTCPConfiguration.eventMeshServerIp).build(); + String topic = event.getSubject(); + + EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = + (EventMeshAsyncConsumeContext) context; + Session session = downstreamDispatchStrategy + .select(group, topic, groupConsumerSessions); + String bizSeqNo = EventMeshUtil.getMessageBizSeq(event); + if (session == null) { + try { + Integer sendBackTimes = 0; + String sendBackFromEventMeshIp = ""; + if (StringUtils.isNotBlank(Objects.requireNonNull(event.getExtension( + EventMeshConstants.EVENTMESH_SEND_BACK_TIMES)).toString())) { + sendBackTimes = (Integer) event.getExtension( + EventMeshConstants.EVENTMESH_SEND_BACK_TIMES); + } + if (StringUtils.isNotBlank(Objects.requireNonNull(event.getExtension( + EventMeshConstants.EVENTMESH_SEND_BACK_IP)).toString())) { + sendBackFromEventMeshIp = (String) event.getExtension( + EventMeshConstants.EVENTMESH_SEND_BACK_IP); + } + + logger.error( + "found no session to downstream msg,groupName:{}, topic:{}, " + + "bizSeqNo:{}, sendBackTimes:{}, sendBackFromEventMeshIp:{}", + group, topic, bizSeqNo, sendBackTimes, + sendBackFromEventMeshIp); + + if (sendBackTimes >= eventMeshTCPServer + .getEventMeshTCPConfiguration().eventMeshTcpSendBackMaxTimes) { + logger.error( + "sendBack to broker over max times:{}, groupName:{}, topic:{}, " + + "bizSeqNo:{}", eventMeshTCPServer + .getEventMeshTCPConfiguration() + .eventMeshTcpSendBackMaxTimes, + group, topic, bizSeqNo); + } else { + sendBackTimes++; + event = CloudEventBuilder.from(event) + .withExtension(EventMeshConstants.EVENTMESH_SEND_BACK_TIMES, + sendBackTimes.toString()) + .withExtension(EventMeshConstants.EVENTMESH_SEND_BACK_IP, + eventMeshTCPConfiguration.eventMeshServerIp).build(); + sendMsgBackToBroker(event, bizSeqNo); + } + } catch (Exception e) { + logger.warn("handle msg exception when no session found", e); + } + + eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage); + return; + } + + SubscriptionItem subscriptionItem = subscriptions.get(topic); + DownStreamMsgContext downStreamMsgContext = + new DownStreamMsgContext(event, session, persistentMsgConsumer, + eventMeshAsyncConsumeContext.getAbstractContext(), false, + subscriptionItem); + //msg put in eventmesh,waiting client ack + session.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext); + session.downstreamMsg(downStreamMsgContext); + eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck); + }; + persistentMsgConsumer.registerEventListener(listener); + inited4Persistent.compareAndSet(false, true); logger.info("init persistentMsgConsumer success, group:{}", group); } @@ -427,6 +514,66 @@ public synchronized void initClientGroupBroadcastConsumer() throws Exception { .buildMeshTcpClientID(sysId, "SUB", eventMeshTCPConfiguration.eventMeshCluster)); broadCastMsgConsumer.init(keyValue); + EventListener listener = (event, context) -> { + + eventMeshTcpMonitor.getTcpSummaryMetrics().getMq2eventMeshMsgNum() + .incrementAndGet(); + event = CloudEventBuilder.from(event) + .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, + String.valueOf(System.currentTimeMillis())) + .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP, + eventMeshTCPConfiguration.eventMeshServerIp).build(); + String topic = event.getSubject(); + // message.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION); + //message.getSystemProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, + // String.valueOf(System.currentTimeMillis())); + //message.getSystemProperties().put(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP, + // eventMeshTCPConfiguration.eventMeshServerIp); + + EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = + (EventMeshAsyncConsumeContext) context; + if (CollectionUtils.isEmpty(groupConsumerSessions)) { + logger.warn("found no session to downstream broadcast msg"); + eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage); + return; + } + + Iterator sessionsItr = groupConsumerSessions.iterator(); + + SubscriptionItem subscriptionItem = subscriptions.get(topic); + DownStreamMsgContext downStreamMsgContext = + new DownStreamMsgContext(event, null, broadCastMsgConsumer, + eventMeshAsyncConsumeContext.getAbstractContext(), false, + subscriptionItem); + + while (sessionsItr.hasNext()) { + Session session = sessionsItr.next(); + + if (!session.isAvailable(topic)) { + logger + .warn("downstream broadcast msg,session is not available,client:{}", + session.getClient()); + continue; + } + + downStreamMsgContext.session = session; + + //downstream broadcast msg asynchronously + eventMeshTCPServer.getBroadcastMsgDownstreamExecutorService() + .submit(new Runnable() { + @Override + public void run() { + //msg put in eventmesh,waiting client ack + session.getPusher() + .unAckMsg(downStreamMsgContext.seq, downStreamMsgContext); + session.downstreamMsg(downStreamMsgContext); + } + }); + } + eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck); + }; + broadCastMsgConsumer.registerEventListener(listener); + inited4Broadcast.compareAndSet(false, true); logger.info("init broadCastMsgConsumer success, group:{}", group); } @@ -441,139 +588,10 @@ public synchronized void startClientGroupBroadcastConsumer() throws Exception { } public void subscribe(SubscriptionItem subscriptionItem) throws Exception { - EventListener listener = null; if (SubscriptionMode.BROADCASTING.equals(subscriptionItem.getMode())) { - listener = (event, context) -> { - - eventMeshTcpMonitor.getTcpSummaryMetrics().getMq2eventMeshMsgNum() - .incrementAndGet(); - event = CloudEventBuilder.from(event) - .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, - String.valueOf(System.currentTimeMillis())) - .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP, - eventMeshTCPConfiguration.eventMeshServerIp).build(); - String topic = event.getSubject(); - // message.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION); - //message.getSystemProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, - // String.valueOf(System.currentTimeMillis())); - //message.getSystemProperties().put(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP, - // eventMeshTCPConfiguration.eventMeshServerIp); - - EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = - (EventMeshAsyncConsumeContext) context; - if (CollectionUtils.isEmpty(groupConsumerSessions)) { - logger.warn("found no session to downstream broadcast msg"); - eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage); - return; - } - - Iterator sessionsItr = groupConsumerSessions.iterator(); - - DownStreamMsgContext downStreamMsgContext = - new DownStreamMsgContext(event, null, broadCastMsgConsumer, - eventMeshAsyncConsumeContext.getAbstractContext(), false, - subscriptionItem); - - while (sessionsItr.hasNext()) { - Session session = sessionsItr.next(); - - if (!session.isAvailable(topic)) { - logger - .warn("downstream broadcast msg,session is not available,client:{}", - session.getClient()); - continue; - } - - downStreamMsgContext.session = session; - - //downstream broadcast msg asynchronously - eventMeshTCPServer.getBroadcastMsgDownstreamExecutorService() - .submit(new Runnable() { - @Override - public void run() { - //msg put in eventmesh,waiting client ack - session.getPusher() - .unAckMsg(downStreamMsgContext.seq, downStreamMsgContext); - session.downstreamMsg(downStreamMsgContext); - } - }); - } - - eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck); - }; - broadCastMsgConsumer.subscribe(subscriptionItem.getTopic(), listener); + broadCastMsgConsumer.subscribe(subscriptionItem.getTopic()); } else { - listener = (event, context) -> { - eventMeshTcpMonitor.getTcpSummaryMetrics().getMq2eventMeshMsgNum() - .incrementAndGet(); - event = CloudEventBuilder.from(event) - .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, - String.valueOf(System.currentTimeMillis())) - .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP, - eventMeshTCPConfiguration.eventMeshServerIp).build(); - String topic = event.getSubject(); - - EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = - (EventMeshAsyncConsumeContext) context; - Session session = downstreamDispatchStrategy - .select(group, topic, groupConsumerSessions); - String bizSeqNo = EventMeshUtil.getMessageBizSeq(event); - if (session == null) { - try { - Integer sendBackTimes = 0; - String sendBackFromEventMeshIp = ""; - if (StringUtils.isNotBlank(Objects.requireNonNull(event.getExtension( - EventMeshConstants.EVENTMESH_SEND_BACK_TIMES)).toString())) { - sendBackTimes = (Integer) event.getExtension( - EventMeshConstants.EVENTMESH_SEND_BACK_TIMES); - } - if (StringUtils.isNotBlank(Objects.requireNonNull(event.getExtension( - EventMeshConstants.EVENTMESH_SEND_BACK_IP)).toString())) { - sendBackFromEventMeshIp = (String) event.getExtension( - EventMeshConstants.EVENTMESH_SEND_BACK_IP); - } - - logger.error( - "found no session to downstream msg,groupName:{}, topic:{}, " - + "bizSeqNo:{}, sendBackTimes:{}, sendBackFromEventMeshIp:{}", - group, topic, bizSeqNo, sendBackTimes, - sendBackFromEventMeshIp); - - if (sendBackTimes >= eventMeshTCPServer - .getEventMeshTCPConfiguration().eventMeshTcpSendBackMaxTimes) { - logger.error( - "sendBack to broker over max times:{}, groupName:{}, topic:{}, " - + "bizSeqNo:{}", eventMeshTCPServer - .getEventMeshTCPConfiguration() - .eventMeshTcpSendBackMaxTimes, - group, topic, bizSeqNo); - } else { - sendBackTimes++; - event = CloudEventBuilder.from(event) - .withExtension(EventMeshConstants.EVENTMESH_SEND_BACK_TIMES, - sendBackTimes.toString()) - .withExtension(EventMeshConstants.EVENTMESH_SEND_BACK_IP, - eventMeshTCPConfiguration.eventMeshServerIp).build(); - sendMsgBackToBroker(event, bizSeqNo); - } - } catch (Exception e) { - logger.warn("handle msg exception when no session found", e); - } - - eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage); - return; - } - - DownStreamMsgContext downStreamMsgContext = - new DownStreamMsgContext(event, session, persistentMsgConsumer, - eventMeshAsyncConsumeContext.getAbstractContext(), false, - subscriptionItem); - //msg put in eventmesh,waiting client ack - session.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext); - session.downstreamMsg(downStreamMsgContext); - eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck); - }; - persistentMsgConsumer.subscribe(subscriptionItem.getTopic(), listener); + persistentMsgConsumer.subscribe(subscriptionItem.getTopic()); } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java index 003186e3eb..2ba749f222 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java @@ -283,7 +283,7 @@ private void cleanClientGroupWrapperByClosePub(Session session) throws Exception */ private void cleanSubscriptionInSession(Session session) throws Exception { for (SubscriptionItem item : session.getSessionContext().subscribeTopics.values()) { - session.getClientGroupWrapper().get().removeSubscription(item.getTopic(), session); + session.getClientGroupWrapper().get().removeSubscription(item, session); if (!session.getClientGroupWrapper().get().hasSubscription(item.getTopic())) { session.getClientGroupWrapper().get().unsubscribe(item); } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java index 37e7b5e44e..09b73966fb 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java @@ -171,7 +171,7 @@ public void subscribe(List items) throws Exception { clientGroupWrapper.get().getMqProducerWrapper().getMeshMQProducer().checkTopicExist(item.getTopic()); - clientGroupWrapper.get().addSubscription(item.getTopic(), this); + clientGroupWrapper.get().addSubscription(item, this); subscribeLogger.info("subscribe|succeed|topic={}|user={}", item.getTopic(), client); } } @@ -179,7 +179,7 @@ public void subscribe(List items) throws Exception { public void unsubscribe(List items) throws Exception { for (SubscriptionItem item : items) { sessionContext.subscribeTopics.remove(item.getTopic()); - clientGroupWrapper.get().removeSubscription(item.getTopic(), this); + clientGroupWrapper.get().removeSubscription(item, this); if (!clientGroupWrapper.get().hasSubscription(item.getTopic())) { clientGroupWrapper.get().unsubscribe(item);