Skip to content

Commit

Permalink
eventmeshmessage protocol adaptor for tcp (#618)
Browse files Browse the repository at this point in the history
1.fix the standalone connector
2.fix eventmeshmessage protocol adaptor for tcp
  • Loading branch information
xwm1992 authored Nov 29, 2021
1 parent 8cd26c6 commit 439f203
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public UserAgent(String env, String subsystem, String path, int pid, String host
@Override
public String toString() {
return String.format(
"UserAgent{env='%s'subsystem='%s', path='%s', pid=%d, host='%s', port=%d, version='%s', idc='%s', purpose='%s', unack='%d'}",
"UserAgent{env='%s', subsystem='%s', path='%s', pid=%d, host='%s', port=%d, version='%s', idc='%s', purpose='%s', unack='%d'}",
env, subsystem, path, pid, host, port, version, idc, purpose, unack);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,12 @@ public void run() {
if (offset == null) {
CloudEvent message = standaloneBroker.getMessage(topicName);
if (message != null) {
offset = new AtomicInteger((int) message.getExtension("offset"));
if (message.getExtension("offset") != null) {
offset = new AtomicInteger((int) message.getExtension("offset"));
} else {
offset = new AtomicInteger(0);
}

}
}
if (offset != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ public static EventMeshMessage generateSyncRRMqMsg() {
EventMeshMessage mqMsg = new EventMeshMessage();
mqMsg.setTopic(TOPIC_PRX_SyncSubscribeTest);
mqMsg.getProperties().put("msgType", "persistent");
mqMsg.getProperties().put("TTL", "300000");
mqMsg.getProperties().put("KEYS", generateRandomString(16));
mqMsg.getProperties().put("ttl", "300000");
mqMsg.getProperties().put("keys", generateRandomString(16));
mqMsg.setBody("testSyncRR");
return mqMsg;
}
Expand All @@ -121,29 +121,29 @@ public static EventMeshMessage generateSyncRRMqMsg() {
private static EventMeshMessage generateAsyncRRMqMsg() {
EventMeshMessage mqMsg = new EventMeshMessage();
mqMsg.setTopic(TOPIC_PRX_SyncSubscribeTest);
mqMsg.getProperties().put("REPLY_TO", "10.36.0.109@ProducerGroup-producerPool-9-access#V1_4_0#CI");
mqMsg.getProperties().put("TTL", "300000");
mqMsg.getProperties().put("PROPERTY_MESSAGE_REPLY_TO", "notnull");
mqMsg.getProperties().put("replyto", "10.36.0.109@ProducerGroup-producerPool-9-access#V1_4_0#CI");
mqMsg.getProperties().put("ttl", "300000");
mqMsg.getProperties().put("propertymessagereplyto", "notnull");
mqMsg.setBody("testAsyncRR");
return mqMsg;
}

public static EventMeshMessage generateAsyncEventMqMsg() {
EventMeshMessage mqMsg = new EventMeshMessage();
mqMsg.setTopic(TOPIC_PRX_WQ2ClientUniCast);
mqMsg.getProperties().put("REPLY_TO", "10.36.0.109@ProducerGroup-producerPool-9-access#V1_4_0#CI");
mqMsg.getProperties().put("TTL", "30000");
mqMsg.getProperties().put("PROPERTY_MESSAGE_REPLY_TO", "notnull");
mqMsg.getProperties().put("replyto", "10.36.0.109@ProducerGroup-producerPool-9-access#V1_4_0#CI");
mqMsg.getProperties().put("ttl", "30000");
mqMsg.getProperties().put("propertymessagereplyto", "notnull");
mqMsg.setBody("testAsyncMessage");
return mqMsg;
}

public static EventMeshMessage generateBroadcastMqMsg() {
EventMeshMessage mqMsg = new EventMeshMessage();
mqMsg.setTopic(TOPIC_PRX_WQ2ClientBroadCast);
mqMsg.getProperties().put("REPLY_TO", "10.36.0.109@ProducerGroup-producerPool-9-access#V1_4_0#CI");
mqMsg.getProperties().put("TTL", "30000");
mqMsg.getProperties().put("PROPERTY_MESSAGE_REPLY_TO", "notnull");
mqMsg.getProperties().put("replyto", "10.36.0.109@ProducerGroup-producerPool-9-access#V1_4_0#CI");
mqMsg.getProperties().put("ttl", "30000");
mqMsg.getProperties().put("propertymessagereplyto", "notnull");
mqMsg.setBody("testAsyncMessage");
return mqMsg;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException;
import org.apache.eventmesh.protocol.meshmessage.MeshMessageProtocolConstant;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -72,6 +73,8 @@ public static CloudEvent buildEvent(Header header, Object body) throws ProtocolH

cloudEventBuilder = cloudEventBuilder
.withId(header.getSeq())
.withSource(URI.create("/"))
.withType("eventmeshmessage")
.withSubject(topic)
.withData(content.getBytes(StandardCharsets.UTF_8));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,19 @@ public class EventMeshConstants {

public static final String EVENTMESH_CONF_FILE = "eventmesh.properties";

public static final String REQ_C2EVENTMESH_TIMESTAMP = "req_c2eventMesh_timestamp";
public static final String REQ_EVENTMESH2MQ_TIMESTAMP = "req_eventMesh2mq_timestamp";
public static final String REQ_MQ2EVENTMESH_TIMESTAMP = "req_mq2eventMesh_timestamp";
public static final String REQ_EVENTMESH2C_TIMESTAMP = "req_eventMesh2c_timestamp";
public static final String RSP_C2EVENTMESH_TIMESTAMP = "rsp_c2eventMesh_timestamp";
public static final String RSP_EVENTMESH2MQ_TIMESTAMP = "rsp_eventMesh2mq_timestamp";
public static final String RSP_MQ2EVENTMESH_TIMESTAMP = "rsp_mq2eventMesh_timestamp";
public static final String RSP_EVENTMESH2C_TIMESTAMP = "rsp_eventMesh2c_timestamp";

public static final String REQ_SEND_EVENTMESH_IP = "req_send_eventMesh_ip";
public static final String REQ_RECEIVE_EVENTMESH_IP = "req_receive_eventMesh_ip";
public static final String RSP_SEND_EVENTMESH_IP = "rsp_send_eventMesh_ip";
public static final String RSP_RECEIVE_EVENTMESH_IP = "rsp_receive_eventMesh_ip";
public static final String REQ_C2EVENTMESH_TIMESTAMP = "reqc2eventmeshtimestamp";
public static final String REQ_EVENTMESH2MQ_TIMESTAMP = "reqeventmesh2mqtimestamp";
public static final String REQ_MQ2EVENTMESH_TIMESTAMP = "reqmq2eventmeshtimestamp";
public static final String REQ_EVENTMESH2C_TIMESTAMP = "reqeventmesh2ctimestamp";
public static final String RSP_C2EVENTMESH_TIMESTAMP = "rspc2eventmeshtimestamp";
public static final String RSP_EVENTMESH2MQ_TIMESTAMP = "rspeventmesh2mqtimestamp";
public static final String RSP_MQ2EVENTMESH_TIMESTAMP = "rspmq2eventmeshtimestamp";
public static final String RSP_EVENTMESH2C_TIMESTAMP = "rspeventmesh2ctimestamp";

public static final String REQ_SEND_EVENTMESH_IP = "reqsendeventmeship";
public static final String REQ_RECEIVE_EVENTMESH_IP = "reqreceiveeventmeship";
public static final String RSP_SEND_EVENTMESH_IP = "rspsendeventmeship";
public static final String RSP_RECEIVE_EVENTMESH_IP = "rspreceiveeventmeship";

//default TTL 4 hours
public static final Integer DEFAULT_MSG_TTL_MILLS = 14400000;
Expand Down Expand Up @@ -77,23 +77,23 @@ public class EventMeshConstants {
public static final String KEYS_UPPERCASE = "KEYS";
public static final String KEYS_LOWERCASE = "keys";
public static final String RR_REQUEST_UNIQ_ID = "RR_REQUEST_UNIQ_ID";
public static final String TTL = "TTL";
public static final String TTL = "ttl";

public static final String TAG = "TAG";

public static final String MANAGE_SUBSYSTEM = "subSystem";
public static final String MANAGE_SUBSYSTEM = "subsystem";
public static final String MANAGE_IP = "ip";
public static final String MANAGE_PORT = "port";
public static final String MANAGE_DEST_IP = "desteventMeshIp";
public static final String MANAGE_DEST_PORT = "desteventMeshPort";
public static final String MANAGE_DEST_IP = "desteventmeshIp";
public static final String MANAGE_DEST_PORT = "desteventmeshport";
public static final String MANAGE_PATH = "path";
public static final String MANAGE_GROUP = "group";
public static final String MANAGE_PURPOSE = "purpose";
public static final String MANAGE_TOPIC = "topic";

public static final String EVENTMESH_SEND_BACK_TIMES = "eventMeshSendBackTimes";
public static final String EVENTMESH_SEND_BACK_TIMES = "eventmeshdendbacktimes";

public static final String EVENTMESH_SEND_BACK_IP = "eventMeshSendBackIp";
public static final String EVENTMESH_SEND_BACK_IP = "eventmeshsendbackip";

public static final String EVENTMESH_REGISTRY_ADDR_KEY = "eventMeshRegistryAddr";

Expand All @@ -103,16 +103,16 @@ public class EventMeshConstants {

public static final String PROPERTY_MESSAGE_CLUSTER = "CLUSTER";

public static final String PROPERTY_MESSAGE_TTL = "TTL";
public static final String PROPERTY_MESSAGE_TTL = "ttl";

public static final String PROPERTY_MESSAGE_KEYS = "KEYS";
public static final String PROPERTY_MESSAGE_KEYS = "keys";

public static final String PROPERTY_MESSAGE_REPLY_TO = "REPLY_TO"; //requester clientId

public static final String PROPERTY_RR_REQUEST_ID = "RR_REQUEST_UNIQ_ID";

public static final String LEAVE_TIME = "LEAVE_TIME"; //leaveBrokerTime
public static final String ARRIVE_TIME = "ARRIVE_TIME";
public static final String STORE_TIME = "STORE_TIME";
public static final String LEAVE_TIME = "leavetime"; //leaveBrokerTime
public static final String ARRIVE_TIME = "arrivetime";
public static final String STORE_TIME = "storetime";

}
Original file line number Diff line number Diff line change
Expand Up @@ -367,8 +367,8 @@ public void run() {
Iterator<Session> sessionIterator = sessionTable.values().iterator();
while (sessionIterator.hasNext()) {
Session tmp = sessionIterator.next();
if (System.currentTimeMillis() - tmp.getLastHeartbeatTime() >
eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpSessionExpiredInMills) {
if (System.currentTimeMillis() - tmp.getLastHeartbeatTime()
> eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpSessionExpiredInMills) {
try {
logger.warn("clean expired session,client:{}", tmp.getClient());
closeSession(tmp.getContext());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ public void init() throws EventMeshException {

@Override
public void heartbeat() throws EventMeshException {
// if (task != null) {
// synchronized (EventMeshMessageTCPPubClient.class) {
task = scheduler.scheduleAtFixedRate(() -> {
try {
if (!isActive()) {
Expand All @@ -86,8 +84,6 @@ public void heartbeat() throws EventMeshException {
}
}, EventMeshCommon.HEARTBEAT, EventMeshCommon.HEARTBEAT, TimeUnit.MILLISECONDS);
}
// }
// }

@Override
public void reconnect() throws EventMeshException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,25 +71,21 @@ public void init() throws EventMeshException {

@Override
public void heartbeat() throws EventMeshException {
// if (task == null) {
// synchronized (EventMeshMessageTCPSubClient.class) {
task = scheduler.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
if (!isActive()) {
reconnect();
}
Package msg = MessageUtils.heartBeat();
io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
} catch (Exception ignore) {
//
}
task = scheduler.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
if (!isActive()) {
reconnect();
}
}, EventMeshCommon.HEARTBEAT, EventMeshCommon.HEARTBEAT, TimeUnit.MILLISECONDS);
Package msg = MessageUtils.heartBeat();
io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
} catch (Exception ignore) {
//
}
}
// }
// }
}, EventMeshCommon.HEARTBEAT, EventMeshCommon.HEARTBEAT, TimeUnit.MILLISECONDS);
}

@Override
public void reconnect() throws EventMeshException {
Expand Down

0 comments on commit 439f203

Please sign in to comment.