diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java index afb27c1515..01768caf8d 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java @@ -23,10 +23,12 @@ import io.openmessaging.api.Message; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.eventmesh.api.AbstractContext; import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer; +import org.apache.eventmesh.runtime.constants.EventMeshConstants; import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf; import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupTopicConf; import org.slf4j.Logger; @@ -79,7 +81,8 @@ public HandleMsgContext(String msgRandomNo, String consumerGroup, EventMeshConsu this.bizSeqNo = bizSeqNo; this.uniqueId = uniqueId; this.consumeTopicConfig = consumeTopicConfig; - this.ttl = Integer.parseInt(msg.getUserProperties(Constants.PROPERTY_MESSAGE_TIMEOUT)); + String ttlStr = msg.getUserProperties(Constants.PROPERTY_MESSAGE_TIMEOUT); + this.ttl = StringUtils.isNumeric(ttlStr)? Integer.parseInt(ttlStr): EventMeshConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS; } public void addProp(String key, String val) { diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java index 37d28b7d69..156cac23e3 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java @@ -188,7 +188,9 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext return; } - eventMeshHTTPServer.metrics.summaryMetrics.recordSendBatchMsg(Integer.parseInt(sendMessageBatchRequestBody.getSize())); + String sizeStr = sendMessageBatchRequestBody.getSize(); + long delta = StringUtils.isNumeric(sizeStr)? Integer.parseInt(sizeStr) : 0; + eventMeshHTTPServer.metrics.summaryMetrics.recordSendBatchMsg(delta); if (eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerBatchMsgBatchEnabled) { for (List batchMsgs : topicBatchMessageMappings.values()) { diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/ClientAckContext.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/ClientAckContext.java index 0f322d96e4..015fb467a6 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/ClientAckContext.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/ClientAckContext.java @@ -22,6 +22,7 @@ import io.openmessaging.api.Message; import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.eventmesh.api.AbstractContext; import org.apache.eventmesh.common.Constants; @@ -53,7 +54,9 @@ public ClientAckContext(String seq, AbstractContext context, List msgs, this.msgs = msgs; this.consumer = consumer; this.createTime = System.currentTimeMillis(); - this.expireTime = System.currentTimeMillis() + Long.parseLong(msgs.get(0).getUserProperties(EventMeshConstants.PROPERTY_MESSAGE_TTL)); + String ttlStr = msgs.get(0).getUserProperties(EventMeshConstants.PROPERTY_MESSAGE_TTL); + long ttl = StringUtils.isNumeric(ttlStr)? Long.parseLong(ttlStr) : EventMeshConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS; + this.expireTime = System.currentTimeMillis() + ttl; } public boolean isExpire() { diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java index d8d5eb7421..01ba40e416 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java @@ -24,6 +24,7 @@ import io.openmessaging.api.Message; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.eventmesh.api.AbstractContext; import org.apache.eventmesh.common.Constants; @@ -70,7 +71,9 @@ public DownStreamMsgContext(Message msgExt, Session session, MQConsumerWrapper c this.lastPushTime = System.currentTimeMillis(); this.executeTime = System.currentTimeMillis(); this.createTime = System.currentTimeMillis(); - this.expireTime = System.currentTimeMillis() + Long.parseLong(msgExt.getUserProperties("TTL")); + String ttlStr = msgExt.getUserProperties("TTL"); + long ttl = StringUtils.isNumeric(ttlStr) ? Long.parseLong(ttlStr) : EventMeshConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS; + this.expireTime = System.currentTimeMillis() + ttl; this.msgFromOtherEventMesh = msgFromOtherEventMesh; } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/EventMeshTcpRetryer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/EventMeshTcpRetryer.java index f11578c368..fc619b3d87 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/EventMeshTcpRetryer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/EventMeshTcpRetryer.java @@ -155,9 +155,11 @@ private void retryHandle(DownStreamMsgContext downStreamMsgContext) { private boolean isRetryMsgTimeout(DownStreamMsgContext downStreamMsgContext) { boolean flag = false; - long ttl = Long.parseLong(downStreamMsgContext.msgExt.getUserProperties(EventMeshConstants.PROPERTY_MESSAGE_TTL)); - //TODO 关注是否能取到 - long storeTimestamp = Long.parseLong(downStreamMsgContext.msgExt.getUserProperties(EventMeshConstants.STORE_TIME)); + String ttlStr = downStreamMsgContext.msgExt.getUserProperties(EventMeshConstants.PROPERTY_MESSAGE_TTL); + long ttl = StringUtils.isNumeric(ttlStr)? Long.parseLong(ttlStr) : EventMeshConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS;; + + String storeTimeStr = downStreamMsgContext.msgExt.getUserProperties(EventMeshConstants.STORE_TIME); + long storeTimestamp = StringUtils.isNumeric(storeTimeStr)? Long.parseLong(storeTimeStr) : 0; String leaveTimeStr = downStreamMsgContext.msgExt.getUserProperties(EventMeshConstants.LEAVE_TIME); long brokerCost = StringUtils.isNumeric(leaveTimeStr) ? Long.parseLong(leaveTimeStr) - storeTimestamp : 0;