Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cloudevents/eventmesh message protocol pub/sub for sdk in rocketmq-connector #628

Merged
merged 24 commits into from
Dec 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
fee81c0
update java sdk
xwm1992 Nov 23, 2021
267d35b
Merge branch 'cloudevents' of https://github.com/apache/incubator-eve…
xwm1992 Nov 24, 2021
add1d1c
fix compile error
xwm1992 Nov 24, 2021
8c098e6
fix sdk error
xwm1992 Nov 24, 2021
b7d34b8
Merge branch 'cloudevents' of https://github.com/apache/incubator-eve…
xwm1992 Nov 25, 2021
9f6e155
1.remove the openmessage from connector-api
xwm1992 Nov 25, 2021
a27ad7e
1.fix the standalone connector
xwm1992 Nov 26, 2021
8503fd2
Merge branch 'cloudevents' of https://github.com/apache/incubator-eve…
xwm1992 Nov 26, 2021
34bc449
Merge branch 'cloudevents' of https://github.com/apache/incubator-eve…
xwm1992 Nov 29, 2021
76ae04f
1.fix the standalone connector
xwm1992 Nov 29, 2021
e8d8e4a
1.fix eventmeshmessage protocol adaptor for http with standalone conn…
xwm1992 Nov 30, 2021
e3fbfff
Merge branch 'cloudevents' of https://github.com/apache/incubator-eve…
xwm1992 Nov 30, 2021
be60a68
Merge branch 'cloudevents' of https://github.com/apache/incubator-eve…
xwm1992 Nov 30, 2021
0c40584
1.cloudevents protocol tcp pub/sub for sdk
xwm1992 Nov 30, 2021
ccaf841
Merge branch 'cloudevents' of https://github.com/apache/incubator-eve…
xwm1992 Dec 1, 2021
8068fc0
1.cloudevents protocol tcp pub/sub for sdk
xwm1992 Dec 1, 2021
d593d10
1.cloudevents protocol tcp pub/sub for sdk
xwm1992 Dec 2, 2021
1d3f752
Merge branch 'cloudevents' of https://github.com/apache/incubator-eve…
xwm1992 Dec 2, 2021
462881b
1.cloudevents protocol http pub/sub for sdk
xwm1992 Dec 2, 2021
613d31f
Merge branch 'cloudevents' of https://github.com/apache/incubator-eve…
xwm1992 Dec 2, 2021
f45f89a
fix cloudevents protocol pub/sub for http sdk
xwm1992 Dec 3, 2021
0a61fbd
Merge branch 'cloudevents' of https://github.com/apache/incubator-eve…
xwm1992 Dec 3, 2021
7de5b51
fix cloudevents/eventmesh message protocol pub/sub for sdk in rocketm…
xwm1992 Dec 6, 2021
0b95762
Merge branch 'cloudevents' of https://github.com/apache/incubator-eve…
xwm1992 Dec 6, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,28 +57,28 @@ public class Constants {

public static final String KEY_CONSTANTS_INSTANCE_DESC_PID = "pid";

public static final String RMB_UNIQ_ID = "RMB_UNIQ_ID";
public static final String RMB_UNIQ_ID = "rmbuniqid";

public static final String IDC_SEPERATER = "-";

public static final String PROPERTY_MESSAGE_TIMEOUT = "TIMEOUT";
public static final String PROPERTY_MESSAGE_TIMEOUT = "timeout";

public static final String PROPERTY_MESSAGE_SEARCH_KEYS = "SEARCH_KEYS";
public static final String PROPERTY_MESSAGE_SEARCH_KEYS = "searchkeys";

public static final String PROPERTY_MESSAGE_QUEUE_ID = "QUEUE_ID";
public static final String PROPERTY_MESSAGE_QUEUE_ID = "queueid";

public static final String PROPERTY_MESSAGE_QUEUE_OFFSET = "QUEUE_OFFSET";
public static final String PROPERTY_MESSAGE_QUEUE_OFFSET = "queueoffset";

public static final String PROPERTY_MESSAGE_DESTINATION = "DESTINATION";
public static final String PROPERTY_MESSAGE_DESTINATION = "destination";

public static final String PROPERTY_MESSAGE_MESSAGE_ID = "MESSAGE_ID";
public static final String PROPERTY_MESSAGE_MESSAGE_ID = "messageid";

public static final String PROPERTY_MESSAGE_BORN_HOST = "BORN_HOST";
public static final String PROPERTY_MESSAGE_BORN_HOST = "bornhost";

public static final String PROPERTY_MESSAGE_BORN_TIMESTAMP = "BORN_TIMESTAMP";
public static final String PROPERTY_MESSAGE_BORN_TIMESTAMP = "borntimestamp";

public static final String PROPERTY_MESSAGE_STORE_HOST = "STORE_HOST";
public static final String PROPERTY_MESSAGE_STORE_HOST = "storehost";

public static final String PROPERTY_MESSAGE_STORE_TIMESTAMP = "STORE_TIMESTAMP";
public static final String PROPERTY_MESSAGE_STORE_TIMESTAMP = "storetimestamp";

}
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ private static Object deserializeBody(String bodyJsonString, Header header) thro
case REDIRECT_TO_CLIENT:
return OBJECT_MAPPER.readValue(bodyJsonString, RedirectInfo.class);
default:
log.error("Invalidate TCP command: {}", command);
log.warn("Invalidate TCP command: {}", command);
return null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,33 +69,33 @@ public class EventMeshConstants {
public static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";


public static final String BORN_TIMESTAMP = "BORN_TIME";
public static final String STORE_TIMESTAMP = "STORE_TIME";
public static final String LEAVE_TIMESTAMP = "LEAVE_TIME";
public static final String ARRIVE_TIMESTAMP = "ARRIVE_TIME";
public static final String BORN_TIMESTAMP = "borntime";
public static final String STORE_TIMESTAMP = "storetime";
public static final String LEAVE_TIMESTAMP = "leavetime";
public static final String ARRIVE_TIMESTAMP = "arrivetime";

public static final String KEYS_UPPERCASE = "KEYS";
public static final String KEYS_LOWERCASE = "keys";
public static final String RR_REQUEST_UNIQ_ID = "RR_REQUEST_UNIQ_ID";
public static final String TTL = "TTL";
public static final String TTL = "ttl";

public static final String TAG = "TAG";
public static final String TAG = "tag";

public static final String MANAGE_SUBSYSTEM = "subSystem";
public static final String MANAGE_SUBSYSTEM = "subsystem";
public static final String MANAGE_IP = "ip";
public static final String MANAGE_PORT = "port";
public static final String MANAGE_DEST_IP = "destEventMeshIp";
public static final String MANAGE_DEST_PORT = "destEventMeshPort";
public static final String MANAGE_DEST_IP = "desteventmeship";
public static final String MANAGE_DEST_PORT = "desteventmeshport";
public static final String MANAGE_PATH = "path";
public static final String MANAGE_GROUP = "group";
public static final String MANAGE_PURPOSE = "purpose";
public static final String MANAGE_TOPIC = "topic";

public static final String EventMesh_SEND_BACK_TIMES = "EventMeshSendBackTimes";
public static final String EventMesh_SEND_BACK_TIMES = "eventmeshsendbacktimes";

public static final String EventMesh_SEND_BACK_IP = "EventMeshSendBackIp";
public static final String EventMesh_SEND_BACK_IP = "eventmeshsendbackip";

public static final String EventMesh_REGISTRY_ADDR_KEY = "EventMeshRegistryAddr";
public static final String EventMesh_REGISTRY_ADDR_KEY = "eventmeshregistryaddr";

public static int DEFAULT_TIME_OUT_MILLS = 5 * 1000;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.eventmesh.api.AsyncConsumeContext;
import org.apache.eventmesh.api.EventListener;
import org.apache.eventmesh.api.EventMeshAction;
import org.apache.eventmesh.api.EventMeshAsyncConsumeContext;
import org.apache.eventmesh.api.exception.ConnectorRuntimeException;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.connector.rocketmq.cloudevent.RocketMQMessageFactory;
Expand Down Expand Up @@ -250,7 +251,7 @@ public EventMeshConsumeConcurrentlyStatus handleMessage(MessageExt msg,
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());

AsyncConsumeContext asyncConsumeContext = new AsyncConsumeContext() {
EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = new EventMeshAsyncConsumeContext() {
@Override
public void commit(EventMeshAction action) {
switch (action) {
Expand All @@ -272,7 +273,9 @@ public void commit(EventMeshAction action) {
}
};

listener.consume(cloudEvent, asyncConsumeContext);
eventMeshAsyncConsumeContext.setAbstractContext(context);

listener.consume(cloudEvent, eventMeshAsyncConsumeContext);

return EventMeshConsumeConcurrentlyStatus.valueOf(
contextProperties.getProperty(NonStandardKeys.MESSAGE_CONSUME_STATUS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public static Message msgConvert(MessageExt rmqMsg) {
String.valueOf(rmqMsg.getBornTimestamp()));
MessageAccessor.putProperty(message, buildCloudEventPropertyKey(Constants.PROPERTY_MESSAGE_STORE_HOST),
String.valueOf(rmqMsg.getStoreHost()));
MessageAccessor.putProperty(message, buildCloudEventPropertyKey("STORE_TIMESTAMP"),
MessageAccessor.putProperty(message, buildCloudEventPropertyKey(Constants.PROPERTY_MESSAGE_STORE_TIMESTAMP),
String.valueOf(rmqMsg.getStoreTimestamp()));

//use in manual ack
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,6 @@ public static void main(String[] args) throws Exception {

try (EventMeshHttpProducer eventMeshHttpProducer = new EventMeshHttpProducer(eventMeshClientConfig)) {
for (int i = 0; i < messageSize; i++) {
EventMeshMessage eventMeshMessage = EventMeshMessage.builder()
.bizSeqNo(RandomStringUtils.generateNum(30))
.content("testPublishMessage")
.topic(topic)
.uniqueId(RandomStringUtils.generateNum(30))
.build()
.addProp(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4 * 1000));

Map<String, String> content = new HashMap<>();
content.put("content", "testAsyncMessage");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,17 @@
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.EventMeshMessage;
import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.common.utils.RandomStringUtils;
import org.apache.eventmesh.common.utils.ThreadUtils;
import org.apache.eventmesh.util.Utils;

import org.apache.commons.lang3.StringUtils;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Slf4j
public class AsyncPublishInstance {

Expand Down Expand Up @@ -63,13 +63,19 @@ public static void main(String[] args) throws Exception {
.idc("idc")
.ip(IPUtils.getLocalAddress())
.sys("1234")
.pid(String.valueOf(ThreadUtils.getPID())).build();
.pid(String.valueOf(ThreadUtils.getPID()))
.userName("eventmesh")
.password("pass")
.build();

try (EventMeshHttpProducer eventMeshHttpProducer = new EventMeshHttpProducer(eventMeshClientConfig);) {
for (int i = 0; i < messageSize; i++) {
Map<String, String> content = new HashMap<>();
content.put("content", "testPublishMessage");

EventMeshMessage eventMeshMessage = EventMeshMessage.builder()
.bizSeqNo(RandomStringUtils.generateNum(30))
.content("testPublishMessage")
.content(JsonUtils.serialize(content))
.topic(topic)
.uniqueId(RandomStringUtils.generateNum(30))
.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.tcp.common.EventMeshTestCaseTopicSet;
import org.apache.eventmesh.tcp.common.EventMeshTestUtils;
import org.apache.eventmesh.util.Utils;

Expand Down Expand Up @@ -55,7 +56,7 @@ public static void main(String[] agrs) throws Exception {
client = EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig, CloudEvent.class);
client.init();

client.subscribe("TEST-TOPIC-TCP-ASYNC", SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC);
client.subscribe(EventMeshTestCaseTopicSet.TOPIC_PRX_WQ2ClientUniCast, SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC);
client.registerSubBusiHandler(handler);

client.listen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public static CloudEvent buildEvent(Header header, Body body) throws ProtocolHan
if (StringUtils.equals(SpecVersion.V1.toString(), protocolVersion)) {
cloudEventBuilder = CloudEventBuilder.v1();

event = cloudEventBuilder.withId(sendMessageBatchV2RequestBody.getBizSeqNo())
cloudEventBuilder = cloudEventBuilder.withId(sendMessageBatchV2RequestBody.getBizSeqNo())
.withSubject(sendMessageBatchV2RequestBody.getTopic())
.withType("eventmeshmessage")
.withSource(URI.create("/"))
Expand All @@ -81,12 +81,14 @@ public static CloudEvent buildEvent(Header header, Body body) throws ProtocolHan
.withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion)
.withExtension(SendMessageBatchV2RequestBody.BIZSEQNO, sendMessageBatchV2RequestBody.getBizSeqNo())
.withExtension(SendMessageBatchV2RequestBody.PRODUCERGROUP, sendMessageBatchV2RequestBody.getProducerGroup())
.withExtension(SendMessageBatchV2RequestBody.TTL, sendMessageBatchV2RequestBody.getTtl())
.withExtension(SendMessageBatchV2RequestBody.TAG, sendMessageBatchV2RequestBody.getTag())
.build();
.withExtension(SendMessageBatchV2RequestBody.TTL, sendMessageBatchV2RequestBody.getTtl());
if (StringUtils.isNotEmpty(sendMessageBatchV2RequestBody.getTag())) {
cloudEventBuilder = cloudEventBuilder.withExtension(SendMessageRequestBody.TAG, sendMessageBatchV2RequestBody.getTag());
}
event = cloudEventBuilder.build();
} else if (StringUtils.equals(SpecVersion.V03.toString(), protocolVersion)) {
cloudEventBuilder = CloudEventBuilder.v03();
event = cloudEventBuilder.withId(sendMessageBatchV2RequestBody.getBizSeqNo())
cloudEventBuilder = cloudEventBuilder.withId(sendMessageBatchV2RequestBody.getBizSeqNo())
.withSubject(sendMessageBatchV2RequestBody.getTopic())
.withType("eventmeshmessage")
.withSource(URI.create("/"))
Expand All @@ -106,9 +108,11 @@ public static CloudEvent buildEvent(Header header, Body body) throws ProtocolHan
.withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion)
.withExtension(SendMessageBatchV2RequestBody.BIZSEQNO, sendMessageBatchV2RequestBody.getBizSeqNo())
.withExtension(SendMessageBatchV2RequestBody.PRODUCERGROUP, sendMessageBatchV2RequestBody.getProducerGroup())
.withExtension(SendMessageBatchV2RequestBody.TTL, sendMessageBatchV2RequestBody.getTtl())
.withExtension(SendMessageBatchV2RequestBody.TAG, sendMessageBatchV2RequestBody.getTag())
.build();
.withExtension(SendMessageBatchV2RequestBody.TTL, sendMessageBatchV2RequestBody.getTtl());
if (StringUtils.isNotEmpty(sendMessageBatchV2RequestBody.getTag())) {
cloudEventBuilder = cloudEventBuilder.withExtension(SendMessageRequestBody.TAG, sendMessageBatchV2RequestBody.getTag());
}
event = cloudEventBuilder.build();
}
return event;
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public static CloudEvent buildEvent(Header header, Body body) throws ProtocolHan
if (StringUtils.equals(SpecVersion.V1.toString(), protocolVersion)) {
cloudEventBuilder = CloudEventBuilder.v1();

event = cloudEventBuilder.withId(sendMessageRequestBody.getBizSeqNo())
cloudEventBuilder = cloudEventBuilder.withId(sendMessageRequestBody.getBizSeqNo())
.withSubject(sendMessageRequestBody.getTopic())
.withType("eventmeshmessage")
.withSource(URI.create("/"))
Expand All @@ -86,12 +86,14 @@ public static CloudEvent buildEvent(Header header, Body body) throws ProtocolHan
.withExtension(SendMessageRequestBody.UNIQUEID, sendMessageRequestBody.getUniqueId())
.withExtension(SendMessageRequestBody.PRODUCERGROUP,
sendMessageRequestBody.getProducerGroup())
.withExtension(SendMessageRequestBody.TTL, sendMessageRequestBody.getTtl())
.withExtension(SendMessageRequestBody.TAG, sendMessageRequestBody.getTag())
.build();
.withExtension(SendMessageRequestBody.TTL, sendMessageRequestBody.getTtl());
if (StringUtils.isNotEmpty(sendMessageRequestBody.getTag())) {
cloudEventBuilder = cloudEventBuilder.withExtension(SendMessageRequestBody.TAG, sendMessageRequestBody.getTag());
}
event = cloudEventBuilder.build();
} else if (StringUtils.equals(SpecVersion.V03.toString(), protocolVersion)) {
cloudEventBuilder = CloudEventBuilder.v03();
event = cloudEventBuilder.withId(sendMessageRequestBody.getBizSeqNo())
cloudEventBuilder = cloudEventBuilder.withId(sendMessageRequestBody.getBizSeqNo())
.withSubject(sendMessageRequestBody.getTopic())
.withType("eventmeshmessage")
.withSource(URI.create("/"))
Expand All @@ -113,9 +115,11 @@ public static CloudEvent buildEvent(Header header, Body body) throws ProtocolHan
.withExtension(SendMessageRequestBody.UNIQUEID, sendMessageRequestBody.getUniqueId())
.withExtension(SendMessageRequestBody.PRODUCERGROUP,
sendMessageRequestBody.getProducerGroup())
.withExtension(SendMessageRequestBody.TTL, sendMessageRequestBody.getTtl())
.withExtension(SendMessageRequestBody.TAG, sendMessageRequestBody.getTag())
.build();
.withExtension(SendMessageRequestBody.TTL, sendMessageRequestBody.getTtl());
if (StringUtils.isNotEmpty(sendMessageRequestBody.getTag())) {
cloudEventBuilder = cloudEventBuilder.withExtension(SendMessageRequestBody.TAG, sendMessageRequestBody.getTag());
}
event = cloudEventBuilder.build();
}
return event;
} catch (Exception e) {
Expand Down