diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java index e5c3e5de0e..af222fd8c6 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java @@ -81,4 +81,6 @@ public class Constants { public static final String PROPERTY_MESSAGE_STORE_TIMESTAMP = "storetimestamp"; + public static final String MESSAGE_PROP_SEPARATOR = "99"; + } diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQBinaryMessageReader.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQBinaryMessageReader.java index a02d07428a..df42f6bd2e 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQBinaryMessageReader.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQBinaryMessageReader.java @@ -46,13 +46,15 @@ protected boolean isContentTypeHeader(String key) { @Override protected boolean isCloudEventsHeader(String key) { - return key.length() > 3 && key.substring(0, RocketMQHeaders.CE_PREFIX.length()) - .startsWith(RocketMQHeaders.CE_PREFIX); +// return key.length() > 3 && key.substring(0, RocketMQHeaders.CE_PREFIX.length()) +// .startsWith(RocketMQHeaders.CE_PREFIX); + return true; } @Override protected String toCloudEventsKey(String key) { - return key.substring(RocketMQHeaders.CE_PREFIX.length()).toLowerCase(); +// return key.substring(RocketMQHeaders.CE_PREFIX.length()).toLowerCase(); + return key.toLowerCase(); } @Override diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQHeaders.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQHeaders.java index 99f5edb832..64600ae0a7 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQHeaders.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQHeaders.java @@ -27,7 +27,7 @@ public class RocketMQHeaders { public static final String CE_PREFIX = "CE_"; protected static final Map ATTRIBUTES_TO_HEADERS = - MessageUtils.generateAttributesToHeadersMapping(v -> CE_PREFIX + v); + MessageUtils.generateAttributesToHeadersMapping(v -> v); public static final String CONTENT_TYPE = ATTRIBUTES_TO_HEADERS.get(CloudEventV1.DATACONTENTTYPE); diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQMessageWriter.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQMessageWriter.java index d253069d92..d7a472a806 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQMessageWriter.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQMessageWriter.java @@ -68,11 +68,12 @@ public RocketMQMessageWriter(String topic, String keys, String tags) { public CloudEventContextWriter withContextAttribute(String name, String value) throws CloudEventRWException { - String propName = RocketMQHeaders.ATTRIBUTES_TO_HEADERS.get(name); - if (propName == null) { - propName = RocketMQHeaders.CE_PREFIX + name; - } - message.putUserProperty(propName, value); +// String propName = RocketMQHeaders.ATTRIBUTES_TO_HEADERS.get(name); +// if (propName == null) { +// propName = RocketMQHeaders.CE_PREFIX + name; +// } +// message.putUserProperty(propName, value); + message.putUserProperty(name, value); return this; } diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java index 93c61a3df8..e7e5ae8e7e 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java @@ -183,24 +183,18 @@ public EventMeshConsumeConcurrentlyStatus handleMessage(MessageExt msg, String.valueOf(msg.getStoreTimestamp())); //for rr request/reply - String cluster = msg.getProperty(MessageConst.PROPERTY_CLUSTER); - String replyClient = msg.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT); - String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID); - CloudEvent cloudEvent = RocketMQMessageFactory.createReader(CloudEventUtils.msgConvert(msg)).toEvent(); - CloudEventBuilder cloudEventBuilder; - if (StringUtils.isNotEmpty(cluster)) { - cloudEventBuilder = CloudEventBuilder.from(cloudEvent).withExtension("cluster", cluster); - cloudEvent = cloudEventBuilder.build(); - } - if (StringUtils.isNotEmpty(replyClient)) { - cloudEventBuilder = CloudEventBuilder.from(cloudEvent).withExtension("replytoclient", replyClient); - cloudEvent = cloudEventBuilder.build(); + CloudEventBuilder cloudEventBuilder = null; + for (String sysPropKey : MessageConst.STRING_HASH_SET) { + if (StringUtils.isNotEmpty(msg.getProperty(sysPropKey))) { + String prop = msg.getProperty(sysPropKey); + sysPropKey = sysPropKey.toLowerCase().replaceAll("_", Constants.MESSAGE_PROP_SEPARATOR); + cloudEventBuilder = CloudEventBuilder.from(cloudEvent).withExtension(sysPropKey, prop); + } } - if (StringUtils.isNotEmpty(correlationId)) { - cloudEventBuilder = CloudEventBuilder.from(cloudEvent).withExtension("correlationid", correlationId); + if (cloudEventBuilder != null) { cloudEvent = cloudEventBuilder.build(); } @@ -259,25 +253,19 @@ public EventMeshConsumeConcurrentlyStatus handleMessage(MessageExt msg, msg.putUserProperty(EventMeshConstants.STORE_TIMESTAMP, String.valueOf(msg.getStoreTimestamp())); - //for rr request/reply - String cluster = msg.getProperty(MessageConst.PROPERTY_CLUSTER); - String replyClient = msg.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT); - String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID); - CloudEvent cloudEvent = RocketMQMessageFactory.createReader(CloudEventUtils.msgConvert(msg)).toEvent(); - CloudEventBuilder cloudEventBuilder; - if (StringUtils.isNotEmpty(cluster)) { - cloudEventBuilder = CloudEventBuilder.from(cloudEvent).withExtension("cluster", cluster); - cloudEvent = cloudEventBuilder.build(); - } - if (StringUtils.isNotEmpty(replyClient)) { - cloudEventBuilder = CloudEventBuilder.from(cloudEvent).withExtension("replytoclient", replyClient); - cloudEvent = cloudEventBuilder.build(); + CloudEventBuilder cloudEventBuilder = null; + + for (String sysPropKey : MessageConst.STRING_HASH_SET) { + if (StringUtils.isNotEmpty(msg.getProperty(sysPropKey))) { + String prop = msg.getProperty(sysPropKey); + sysPropKey = sysPropKey.toLowerCase().replaceAll("_", Constants.MESSAGE_PROP_SEPARATOR); + cloudEventBuilder = CloudEventBuilder.from(cloudEvent).withExtension(sysPropKey, prop); + } } - if (StringUtils.isNotEmpty(correlationId)) { - cloudEventBuilder = CloudEventBuilder.from(cloudEvent).withExtension("correlationid", correlationId); + if (cloudEventBuilder != null) { cloudEvent = cloudEventBuilder.build(); } diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java index 619fe63578..94e010bc02 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java @@ -22,6 +22,7 @@ import org.apache.eventmesh.api.SendResult; import org.apache.eventmesh.api.exception.ConnectorRuntimeException; import org.apache.eventmesh.api.exception.OnExceptionContext; +import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.connector.rocketmq.cloudevent.RocketMQMessageFactory; import org.apache.eventmesh.connector.rocketmq.utils.OMSUtil; import org.apache.eventmesh.connector.rocketmq.utils.CloudEventUtils; @@ -75,6 +76,7 @@ public SendResult send(CloudEvent cloudEvent) { this.checkProducerServiceState(rocketmqProducer.getDefaultMQProducerImpl()); org.apache.rocketmq.common.message.Message msg = RocketMQMessageFactory.createWriter(cloudEvent.getSubject()).writeBinary(cloudEvent); + msg = supplySysProp(msg, cloudEvent); String messageId = null; try { org.apache.rocketmq.client.producer.SendResult sendResultRmq = @@ -95,6 +97,7 @@ public void sendOneway(CloudEvent cloudEvent) { this.checkProducerServiceState(this.rocketmqProducer.getDefaultMQProducerImpl()); org.apache.rocketmq.common.message.Message msg = RocketMQMessageFactory.createWriter(cloudEvent.getSubject()).writeBinary(cloudEvent); + msg = supplySysProp(msg, cloudEvent); try { this.rocketmqProducer.sendOneway(msg); } catch (Exception e) { @@ -109,7 +112,7 @@ public void sendAsync(CloudEvent cloudEvent, SendCallback sendCallback) { this.checkProducerServiceState(this.rocketmqProducer.getDefaultMQProducerImpl()); org.apache.rocketmq.common.message.Message msg = RocketMQMessageFactory.createWriter(cloudEvent.getSubject()).writeBinary(cloudEvent); - + msg = supplySysProp(msg, cloudEvent); try { this.rocketmqProducer.send(msg, this.sendCallbackConvert(msg, sendCallback)); } catch (Exception e) { @@ -125,6 +128,9 @@ public void request(CloudEvent cloudEvent, RequestReplyCallback rrCallback, long this.checkProducerServiceState(this.rocketmqProducer.getDefaultMQProducerImpl()); org.apache.rocketmq.common.message.Message msg = RocketMQMessageFactory.createWriter(cloudEvent.getSubject()).writeBinary(cloudEvent); + + msg = supplySysProp(msg, cloudEvent); + rocketmqProducer.request(msg, rrCallbackConvert(msg, rrCallback), timeout); } @@ -133,15 +139,7 @@ public boolean reply(final CloudEvent cloudEvent, final SendCallback sendCallbac org.apache.rocketmq.common.message.Message msg = RocketMQMessageFactory.createWriter(cloudEvent.getSubject()).writeBinary(cloudEvent); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_TYPE, MixAll.REPLY_MESSAGE_FLAG); - if (StringUtils.isNotEmpty(cloudEvent.getExtension("cluster").toString())) { - MessageAccessor.putProperty(msg, MessageConst.PROPERTY_CLUSTER, cloudEvent.getExtension("cluster").toString()); - } - if (StringUtils.isNotEmpty(cloudEvent.getExtension("replytoclient").toString())) { - MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT, cloudEvent.getExtension("replytoclient").toString()); - } - if (StringUtils.isNotEmpty(cloudEvent.getExtension("correlationid").toString())) { - MessageAccessor.putProperty(msg, MessageConst.PROPERTY_CORRELATION_ID, cloudEvent.getExtension("correlationid").toString()); - } + msg = supplySysProp(msg, cloudEvent); try { this.rocketmqProducer.send(msg, this.sendCallbackConvert(msg, sendCallback)); @@ -154,10 +152,30 @@ public boolean reply(final CloudEvent cloudEvent, final SendCallback sendCallbac } + private Message supplySysProp(Message msg, CloudEvent cloudEvent) { + for (String sysPropKey : MessageConst.STRING_HASH_SET) { + String ceKey = sysPropKey.toLowerCase().replaceAll("_", Constants.MESSAGE_PROP_SEPARATOR); + if (cloudEvent.getExtension(ceKey) != null && StringUtils.isNotEmpty(cloudEvent.getExtension(ceKey).toString())) { + MessageAccessor.putProperty(msg, sysPropKey, cloudEvent.getExtension(ceKey).toString()); + msg.getProperties().remove(ceKey); + } + } + return msg; + } + private RequestCallback rrCallbackConvert(final Message message, final RequestReplyCallback rrCallback) { return new RequestCallback() { @Override public void onSuccess(org.apache.rocketmq.common.message.Message message) { + // clean the message property to lowercase + for (String sysPropKey : MessageConst.STRING_HASH_SET) { + if (StringUtils.isNotEmpty(message.getProperty(sysPropKey))) { + String prop = message.getProperty(sysPropKey); + String tmpPropKey = sysPropKey.toLowerCase().replaceAll("_", Constants.MESSAGE_PROP_SEPARATOR); + MessageAccessor.putProperty(message, tmpPropKey, prop); + message.getProperties().remove(sysPropKey); + } + } CloudEvent event = RocketMQMessageFactory.createReader(message).toEvent(); rrCallback.onSuccess(event); } diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/CloudEventUtils.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/CloudEventUtils.java index a093265d97..80ec6f3f40 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/CloudEventUtils.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/CloudEventUtils.java @@ -17,13 +17,13 @@ package org.apache.eventmesh.connector.rocketmq.utils; - import org.apache.eventmesh.api.SendResult; import org.apache.eventmesh.common.Constants; -import org.apache.eventmesh.connector.rocketmq.cloudevent.impl.RocketMQHeaders; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; import java.util.Map; @@ -90,13 +90,23 @@ public static Message msgConvert(MessageExt rmqMsg) { MessageAccessor.putProperty(message, buildCloudEventPropertyKey(Constants.PROPERTY_MESSAGE_QUEUE_OFFSET), String.valueOf(rmqMsg.getQueueOffset())); + for (String sysPropKey : MessageConst.STRING_HASH_SET) { + if (StringUtils.isNotEmpty(message.getProperty(sysPropKey))) { + String prop = message.getProperty(sysPropKey); + String tmpPropKey = sysPropKey.toLowerCase().replaceAll("_", Constants.MESSAGE_PROP_SEPARATOR); + MessageAccessor.putProperty(message, tmpPropKey, prop); + message.getProperties().remove(sysPropKey); + } + } + return message; } private static String buildCloudEventPropertyKey(String propName) { - return RocketMQHeaders.CE_PREFIX + propName; +// return RocketMQHeaders.CE_PREFIX + propName; + return propName; } public static org.apache.rocketmq.common.message.MessageExt msgConvertExt(Message message) { @@ -121,8 +131,8 @@ public static org.apache.rocketmq.common.message.MessageExt msgConvertExt(Messag rmqMessageExt.setTopic(message.getTopic()); int queueId = - (int) Integer.valueOf(message.getProperty(buildCloudEventPropertyKey(Constants.PROPERTY_MESSAGE_QUEUE_ID))); - long queueOffset = (long) Long.valueOf( + Integer.parseInt(message.getProperty(buildCloudEventPropertyKey(Constants.PROPERTY_MESSAGE_QUEUE_ID))); + long queueOffset = Long.parseLong( message.getProperty(buildCloudEventPropertyKey(Constants.PROPERTY_MESSAGE_QUEUE_OFFSET))); //use in manual ack rmqMessageExt.setQueueId(queueId); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/constants/EventMeshConstants.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/constants/EventMeshConstants.java index 2f8bd634a0..bd702989f3 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/constants/EventMeshConstants.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/constants/EventMeshConstants.java @@ -17,6 +17,8 @@ package org.apache.eventmesh.runtime.constants; +import org.apache.eventmesh.common.Constants; + public class EventMeshConstants { public static final String EVENT_STORE_PROPERTIES = "eventstore"; @@ -111,8 +113,10 @@ public class EventMeshConstants { public static final String PROPERTY_RR_REQUEST_ID = "RR_REQUEST_UNIQ_ID"; - public static final String LEAVE_TIME = "leavetime"; //leaveBrokerTime - public static final String ARRIVE_TIME = "arrivetime"; - public static final String STORE_TIME = "storetime"; + public static final String LEAVE_TIME = "leave" + Constants.MESSAGE_PROP_SEPARATOR + "time"; //leaveBrokerTime + public static final String ARRIVE_TIME = "arrive" + Constants.MESSAGE_PROP_SEPARATOR + "time"; + public static final String STORE_TIME = "store" + Constants.MESSAGE_PROP_SEPARATOR + "time"; + + } diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CClientDemo.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CClientDemo.java index 4f33595ee0..cad2c5018b 100644 --- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CClientDemo.java +++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CClientDemo.java @@ -63,7 +63,7 @@ public static void main(String[] args) throws Exception { client.registerSubBusiHandler(new ReceiveMsgHook() { @Override public void handle(Package msg, ChannelHandlerContext ctx) { - if (msg.getHeader().getCommand() == Command.ASYNC_MESSAGE_TO_CLIENT || msg.getHeader().getCommand() == Command.BROADCAST_MESSAGE_TO_CLIENT) { + if (msg.getHeader().getCmd() == Command.ASYNC_MESSAGE_TO_CLIENT || msg.getHeader().getCmd() == Command.BROADCAST_MESSAGE_TO_CLIENT) { logger.error("receive message-------------------------------------" + msg.toString()); } }