From 439f2030e4a4c740ed99179f1ebae235945d950c Mon Sep 17 00:00:00 2001 From: mike_xwm Date: Mon, 29 Nov 2021 19:39:11 +0800 Subject: [PATCH] eventmeshmessage protocol adaptor for tcp (#618) 1.fix the standalone connector 2.fix eventmeshmessage protocol adaptor for tcp --- .../common/protocol/tcp/UserAgent.java | 2 +- .../standalone/broker/task/SubScribeTask.java | 7 ++- .../tcp/common/EventMeshTestUtils.java | 22 ++++----- .../tcp/TcpMessageProtocolResolver.java | 3 ++ .../runtime/constants/EventMeshConstants.java | 48 +++++++++---------- .../group/ClientSessionGroupMapping.java | 4 +- .../EventMeshMessageTCPPubClient.java | 4 -- .../EventMeshMessageTCPSubClient.java | 30 +++++------- 8 files changed, 60 insertions(+), 60 deletions(-) diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/UserAgent.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/UserAgent.java index 6ad7a32072..1d0c8d3fad 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/UserAgent.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/UserAgent.java @@ -67,7 +67,7 @@ public UserAgent(String env, String subsystem, String path, int pid, String host @Override public String toString() { return String.format( - "UserAgent{env='%s'subsystem='%s', path='%s', pid=%d, host='%s', port=%d, version='%s', idc='%s', purpose='%s', unack='%d'}", + "UserAgent{env='%s', subsystem='%s', path='%s', pid=%d, host='%s', port=%d, version='%s', idc='%s', purpose='%s', unack='%d'}", env, subsystem, path, pid, host, port, version, idc, purpose, unack); } diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/task/SubScribeTask.java b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/task/SubScribeTask.java index bfbf1671ee..f5058c111c 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/task/SubScribeTask.java +++ b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/task/SubScribeTask.java @@ -57,7 +57,12 @@ public void run() { if (offset == null) { CloudEvent message = standaloneBroker.getMessage(topicName); if (message != null) { - offset = new AtomicInteger((int) message.getExtension("offset")); + if (message.getExtension("offset") != null) { + offset = new AtomicInteger((int) message.getExtension("offset")); + } else { + offset = new AtomicInteger(0); + } + } } if (offset != null) { diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java index d6f30e8502..db031171b7 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java @@ -111,8 +111,8 @@ public static EventMeshMessage generateSyncRRMqMsg() { EventMeshMessage mqMsg = new EventMeshMessage(); mqMsg.setTopic(TOPIC_PRX_SyncSubscribeTest); mqMsg.getProperties().put("msgType", "persistent"); - mqMsg.getProperties().put("TTL", "300000"); - mqMsg.getProperties().put("KEYS", generateRandomString(16)); + mqMsg.getProperties().put("ttl", "300000"); + mqMsg.getProperties().put("keys", generateRandomString(16)); mqMsg.setBody("testSyncRR"); return mqMsg; } @@ -121,9 +121,9 @@ public static EventMeshMessage generateSyncRRMqMsg() { private static EventMeshMessage generateAsyncRRMqMsg() { EventMeshMessage mqMsg = new EventMeshMessage(); mqMsg.setTopic(TOPIC_PRX_SyncSubscribeTest); - mqMsg.getProperties().put("REPLY_TO", "10.36.0.109@ProducerGroup-producerPool-9-access#V1_4_0#CI"); - mqMsg.getProperties().put("TTL", "300000"); - mqMsg.getProperties().put("PROPERTY_MESSAGE_REPLY_TO", "notnull"); + mqMsg.getProperties().put("replyto", "10.36.0.109@ProducerGroup-producerPool-9-access#V1_4_0#CI"); + mqMsg.getProperties().put("ttl", "300000"); + mqMsg.getProperties().put("propertymessagereplyto", "notnull"); mqMsg.setBody("testAsyncRR"); return mqMsg; } @@ -131,9 +131,9 @@ private static EventMeshMessage generateAsyncRRMqMsg() { public static EventMeshMessage generateAsyncEventMqMsg() { EventMeshMessage mqMsg = new EventMeshMessage(); mqMsg.setTopic(TOPIC_PRX_WQ2ClientUniCast); - mqMsg.getProperties().put("REPLY_TO", "10.36.0.109@ProducerGroup-producerPool-9-access#V1_4_0#CI"); - mqMsg.getProperties().put("TTL", "30000"); - mqMsg.getProperties().put("PROPERTY_MESSAGE_REPLY_TO", "notnull"); + mqMsg.getProperties().put("replyto", "10.36.0.109@ProducerGroup-producerPool-9-access#V1_4_0#CI"); + mqMsg.getProperties().put("ttl", "30000"); + mqMsg.getProperties().put("propertymessagereplyto", "notnull"); mqMsg.setBody("testAsyncMessage"); return mqMsg; } @@ -141,9 +141,9 @@ public static EventMeshMessage generateAsyncEventMqMsg() { public static EventMeshMessage generateBroadcastMqMsg() { EventMeshMessage mqMsg = new EventMeshMessage(); mqMsg.setTopic(TOPIC_PRX_WQ2ClientBroadCast); - mqMsg.getProperties().put("REPLY_TO", "10.36.0.109@ProducerGroup-producerPool-9-access#V1_4_0#CI"); - mqMsg.getProperties().put("TTL", "30000"); - mqMsg.getProperties().put("PROPERTY_MESSAGE_REPLY_TO", "notnull"); + mqMsg.getProperties().put("replyto", "10.36.0.109@ProducerGroup-producerPool-9-access#V1_4_0#CI"); + mqMsg.getProperties().put("ttl", "30000"); + mqMsg.getProperties().put("propertymessagereplyto", "notnull"); mqMsg.setBody("testAsyncMessage"); return mqMsg; } diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/resolver/tcp/TcpMessageProtocolResolver.java b/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/resolver/tcp/TcpMessageProtocolResolver.java index 623575941b..3f69237fe7 100644 --- a/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/resolver/tcp/TcpMessageProtocolResolver.java +++ b/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/resolver/tcp/TcpMessageProtocolResolver.java @@ -28,6 +28,7 @@ import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException; import org.apache.eventmesh.protocol.meshmessage.MeshMessageProtocolConstant; +import java.net.URI; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; @@ -72,6 +73,8 @@ public static CloudEvent buildEvent(Header header, Object body) throws ProtocolH cloudEventBuilder = cloudEventBuilder .withId(header.getSeq()) + .withSource(URI.create("/")) + .withType("eventmeshmessage") .withSubject(topic) .withData(content.getBytes(StandardCharsets.UTF_8)); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/constants/EventMeshConstants.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/constants/EventMeshConstants.java index 810b3f20ce..537aa9140a 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/constants/EventMeshConstants.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/constants/EventMeshConstants.java @@ -35,19 +35,19 @@ public class EventMeshConstants { public static final String EVENTMESH_CONF_FILE = "eventmesh.properties"; - public static final String REQ_C2EVENTMESH_TIMESTAMP = "req_c2eventMesh_timestamp"; - public static final String REQ_EVENTMESH2MQ_TIMESTAMP = "req_eventMesh2mq_timestamp"; - public static final String REQ_MQ2EVENTMESH_TIMESTAMP = "req_mq2eventMesh_timestamp"; - public static final String REQ_EVENTMESH2C_TIMESTAMP = "req_eventMesh2c_timestamp"; - public static final String RSP_C2EVENTMESH_TIMESTAMP = "rsp_c2eventMesh_timestamp"; - public static final String RSP_EVENTMESH2MQ_TIMESTAMP = "rsp_eventMesh2mq_timestamp"; - public static final String RSP_MQ2EVENTMESH_TIMESTAMP = "rsp_mq2eventMesh_timestamp"; - public static final String RSP_EVENTMESH2C_TIMESTAMP = "rsp_eventMesh2c_timestamp"; - - public static final String REQ_SEND_EVENTMESH_IP = "req_send_eventMesh_ip"; - public static final String REQ_RECEIVE_EVENTMESH_IP = "req_receive_eventMesh_ip"; - public static final String RSP_SEND_EVENTMESH_IP = "rsp_send_eventMesh_ip"; - public static final String RSP_RECEIVE_EVENTMESH_IP = "rsp_receive_eventMesh_ip"; + public static final String REQ_C2EVENTMESH_TIMESTAMP = "reqc2eventmeshtimestamp"; + public static final String REQ_EVENTMESH2MQ_TIMESTAMP = "reqeventmesh2mqtimestamp"; + public static final String REQ_MQ2EVENTMESH_TIMESTAMP = "reqmq2eventmeshtimestamp"; + public static final String REQ_EVENTMESH2C_TIMESTAMP = "reqeventmesh2ctimestamp"; + public static final String RSP_C2EVENTMESH_TIMESTAMP = "rspc2eventmeshtimestamp"; + public static final String RSP_EVENTMESH2MQ_TIMESTAMP = "rspeventmesh2mqtimestamp"; + public static final String RSP_MQ2EVENTMESH_TIMESTAMP = "rspmq2eventmeshtimestamp"; + public static final String RSP_EVENTMESH2C_TIMESTAMP = "rspeventmesh2ctimestamp"; + + public static final String REQ_SEND_EVENTMESH_IP = "reqsendeventmeship"; + public static final String REQ_RECEIVE_EVENTMESH_IP = "reqreceiveeventmeship"; + public static final String RSP_SEND_EVENTMESH_IP = "rspsendeventmeship"; + public static final String RSP_RECEIVE_EVENTMESH_IP = "rspreceiveeventmeship"; //default TTL 4 hours public static final Integer DEFAULT_MSG_TTL_MILLS = 14400000; @@ -77,23 +77,23 @@ public class EventMeshConstants { public static final String KEYS_UPPERCASE = "KEYS"; public static final String KEYS_LOWERCASE = "keys"; public static final String RR_REQUEST_UNIQ_ID = "RR_REQUEST_UNIQ_ID"; - public static final String TTL = "TTL"; + public static final String TTL = "ttl"; public static final String TAG = "TAG"; - public static final String MANAGE_SUBSYSTEM = "subSystem"; + public static final String MANAGE_SUBSYSTEM = "subsystem"; public static final String MANAGE_IP = "ip"; public static final String MANAGE_PORT = "port"; - public static final String MANAGE_DEST_IP = "desteventMeshIp"; - public static final String MANAGE_DEST_PORT = "desteventMeshPort"; + public static final String MANAGE_DEST_IP = "desteventmeshIp"; + public static final String MANAGE_DEST_PORT = "desteventmeshport"; public static final String MANAGE_PATH = "path"; public static final String MANAGE_GROUP = "group"; public static final String MANAGE_PURPOSE = "purpose"; public static final String MANAGE_TOPIC = "topic"; - public static final String EVENTMESH_SEND_BACK_TIMES = "eventMeshSendBackTimes"; + public static final String EVENTMESH_SEND_BACK_TIMES = "eventmeshdendbacktimes"; - public static final String EVENTMESH_SEND_BACK_IP = "eventMeshSendBackIp"; + public static final String EVENTMESH_SEND_BACK_IP = "eventmeshsendbackip"; public static final String EVENTMESH_REGISTRY_ADDR_KEY = "eventMeshRegistryAddr"; @@ -103,16 +103,16 @@ public class EventMeshConstants { public static final String PROPERTY_MESSAGE_CLUSTER = "CLUSTER"; - public static final String PROPERTY_MESSAGE_TTL = "TTL"; + public static final String PROPERTY_MESSAGE_TTL = "ttl"; - public static final String PROPERTY_MESSAGE_KEYS = "KEYS"; + public static final String PROPERTY_MESSAGE_KEYS = "keys"; public static final String PROPERTY_MESSAGE_REPLY_TO = "REPLY_TO"; //requester clientId public static final String PROPERTY_RR_REQUEST_ID = "RR_REQUEST_UNIQ_ID"; - public static final String LEAVE_TIME = "LEAVE_TIME"; //leaveBrokerTime - public static final String ARRIVE_TIME = "ARRIVE_TIME"; - public static final String STORE_TIME = "STORE_TIME"; + public static final String LEAVE_TIME = "leavetime"; //leaveBrokerTime + public static final String ARRIVE_TIME = "arrivetime"; + public static final String STORE_TIME = "storetime"; } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java index 8f3e383eb4..071c104d19 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java @@ -367,8 +367,8 @@ public void run() { Iterator sessionIterator = sessionTable.values().iterator(); while (sessionIterator.hasNext()) { Session tmp = sessionIterator.next(); - if (System.currentTimeMillis() - tmp.getLastHeartbeatTime() > - eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpSessionExpiredInMills) { + if (System.currentTimeMillis() - tmp.getLastHeartbeatTime() + > eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpSessionExpiredInMills) { try { logger.warn("clean expired session,client:{}", tmp.getClient()); closeSession(tmp.getContext()); diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java index 37c0f159a7..bcf1e286cc 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java @@ -72,8 +72,6 @@ public void init() throws EventMeshException { @Override public void heartbeat() throws EventMeshException { -// if (task != null) { -// synchronized (EventMeshMessageTCPPubClient.class) { task = scheduler.scheduleAtFixedRate(() -> { try { if (!isActive()) { @@ -86,8 +84,6 @@ public void heartbeat() throws EventMeshException { } }, EventMeshCommon.HEARTBEAT, EventMeshCommon.HEARTBEAT, TimeUnit.MILLISECONDS); } -// } -// } @Override public void reconnect() throws EventMeshException { diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java index f9da554efd..4453249a52 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java @@ -71,25 +71,21 @@ public void init() throws EventMeshException { @Override public void heartbeat() throws EventMeshException { -// if (task == null) { -// synchronized (EventMeshMessageTCPSubClient.class) { - task = scheduler.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - if (!isActive()) { - reconnect(); - } - Package msg = MessageUtils.heartBeat(); - io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); - } catch (Exception ignore) { - // - } + task = scheduler.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + if (!isActive()) { + reconnect(); } - }, EventMeshCommon.HEARTBEAT, EventMeshCommon.HEARTBEAT, TimeUnit.MILLISECONDS); + Package msg = MessageUtils.heartBeat(); + io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); + } catch (Exception ignore) { + // + } } -// } -// } + }, EventMeshCommon.HEARTBEAT, EventMeshCommon.HEARTBEAT, TimeUnit.MILLISECONDS); + } @Override public void reconnect() throws EventMeshException {