Skip to content

Commit

Permalink
[Issue #780] Modify the define level of EventListener from Topic to C…
Browse files Browse the repository at this point in the history
…onsumer (#781)

* modify: add group field in UserAgent, delete ProducerGroup and ConsumerGroup field

* modify: fix checksyle error

* modify: fix checksyle error in ClientGroupWrapper.java

* modify: move EventListner in the level of Consumer instead of binding with topic in EventMesh

* modify: fix the eventListener level problem in grpc protocal

* modify: fix the eventListener problem in test case

close #780
  • Loading branch information
lrhkobe authored Feb 23, 2022
1 parent 779c7fb commit f79c0ea
Show file tree
Hide file tree
Showing 12 changed files with 334 additions and 390 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ public interface Consumer extends LifeCycle {

void updateOffset(List<CloudEvent> cloudEvents, AbstractContext context);

void subscribe(String topic, final EventListener listener) throws Exception;
//void subscribe(String topic, final EventListener listener) throws Exception;

void subscribe(String topic) throws Exception;

void unsubscribe(String topic);

void registerEventListener(EventListener listener);
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class PushConsumerImpl {
private final DefaultMQPushConsumer rocketmqPushConsumer;
private final Properties properties;
private AtomicBoolean started = new AtomicBoolean(false);
private final Map<String, EventListener> subscribeTable = new ConcurrentHashMap<>();
private EventListener eventListener;
private final ClientConfig clientConfig;

public PushConsumerImpl(final Properties properties) {
Expand Down Expand Up @@ -134,9 +134,7 @@ public DefaultMQPushConsumer getRocketmqPushConsumer() {
return rocketmqPushConsumer;
}


public void subscribe(String topic, String subExpression, EventListener listener) {
this.subscribeTable.put(topic, listener);
public void subscribe(String topic, String subExpression) {
try {
this.rocketmqPushConsumer.subscribe(topic, subExpression);
} catch (MQClientException e) {
Expand All @@ -146,7 +144,6 @@ public void subscribe(String topic, String subExpression, EventListener listener


public void unsubscribe(String topic) {
this.subscribeTable.remove(topic);
try {
this.rocketmqPushConsumer.unsubscribe(topic);
} catch (Exception e) {
Expand Down Expand Up @@ -197,9 +194,7 @@ public EventMeshConsumeConcurrentlyStatus handleMessage(MessageExt msg,
cloudEvent = cloudEventBuilder.build();
}

EventListener listener = PushConsumerImpl.this.subscribeTable.get(msg.getTopic());

if (listener == null) {
if (eventListener == null) {
throw new ConnectorRuntimeException(String.format("The topic/queue %s isn't attached to this consumer",
msg.getTopic()));
}
Expand Down Expand Up @@ -231,7 +226,7 @@ public void commit(EventMeshAction action) {

eventMeshAsyncConsumeContext.setAbstractContext(context);

listener.consume(cloudEvent, eventMeshAsyncConsumeContext);
eventListener.consume(cloudEvent, eventMeshAsyncConsumeContext);

return EventMeshConsumeConcurrentlyStatus.valueOf(
contextProperties.getProperty(NonStandardKeys.MESSAGE_CONSUME_STATUS));
Expand Down Expand Up @@ -270,9 +265,7 @@ public EventMeshConsumeConcurrentlyStatus handleMessage(MessageExt msg,
cloudEvent = cloudEventBuilder.build();
}

EventListener listener = PushConsumerImpl.this.subscribeTable.get(msg.getTopic());

if (listener == null) {
if (eventListener == null) {
throw new ConnectorRuntimeException(String.format("The topic/queue %s isn't attached to this consumer",
msg.getTopic()));
}
Expand Down Expand Up @@ -306,12 +299,14 @@ public void commit(EventMeshAction action) {

eventMeshAsyncConsumeContext.setAbstractContext(context);

listener.consume(cloudEvent, eventMeshAsyncConsumeContext);
eventListener.consume(cloudEvent, eventMeshAsyncConsumeContext);

return EventMeshConsumeConcurrentlyStatus.valueOf(
contextProperties.getProperty(NonStandardKeys.MESSAGE_CONSUME_STATUS));
}
}


public void registerEventListener(EventListener listener) {
this.eventListener = listener;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ public synchronized void init(Properties keyValue) throws Exception {
}

@Override
public void subscribe(String topic, EventListener listener) throws Exception {
pushConsumer.subscribe(topic, "*", listener);
public void subscribe(String topic) throws Exception {
pushConsumer.subscribe(topic, "*");
}

@Override
Expand Down Expand Up @@ -99,6 +99,11 @@ public void unsubscribe(String topic) {
pushConsumer.unsubscribe(topic);
}

@Override
public void registerEventListener(EventListener listener) {
pushConsumer.registerEventListener(listener);
}

@Override
public synchronized void shutdown() {
pushConsumer.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,7 @@ public void testConsumeMessage() {
consumedMsg.setBody(testBody);
consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
consumedMsg.setTopic("HELLO_QUEUE");
consumer.subscribe("HELLO_QUEUE", "*", new EventListener() {

@Override
public void consume(CloudEvent cloudEvent, org.apache.eventmesh.api.AsyncConsumeContext context) {
assertThat(cloudEvent.getExtension("MESSAGE_ID")).isEqualTo("NewMsgId");
assertThat(cloudEvent.getData()).isEqualTo(testBody);
context.commit(EventMeshAction.CommitMessage);
}
});
consumer.subscribe("HELLO_QUEUE", "*");
((MessageListenerConcurrently) rocketmqPushConsumer
.getMessageListener()).consumeMessage(Collections.singletonList(consumedMsg), null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class StandaloneConsumer implements Consumer {

private StandaloneBroker standaloneBroker;

private EventListener listener;

private AtomicBoolean isStarted;

private final ConcurrentHashMap<String, SubScribeTask> subscribeTaskTable;
Expand Down Expand Up @@ -90,10 +92,8 @@ public void updateOffset(List<CloudEvent> cloudEvents, AbstractContext context)
}

@Override
public void subscribe(String topic, EventListener listener) throws Exception {
if (listener == null) {
throw new IllegalArgumentException("listener cannot be null");
}
public void subscribe(String topic) throws Exception {

if (subscribeTaskTable.containsKey(topic)) {
return;
}
Expand All @@ -116,4 +116,9 @@ public void unsubscribe(String topic) {
subscribeTaskTable.remove(topic);
}
}

@Override
public void registerEventListener(EventListener listener) {
this.listener = listener;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,104 +69,17 @@ public void updateOffset(List<CloudEvent> cloudEvents, AbstractContext context)
}

@Override
public void subscribe(String topic, EventListener listener) throws Exception {
consumer.subscribe(topic, listener);
public void subscribe(String topic) throws Exception {
consumer.subscribe(topic);
}

@Override
public void unsubscribe(String topic) {
consumer.unsubscribe(topic);
}

//@Override
//public void init(Properties keyValue) throws Exception {
// String producerGroup = keyValue.getProperty("producerGroup");
//
// MessagingAccessPointImpl messagingAccessPoint = new MessagingAccessPointImpl(keyValue);
// consumer = (StandaloneConsumer) messagingAccessPoint.createConsumer(keyValue);
//
//}
//
//@Override
//public void updateOffset(List<Message> msgs, AbstractContext context) {
// for(Message message : msgs) {
// consumer.updateOffset(message);
// }
//}
//
//@Override
//public void subscribe(String topic, AsyncMessageListener listener) throws Exception {
// // todo: support subExpression
// consumer.subscribe(topic, "*", listener);
//}
//
//@Override
//public void unsubscribe(String topic) {
// consumer.unsubscribe(topic);
//}
//
//@Override
//public void subscribe(String topic, String subExpression, MessageListener listener) {
// throw new UnsupportedOperationException("not supported yet");
//}
//
//@Override
//public void subscribe(String topic, MessageSelector selector, MessageListener listener) {
// throw new UnsupportedOperationException("not supported yet");
//}
//
//@Override
//public <T> void subscribe(String topic, String subExpression, GenericMessageListener<T> listener) {
// throw new UnsupportedOperationException("not supported yet");
//}
//
//@Override
//public <T> void subscribe(String topic, MessageSelector selector, GenericMessageListener<T> listener) {
// throw new UnsupportedOperationException("not supported yet");
//}
//
//@Override
//public void subscribe(String topic, String subExpression, AsyncMessageListener listener) {
// throw new UnsupportedOperationException("not supported yet");
//}
//
//@Override
//public void subscribe(String topic, MessageSelector selector, AsyncMessageListener listener) {
// throw new UnsupportedOperationException("not supported yet");
//}
//
//@Override
//public <T> void subscribe(String topic, String subExpression, AsyncGenericMessageListener<T> listener) {
// throw new UnsupportedOperationException("not supported yet");
//}
//
//@Override
//public <T> void subscribe(String topic, MessageSelector selector, AsyncGenericMessageListener<T> listener) {
// throw new UnsupportedOperationException("not supported yet");
//}
//
//@Override
//public void updateCredential(Properties credentialProperties) {
//
//}
//
//@Override
//public boolean isStarted() {
// return consumer.isStarted();
//}
//
//@Override
//public boolean isClosed() {
// return consumer.isClosed();
//}
//
//@Override
//public void start() {
// consumer.start();
//}
//
//@Override
//public void shutdown() {
// consumer.shutdown();
//}
@Override
public void registerEventListener(EventListener listener) {
consumer.registerEventListener(listener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ public MQConsumerWrapper(String connectorPluginType) {
}
}

public void subscribe(String topic, EventListener listener) throws Exception {
meshMQPushConsumer.subscribe(topic, listener);
public void subscribe(String topic) throws Exception {
meshMQPushConsumer.subscribe(topic);
}

public void unsubscribe(String topic) throws Exception {
Expand All @@ -69,9 +69,9 @@ public synchronized void shutdown() throws Exception {
started.compareAndSet(false, true);
}

//public void registerMessageListener(MessageListenerConcurrently messageListenerConcurrently) {
// meshMQPushConsumer.registerMessageListener(messageListenerConcurrently);
//}
public void registerEventListener(EventListener listener) {
meshMQPushConsumer.registerEventListener(listener);
}

public void updateOffset(List<CloudEvent> events, AbstractContext eventMeshConsumeConcurrentlyContext) {
meshMQPushConsumer.updateOffset(events, eventMeshConsumeConcurrentlyContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ public synchronized void init() throws Exception {
keyValue.put("instanceName", EventMeshUtil.buildMeshClientID(consumerGroup,
eventMeshGrpcConfiguration.eventMeshCluster));
persistentMqConsumer.init(keyValue);
EventListener clusterEventListner = createEventListener(SubscriptionMode.CLUSTERING);
persistentMqConsumer.registerEventListener(clusterEventListner);

Properties broadcastKeyValue = new Properties();
broadcastKeyValue.put("isBroadcast", "true");
Expand All @@ -148,6 +150,8 @@ public synchronized void init() throws Exception {
broadcastKeyValue.put("instanceName", EventMeshUtil.buildMeshClientID(consumerGroup,
eventMeshGrpcConfiguration.eventMeshCluster));
broadcastMqConsumer.init(broadcastKeyValue);
EventListener broadcastEventListner = createEventListener(SubscriptionMode.BROADCASTING);
broadcastMqConsumer.registerEventListener(broadcastEventListner);

serviceState = ServiceState.INITED;
logger.info("EventMeshConsumer [{}] initialized.............", consumerGroup);
Expand Down Expand Up @@ -184,9 +188,9 @@ public ServiceState getStatus() {

public void subscribe(String topic, SubscriptionMode subscriptionMode) throws Exception {
if (SubscriptionMode.CLUSTERING.equals(subscriptionMode)) {
persistentMqConsumer.subscribe(topic, createEventListener(subscriptionMode));
persistentMqConsumer.subscribe(topic);
} else if (SubscriptionMode.BROADCASTING.equals(subscriptionMode)) {
broadcastMqConsumer.subscribe(topic, createEventListener(subscriptionMode));
broadcastMqConsumer.subscribe(topic);
} else {
logger.error("Subscribe Failed. Incorrect Subscription Mode");
throw new Exception("Subscribe Failed. Incorrect Subscription Mode");
Expand Down
Loading

0 comments on commit f79c0ea

Please sign in to comment.