Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug #646] Missing the rocketmq message properties during protocol conversion #647

Merged
merged 5 commits into from
Dec 13, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,6 @@ public class Constants {

public static final String PROPERTY_MESSAGE_STORE_TIMESTAMP = "storetimestamp";

public static final String MESSAGE_PROP_SEPARATOR = "99";

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,15 @@ protected boolean isContentTypeHeader(String key) {

@Override
protected boolean isCloudEventsHeader(String key) {
return key.length() > 3 && key.substring(0, RocketMQHeaders.CE_PREFIX.length())
.startsWith(RocketMQHeaders.CE_PREFIX);
// return key.length() > 3 && key.substring(0, RocketMQHeaders.CE_PREFIX.length())
//.startsWith(RocketMQHeaders.CE_PREFIX);
return true;
}

@Override
protected String toCloudEventsKey(String key) {
return key.substring(RocketMQHeaders.CE_PREFIX.length()).toLowerCase();
//return key.substring(RocketMQHeaders.CE_PREFIX.length()).toLowerCase();
return key.toLowerCase();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class RocketMQHeaders {
public static final String CE_PREFIX = "CE_";

protected static final Map<String, String> ATTRIBUTES_TO_HEADERS =
MessageUtils.generateAttributesToHeadersMapping(v -> CE_PREFIX + v);
MessageUtils.generateAttributesToHeadersMapping(v -> v);

public static final String CONTENT_TYPE =
ATTRIBUTES_TO_HEADERS.get(CloudEventV1.DATACONTENTTYPE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,12 @@ public RocketMQMessageWriter(String topic, String keys, String tags) {
public CloudEventContextWriter withContextAttribute(String name, String value)
throws CloudEventRWException {

String propName = RocketMQHeaders.ATTRIBUTES_TO_HEADERS.get(name);
if (propName == null) {
propName = RocketMQHeaders.CE_PREFIX + name;
}
message.putUserProperty(propName, value);
//String propName = RocketMQHeaders.ATTRIBUTES_TO_HEADERS.get(name);
//if (propName == null) {
//propName = RocketMQHeaders.CE_PREFIX + name;
//}
//message.putUserProperty(propName, value);
message.putUserProperty(name, value);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,24 +183,18 @@ public EventMeshConsumeConcurrentlyStatus handleMessage(MessageExt msg,
String.valueOf(msg.getStoreTimestamp()));

//for rr request/reply
String cluster = msg.getProperty(MessageConst.PROPERTY_CLUSTER);
String replyClient = msg.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT);
String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);

CloudEvent cloudEvent =
RocketMQMessageFactory.createReader(CloudEventUtils.msgConvert(msg)).toEvent();

CloudEventBuilder cloudEventBuilder;
if (StringUtils.isNotEmpty(cluster)) {
cloudEventBuilder = CloudEventBuilder.from(cloudEvent).withExtension("cluster", cluster);
cloudEvent = cloudEventBuilder.build();
}
if (StringUtils.isNotEmpty(replyClient)) {
cloudEventBuilder = CloudEventBuilder.from(cloudEvent).withExtension("replytoclient", replyClient);
cloudEvent = cloudEventBuilder.build();
CloudEventBuilder cloudEventBuilder = null;
for (String sysPropKey : MessageConst.STRING_HASH_SET) {
if (StringUtils.isNotEmpty(msg.getProperty(sysPropKey))) {
String prop = msg.getProperty(sysPropKey);
sysPropKey = sysPropKey.toLowerCase().replaceAll("_", Constants.MESSAGE_PROP_SEPARATOR);
cloudEventBuilder = CloudEventBuilder.from(cloudEvent).withExtension(sysPropKey, prop);
}
}
if (StringUtils.isNotEmpty(correlationId)) {
cloudEventBuilder = CloudEventBuilder.from(cloudEvent).withExtension("correlationid", correlationId);
if (cloudEventBuilder != null) {
cloudEvent = cloudEventBuilder.build();
}

Expand Down Expand Up @@ -259,25 +253,19 @@ public EventMeshConsumeConcurrentlyStatus handleMessage(MessageExt msg,
msg.putUserProperty(EventMeshConstants.STORE_TIMESTAMP,
String.valueOf(msg.getStoreTimestamp()));

//for rr request/reply
String cluster = msg.getProperty(MessageConst.PROPERTY_CLUSTER);
String replyClient = msg.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT);
String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);

CloudEvent cloudEvent =
RocketMQMessageFactory.createReader(CloudEventUtils.msgConvert(msg)).toEvent();

CloudEventBuilder cloudEventBuilder;
if (StringUtils.isNotEmpty(cluster)) {
cloudEventBuilder = CloudEventBuilder.from(cloudEvent).withExtension("cluster", cluster);
cloudEvent = cloudEventBuilder.build();
}
if (StringUtils.isNotEmpty(replyClient)) {
cloudEventBuilder = CloudEventBuilder.from(cloudEvent).withExtension("replytoclient", replyClient);
cloudEvent = cloudEventBuilder.build();
CloudEventBuilder cloudEventBuilder = null;

for (String sysPropKey : MessageConst.STRING_HASH_SET) {
if (StringUtils.isNotEmpty(msg.getProperty(sysPropKey))) {
String prop = msg.getProperty(sysPropKey);
sysPropKey = sysPropKey.toLowerCase().replaceAll("_", Constants.MESSAGE_PROP_SEPARATOR);
cloudEventBuilder = CloudEventBuilder.from(cloudEvent).withExtension(sysPropKey, prop);
}
}
if (StringUtils.isNotEmpty(correlationId)) {
cloudEventBuilder = CloudEventBuilder.from(cloudEvent).withExtension("correlationid", correlationId);
if (cloudEventBuilder != null) {
cloudEvent = cloudEventBuilder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.api.exception.ConnectorRuntimeException;
import org.apache.eventmesh.api.exception.OnExceptionContext;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.connector.rocketmq.cloudevent.RocketMQMessageFactory;
import org.apache.eventmesh.connector.rocketmq.utils.OMSUtil;
import org.apache.eventmesh.connector.rocketmq.utils.CloudEventUtils;
Expand Down Expand Up @@ -75,6 +76,7 @@ public SendResult send(CloudEvent cloudEvent) {
this.checkProducerServiceState(rocketmqProducer.getDefaultMQProducerImpl());
org.apache.rocketmq.common.message.Message msg =
RocketMQMessageFactory.createWriter(cloudEvent.getSubject()).writeBinary(cloudEvent);
msg = supplySysProp(msg, cloudEvent);
String messageId = null;
try {
org.apache.rocketmq.client.producer.SendResult sendResultRmq =
Expand All @@ -95,6 +97,7 @@ public void sendOneway(CloudEvent cloudEvent) {
this.checkProducerServiceState(this.rocketmqProducer.getDefaultMQProducerImpl());
org.apache.rocketmq.common.message.Message msg =
RocketMQMessageFactory.createWriter(cloudEvent.getSubject()).writeBinary(cloudEvent);
msg = supplySysProp(msg, cloudEvent);
try {
this.rocketmqProducer.sendOneway(msg);
} catch (Exception e) {
Expand All @@ -109,7 +112,7 @@ public void sendAsync(CloudEvent cloudEvent, SendCallback sendCallback) {
this.checkProducerServiceState(this.rocketmqProducer.getDefaultMQProducerImpl());
org.apache.rocketmq.common.message.Message msg =
RocketMQMessageFactory.createWriter(cloudEvent.getSubject()).writeBinary(cloudEvent);

msg = supplySysProp(msg, cloudEvent);
try {
this.rocketmqProducer.send(msg, this.sendCallbackConvert(msg, sendCallback));
} catch (Exception e) {
Expand All @@ -125,6 +128,9 @@ public void request(CloudEvent cloudEvent, RequestReplyCallback rrCallback, long
this.checkProducerServiceState(this.rocketmqProducer.getDefaultMQProducerImpl());
org.apache.rocketmq.common.message.Message msg =
RocketMQMessageFactory.createWriter(cloudEvent.getSubject()).writeBinary(cloudEvent);

msg = supplySysProp(msg, cloudEvent);

rocketmqProducer.request(msg, rrCallbackConvert(msg, rrCallback), timeout);
}

Expand All @@ -133,15 +139,7 @@ public boolean reply(final CloudEvent cloudEvent, final SendCallback sendCallbac
org.apache.rocketmq.common.message.Message msg =
RocketMQMessageFactory.createWriter(cloudEvent.getSubject()).writeBinary(cloudEvent);
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_TYPE, MixAll.REPLY_MESSAGE_FLAG);
if (StringUtils.isNotEmpty(cloudEvent.getExtension("cluster").toString())) {
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_CLUSTER, cloudEvent.getExtension("cluster").toString());
}
if (StringUtils.isNotEmpty(cloudEvent.getExtension("replytoclient").toString())) {
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT, cloudEvent.getExtension("replytoclient").toString());
}
if (StringUtils.isNotEmpty(cloudEvent.getExtension("correlationid").toString())) {
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_CORRELATION_ID, cloudEvent.getExtension("correlationid").toString());
}
msg = supplySysProp(msg, cloudEvent);

try {
this.rocketmqProducer.send(msg, this.sendCallbackConvert(msg, sendCallback));
Expand All @@ -154,10 +152,30 @@ public boolean reply(final CloudEvent cloudEvent, final SendCallback sendCallbac

}

private Message supplySysProp(Message msg, CloudEvent cloudEvent) {
for (String sysPropKey : MessageConst.STRING_HASH_SET) {
String ceKey = sysPropKey.toLowerCase().replaceAll("_", Constants.MESSAGE_PROP_SEPARATOR);
if (cloudEvent.getExtension(ceKey) != null && StringUtils.isNotEmpty(cloudEvent.getExtension(ceKey).toString())) {
MessageAccessor.putProperty(msg, sysPropKey, cloudEvent.getExtension(ceKey).toString());
msg.getProperties().remove(ceKey);
}
}
return msg;
}

private RequestCallback rrCallbackConvert(final Message message, final RequestReplyCallback rrCallback) {
return new RequestCallback() {
@Override
public void onSuccess(org.apache.rocketmq.common.message.Message message) {
// clean the message property to lowercase
for (String sysPropKey : MessageConst.STRING_HASH_SET) {
if (StringUtils.isNotEmpty(message.getProperty(sysPropKey))) {
String prop = message.getProperty(sysPropKey);
String tmpPropKey = sysPropKey.toLowerCase().replaceAll("_", Constants.MESSAGE_PROP_SEPARATOR);
MessageAccessor.putProperty(message, tmpPropKey, prop);
message.getProperties().remove(sysPropKey);
}
}
CloudEvent event = RocketMQMessageFactory.createReader(message).toEvent();
rrCallback.onSuccess(event);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@

package org.apache.eventmesh.connector.rocketmq.utils;


import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.connector.rocketmq.cloudevent.impl.RocketMQHeaders;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.Map;
Expand Down Expand Up @@ -90,13 +90,23 @@ public static Message msgConvert(MessageExt rmqMsg) {
MessageAccessor.putProperty(message, buildCloudEventPropertyKey(Constants.PROPERTY_MESSAGE_QUEUE_OFFSET),
String.valueOf(rmqMsg.getQueueOffset()));

for (String sysPropKey : MessageConst.STRING_HASH_SET) {
if (StringUtils.isNotEmpty(message.getProperty(sysPropKey))) {
String prop = message.getProperty(sysPropKey);
String tmpPropKey = sysPropKey.toLowerCase().replaceAll("_", Constants.MESSAGE_PROP_SEPARATOR);
MessageAccessor.putProperty(message, tmpPropKey, prop);
message.getProperties().remove(sysPropKey);
}
}

return message;
}



private static String buildCloudEventPropertyKey(String propName) {
return RocketMQHeaders.CE_PREFIX + propName;
//return RocketMQHeaders.CE_PREFIX + propName;
return propName;
}

public static org.apache.rocketmq.common.message.MessageExt msgConvertExt(Message message) {
Expand All @@ -121,8 +131,8 @@ public static org.apache.rocketmq.common.message.MessageExt msgConvertExt(Messag
rmqMessageExt.setTopic(message.getTopic());

int queueId =
(int) Integer.valueOf(message.getProperty(buildCloudEventPropertyKey(Constants.PROPERTY_MESSAGE_QUEUE_ID)));
long queueOffset = (long) Long.valueOf(
Integer.parseInt(message.getProperty(buildCloudEventPropertyKey(Constants.PROPERTY_MESSAGE_QUEUE_ID)));
long queueOffset = Long.parseLong(
message.getProperty(buildCloudEventPropertyKey(Constants.PROPERTY_MESSAGE_QUEUE_OFFSET)));
//use in manual ack
rmqMessageExt.setQueueId(queueId);
Expand Down
2 changes: 0 additions & 2 deletions eventmesh-runtime/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ dependencies {
implementation project(":eventmesh-security-plugin:eventmesh-security-api")
implementation project(":eventmesh-security-plugin:eventmesh-security-acl")
implementation project(":eventmesh-registry-plugin:eventmesh-registry-api")
implementation project(":eventmesh-registry-plugin:eventmesh-registry-namesrv")
implementation project(":eventmesh-admin:eventmesh-admin-rocketmq")

implementation project(":eventmesh-protocol-plugin:eventmesh-protocol-api")
Expand All @@ -51,7 +50,6 @@ dependencies {
testImplementation project(":eventmesh-security-plugin:eventmesh-security-api")
testImplementation project(":eventmesh-security-plugin:eventmesh-security-acl")
testImplementation project(":eventmesh-registry-plugin:eventmesh-registry-api")
testImplementation project(":eventmesh-registry-plugin:eventmesh-registry-namesrv")
testImplementation project(":eventmesh-admin:eventmesh-admin-rocketmq")

testImplementation project(":eventmesh-protocol-plugin:eventmesh-protocol-api")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.eventmesh.runtime.constants;

import org.apache.eventmesh.common.Constants;

public class EventMeshConstants {

public static final String EVENT_STORE_PROPERTIES = "eventstore";
Expand Down Expand Up @@ -111,8 +113,10 @@ public class EventMeshConstants {

public static final String PROPERTY_RR_REQUEST_ID = "RR_REQUEST_UNIQ_ID";

public static final String LEAVE_TIME = "leavetime"; //leaveBrokerTime
public static final String ARRIVE_TIME = "arrivetime";
public static final String STORE_TIME = "storetime";
public static final String LEAVE_TIME = "leave" + Constants.MESSAGE_PROP_SEPARATOR + "time"; //leaveBrokerTime
public static final String ARRIVE_TIME = "arrive" + Constants.MESSAGE_PROP_SEPARATOR + "time";
public static final String STORE_TIME = "store" + Constants.MESSAGE_PROP_SEPARATOR + "time";



}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public static void main(String[] args) throws Exception {
client.registerSubBusiHandler(new ReceiveMsgHook() {
@Override
public void handle(Package msg, ChannelHandlerContext ctx) {
if (msg.getHeader().getCommand() == Command.ASYNC_MESSAGE_TO_CLIENT || msg.getHeader().getCommand() == Command.BROADCAST_MESSAGE_TO_CLIENT) {
if (msg.getHeader().getCmd() == Command.ASYNC_MESSAGE_TO_CLIENT || msg.getHeader().getCmd() == Command.BROADCAST_MESSAGE_TO_CLIENT) {
logger.error("receive message-------------------------------------" + msg.toString());
}
}
Expand Down