diff --git a/eventmesh-runtime/src/main/java/com/webank/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/ProxyTcpRetryer.java b/eventmesh-runtime/src/main/java/com/webank/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/ProxyTcpRetryer.java index f2d2ac4218..69c904015f 100644 --- a/eventmesh-runtime/src/main/java/com/webank/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/ProxyTcpRetryer.java +++ b/eventmesh-runtime/src/main/java/com/webank/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/ProxyTcpRetryer.java @@ -162,14 +162,14 @@ private void retryHandle(DownStreamMsgContext downStreamMsgContext){ private boolean isRetryMsgTimeout(DownStreamMsgContext downStreamMsgContext){ boolean flag =false; - long ttl = Long.valueOf(downStreamMsgContext.msgExt.sysHeaders().getString(ProxyConstants.PROPERTY_MESSAGE_TTL)); + long ttl = Long.parseLong(downStreamMsgContext.msgExt.userHeaders().getString(ProxyConstants.PROPERTY_MESSAGE_TTL)); //TODO 关注是否能取到 - long storeTimestamp = Long.valueOf(downStreamMsgContext.msgExt.sysHeaders().getString(DeFiBusConstant.STORE_TIME)); - String leaveTimeStr = downStreamMsgContext.msgExt.sysHeaders().getString(DeFiBusConstant.LEAVE_TIME); - long brokerCost = StringUtils.isNumeric(leaveTimeStr) ? Long.valueOf(leaveTimeStr) - storeTimestamp : 0; + long storeTimestamp = Long.parseLong(downStreamMsgContext.msgExt.userHeaders().getString(DeFiBusConstant.STORE_TIME)); + String leaveTimeStr = downStreamMsgContext.msgExt.userHeaders().getString(DeFiBusConstant.LEAVE_TIME); + long brokerCost = StringUtils.isNumeric(leaveTimeStr) ? Long.parseLong(leaveTimeStr) - storeTimestamp : 0; - String arriveTimeStr = downStreamMsgContext.msgExt.sysHeaders().getString(DeFiBusConstant.ARRIVE_TIME); - long accessCost = StringUtils.isNumeric(arriveTimeStr) ? System.currentTimeMillis() - Long.valueOf(arriveTimeStr) : 0; + String arriveTimeStr = downStreamMsgContext.msgExt.userHeaders().getString(DeFiBusConstant.ARRIVE_TIME); + long accessCost = StringUtils.isNumeric(arriveTimeStr) ? System.currentTimeMillis() - Long.parseLong(arriveTimeStr) : 0; double elapseTime = brokerCost + accessCost; if (elapseTime >= ttl) { logger.warn("discard the retry because timeout, seq:{}, retryTimes:{}, bizSeq:{}", downStreamMsgContext.seq, downStreamMsgContext.retryTimes, ProxyUtil.getMessageBizSeq(downStreamMsgContext.msgExt));