Skip to content

Commit

Permalink
[ISSUE apache#391] Optimize interface design in eventmesh-connector-a…
Browse files Browse the repository at this point in the history
…pi (apache#392)

* modify:optimize flow control in downstreaming msg

* modify:optimize stategy of selecting session in downstream msg

* modify:optimize msg downstream,msg store in session

* modify:fix bug:not a @sharable handler

* modify:downstream broadcast msg asynchronously

* modify:remove unneccessary interface in eventmesh-connector-api

* modify:fix conflict

* modify:add license in EventMeshAction
close apache#391
  • Loading branch information
lrhkobe authored and jjz921024 committed Jul 25, 2021
1 parent 9000aeb commit 3555f13
Show file tree
Hide file tree
Showing 13 changed files with 108 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.api;

import io.openmessaging.api.AsyncConsumeContext;

public abstract class MeshAsyncConsumeContext extends AsyncConsumeContext {
private AbstractContext context;
public enum EventMeshAction {
CommitMessage,

public AbstractContext getContext() {
return context;
}
ReconsumeLater,

public void setContext(AbstractContext context) {
this.context = context;
}
ManualAck
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.eventmesh.api;

import io.openmessaging.api.Action;
import io.openmessaging.api.AsyncConsumeContext;

public abstract class EventMeshAsyncConsumeContext extends AsyncConsumeContext {

private AbstractContext abstractContext;

public AbstractContext getAbstractContext() {
return abstractContext;
}

public void setAbstractContext(AbstractContext abstractContext) {
this.abstractContext = abstractContext;
}

public abstract void commit(EventMeshAction action);

@Override
public void commit(Action action) {
throw new UnsupportedOperationException("not support yet");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,6 @@ public interface MeshMQPushConsumer extends Consumer {

void init(Properties keyValue) throws Exception;

@Override
void start();

// void updateOffset(List<MessageExt> msgs, ConsumeConcurrentlyContext context);

void updateOffset(List<Message> msgs, AbstractContext context);

// void registerMessageListener(MessageListenerConcurrently messageListenerConcurrently);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ public interface MeshMQProducer extends Producer {

void init(Properties properties) throws Exception;

@Override
void start();

void send(Message message, SendCallback sendCallback) throws Exception;

void request(Message message, SendCallback sendCallback, RRCallback rrCallback, long timeout) throws Exception;
Expand All @@ -40,12 +37,8 @@ public interface MeshMQProducer extends Producer {

boolean reply(final Message message, final SendCallback sendCallback) throws Exception;

MeshMQProducer getMeshMQProducer();

String buildMQClientId();
void checkTopicExist(String topic) throws Exception;

void setExtFields();

void getDefaultTopicRouteInfoFromNameServer(String topic, long timeout) throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

import io.openmessaging.api.Action;
import io.openmessaging.api.AsyncConsumeContext;
import io.openmessaging.api.AsyncGenericMessageListener;
import io.openmessaging.api.AsyncMessageListener;
import io.openmessaging.api.Consumer;
Expand All @@ -32,9 +29,8 @@
import io.openmessaging.api.MessageListener;
import io.openmessaging.api.MessageSelector;
import io.openmessaging.api.exception.OMSRuntimeException;

import org.apache.eventmesh.api.AbstractContext;
import org.apache.eventmesh.api.MeshAsyncConsumeContext;
import org.apache.eventmesh.api.EventMeshAction;
import org.apache.eventmesh.api.EventMeshAsyncConsumeContext;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.connector.rocketmq.common.EventMeshConstants;
import org.apache.eventmesh.connector.rocketmq.config.ClientConfig;
Expand Down Expand Up @@ -116,13 +112,25 @@ public EventMeshConsumeConcurrentlyStatus handleMessage(MessageExt msg, EventMes

final Properties contextProperties = new Properties();
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
MeshAsyncConsumeContext omsContext = new MeshAsyncConsumeContext() {
EventMeshAsyncConsumeContext omsContext = new EventMeshAsyncConsumeContext() {
@Override
public void commit(Action action) {
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
public void commit(EventMeshAction action) {
switch (action){
case CommitMessage:
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
break;
case ReconsumeLater:
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
break;
case ManualAck:
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
break;
default:
break;
}
}
};
omsContext.setContext(context);
omsContext.setAbstractContext(context);
listener.consume(omsMsg, omsContext);

return EventMeshConsumeConcurrentlyStatus.valueOf(contextProperties.getProperty(NonStandardKeys.MESSAGE_CONSUME_STATUS));
Expand Down Expand Up @@ -156,14 +164,25 @@ public EventMeshConsumeConcurrentlyStatus handleMessage(MessageExt msg, EventMes

contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());

MeshAsyncConsumeContext omsContext = new MeshAsyncConsumeContext() {
EventMeshAsyncConsumeContext omsContext = new EventMeshAsyncConsumeContext() {
@Override
public void commit(Action action) {
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
public void commit(EventMeshAction action) {
switch (action) {
case CommitMessage:
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
break;
case ReconsumeLater:
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
break;
case ManualAck:
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
break;
default:
break;
}
}
};
omsContext.setContext(context);
omsContext.setAbstractContext(context);
listener.consume(omsMsg, omsContext);

return EventMeshConsumeConcurrentlyStatus.valueOf(contextProperties.getProperty(NonStandardKeys.MESSAGE_CONSUME_STATUS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ public void start() {
producer.start();
}


@Override
public synchronized void shutdown() {
producer.shutdown();
Expand Down Expand Up @@ -115,27 +114,15 @@ public boolean reply(final Message message, final SendCallback sendCallback) thr
}

@Override
public MeshMQProducer getMeshMQProducer() {
return this;
}

@Override
public String buildMQClientId() {
return producer.getRocketmqProducer().buildMQClientId();
public void checkTopicExist(String topic) throws Exception {
this.producer.getRocketmqProducer().getDefaultMQProducerImpl().getmQClientFactory().getMQClientAPIImpl().getDefaultTopicRouteInfoFromNameServer(topic, EventMeshConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS);
}

@Override
public void setExtFields() {
producer.setExtFields();
}

@Override
public void getDefaultTopicRouteInfoFromNameServer(String topic, long timeout) throws Exception {
producer.getRocketmqProducer().getDefaultMQProducerImpl()
.getmQClientFactory().getMQClientAPIImpl().getDefaultTopicRouteInfoFromNameServer(topic,
timeout);
}

@Override
public SendResult send(Message message) {
return producer.send(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import io.openmessaging.api.OMS;
import io.openmessaging.api.OMSBuiltinKeys;

import org.apache.eventmesh.api.EventMeshAction;
import org.apache.eventmesh.api.EventMeshAsyncConsumeContext;
import org.apache.eventmesh.connector.rocketmq.consumer.PushConsumerImpl;
import org.apache.eventmesh.connector.rocketmq.domain.NonStandardKeys;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
Expand Down Expand Up @@ -95,7 +97,7 @@ public void testConsumeMessage() {
public void consume(Message message, AsyncConsumeContext context) {
assertThat(message.getSystemProperties("MESSAGE_ID")).isEqualTo("NewMsgId");
assertThat(message.getBody()).isEqualTo(testBody);
context.commit(Action.CommitMessage);
((EventMeshAsyncConsumeContext)context).commit(EventMeshAction.CommitMessage);
}
});
((MessageListenerConcurrently) rocketmqPushConsumer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,6 @@ public void unsubscribe(String topic) throws Exception {
meshMQPushConsumer.unsubscribe(topic);
}

// public boolean isPause() {
// return meshMQPushConsumer.isPause();
// }
//
// public void pause() {
// meshMQPushConsumer.pause();
// }

public synchronized void init(Properties keyValue) throws Exception {
meshMQPushConsumer = getMeshMQPushConsumer();
if (meshMQPushConsumer == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

import io.openmessaging.api.Action;
import io.openmessaging.api.AsyncConsumeContext;
import io.openmessaging.api.AsyncMessageListener;
import io.openmessaging.api.Message;
Expand All @@ -31,7 +30,8 @@

import org.apache.commons.collections4.MapUtils;
import org.apache.eventmesh.api.AbstractContext;
import org.apache.eventmesh.api.MeshAsyncConsumeContext;
import org.apache.eventmesh.api.EventMeshAction;
import org.apache.eventmesh.api.EventMeshAsyncConsumeContext;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
Expand Down Expand Up @@ -125,25 +125,26 @@ public void consume(Message message, AsyncConsumeContext context) {
}

ConsumerGroupTopicConf currentTopicConfig = MapUtils.getObject(consumerGroupConf.getConsumerGroupTopicConf(), topic, null);
EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext)context;

if (currentTopicConfig == null) {
logger.error("no topicConfig found, consumerGroup:{} topic:{}", consumerGroupConf.getConsumerGroup(), topic);
try {
sendMessageBack(message, uniqueId, bizSeqNo);
// context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
// context.ack();
context.commit(Action.CommitMessage);
eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
return;
} catch (Exception ex) {
}
}
HandleMsgContext handleMsgContext = new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(), consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this,
topic, message, subscriptionItem, ((MeshAsyncConsumeContext)context).getContext(), consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
topic, message, subscriptionItem, eventMeshAsyncConsumeContext.getAbstractContext(), consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);

if (httpMessageHandler.handle(handleMsgContext)) {
// context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
// context.ack();
context.commit(Action.CommitMessage);
eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
} else {
try {
sendMessageBack(message, uniqueId, bizSeqNo);
Expand All @@ -152,7 +153,7 @@ public void consume(Message message, AsyncConsumeContext context) {
}
// context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
// context.ack();
context.commit(Action.CommitMessage);
eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
}
}
};
Expand All @@ -174,25 +175,26 @@ public void consume(Message message, AsyncConsumeContext context) {
}

ConsumerGroupTopicConf currentTopicConfig = MapUtils.getObject(consumerGroupConf.getConsumerGroupTopicConf(), topic, null);
EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext)context;

if (currentTopicConfig == null) {
logger.error("no topicConfig found, consumerGroup:{} topic:{}", consumerGroupConf.getConsumerGroup(), topic);
try {
sendMessageBack(message, uniqueId, bizSeqNo);
// context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
// context.ack();
context.commit(Action.CommitMessage);
eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
return;
} catch (Exception ex) {
}
}
HandleMsgContext handleMsgContext = new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(), consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this,
topic, message, subscriptionItem, ((MeshAsyncConsumeContext)context).getContext(), consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
topic, message, subscriptionItem, eventMeshAsyncConsumeContext.getAbstractContext(), consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);

if (httpMessageHandler.handle(handleMsgContext)) {
// context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
// context.ack();
context.commit(Action.CommitMessage);
eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
} else {
try {
sendMessageBack(message, uniqueId, bizSeqNo);
Expand All @@ -201,7 +203,7 @@ public void consume(Message message, AsyncConsumeContext context) {
}
// context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
// context.ack();
context.commit(Action.CommitMessage);
eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>
omsMsg.putUserProperties("msgType", "persistent");
omsMsg.putUserProperties(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
omsMsg.putUserProperties(Constants.RMB_UNIQ_ID, sendMessageRequestBody.getUniqueId());
omsMsg.putUserProperties("REPLY_TO", eventMeshProducer.getMqProducerWrapper().getMeshMQProducer().buildMQClientId());
// omsMsg.putUserProperties("REPLY_TO", eventMeshProducer.getMqProducerWrapper().getMeshMQProducer().buildMQClientId());

if (messageLogger.isDebugEnabled()) {
messageLogger.debug("msg2MQMsg suc, bizSeqNo={}, topic={}", sendMessageRequestBody.getBizSeqNo(),
Expand Down
Loading

0 comments on commit 3555f13

Please sign in to comment.