From 518c6fd90457411d129c85701e652b65666df5f3 Mon Sep 17 00:00:00 2001 From: Roc <872364519@qq.com> Date: Sat, 10 Jul 2021 16:05:05 +0800 Subject: [PATCH 01/16] The Chinese in the notes is translated into English. --- .../eventmesh/common/command/HttpCommand.java | 4 +- .../body/client/HeartbeatResponseBody.java | 6 +- .../message/ReplyMessageResponseBody.java | 6 +- .../message/SendMessageBatchResponseBody.java | 6 +- .../SendMessageBatchV2ResponseBody.java | 6 +- .../protocol/http/common/ClientRetCode.java | 1 + .../protocol/http/common/ProtocolKey.java | 6 +- .../header/client/HeartbeatRequestHeader.java | 20 +-- .../http/header/client/RegResponseHeader.java | 10 +- .../header/client/UnRegRequestHeader.java | 20 +-- .../message/PushMessageRequestHeader.java | 6 +- .../message/PushMessageResponseHeader.java | 20 +-- .../message/ReplyMessageRequestHeader.java | 20 +-- .../message/ReplyMessageResponseHeader.java | 10 +- .../SendMessageBatchRequestHeader.java | 20 +-- .../SendMessageBatchResponseHeader.java | 10 +- .../SendMessageBatchV2RequestHeader.java | 20 +-- .../SendMessageBatchV2ResponseHeader.java | 10 +- .../message/SendMessageRequestHeader.java | 20 +-- .../message/SendMessageResponseHeader.java | 10 +- .../common/protocol/tcp/Command.java | 124 +++++++++--------- 21 files changed, 178 insertions(+), 177 deletions(-) diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/command/HttpCommand.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/command/HttpCommand.java index d36a1ae613..65bc770efe 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/command/HttpCommand.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/command/HttpCommand.java @@ -47,10 +47,10 @@ public class HttpCommand { public Body body; - //Command 请求时间 + //Command request time public long reqTime; - //Command 回复时间 + //Command response time public long resTime; public CmdType cmdType = CmdType.REQ; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/HeartbeatResponseBody.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/HeartbeatResponseBody.java index 0f05f512ce..9a9e95318e 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/HeartbeatResponseBody.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/HeartbeatResponseBody.java @@ -27,13 +27,13 @@ public class HeartbeatResponseBody extends Body { - //响应码 + //react code private Integer retCode; - //响应信息 + //response message private String retMsg; - //回复时间 + //response time private long resTime = System.currentTimeMillis(); public Integer getRetCode() { diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/ReplyMessageResponseBody.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/ReplyMessageResponseBody.java index b5a2069481..7e24f5bef4 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/ReplyMessageResponseBody.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/ReplyMessageResponseBody.java @@ -27,13 +27,13 @@ public class ReplyMessageResponseBody extends Body { - //响应码 + //react code private Integer retCode; - //响应信息 + //react message private String retMsg; - //回复时间 + //response time private long resTime = System.currentTimeMillis(); public Integer getRetCode() { diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageBatchResponseBody.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageBatchResponseBody.java index cf5ad72906..a556598617 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageBatchResponseBody.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageBatchResponseBody.java @@ -27,13 +27,13 @@ public class SendMessageBatchResponseBody extends Body { - //响应码 + //react code private Integer retCode; - //响应信息 + //react message private String retMsg; - //回复时间 + //response time private long resTime = System.currentTimeMillis(); public Integer getRetCode() { diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageBatchV2ResponseBody.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageBatchV2ResponseBody.java index fd0b49a22c..abeae82e1f 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageBatchV2ResponseBody.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageBatchV2ResponseBody.java @@ -27,13 +27,13 @@ public class SendMessageBatchV2ResponseBody extends Body { - //响应码 + //react code private Integer retCode; - //响应信息 + //react code private String retMsg; - //回复时间 + //response time private long resTime = System.currentTimeMillis(); public Integer getRetCode() { diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/ClientRetCode.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/ClientRetCode.java index 30986efa9c..d223412f89 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/ClientRetCode.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/ClientRetCode.java @@ -21,6 +21,7 @@ public enum ClientRetCode { /** * 这个RETRY的意思是 客户端发现投递的消息它没有监听时, 告诉EventMesh 发往下一个, 重试几次以实现灰度 , 预留 + * The "RETRY" means:when the client finds the delivered message and it does not listen, tell EventMesh to send next,try again several times to achieve grayscale, reserve */ OK(1, "ok, SDK返回"), diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/ProtocolKey.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/ProtocolKey.java index 12e79ad30d..28e944ea30 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/ProtocolKey.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/ProtocolKey.java @@ -24,7 +24,7 @@ public class ProtocolKey { public static final String VERSION = "Version"; public static class ClientInstanceKey { - ////////////////////////////////////协议层请求方描述/////////// + ////////////////////////////////////Protocol layer requester description/////////// public static final String ENV = "Env"; public static final String IDC = "Idc"; public static final String SYS = "Sys"; @@ -36,7 +36,7 @@ public static class ClientInstanceKey { public static class EventMeshInstanceKey { - ///////////////////////////////////////////////协议层EventMesh描述 + ///////////////////////////////////////////////Protocol layer EventMesh description public static final String EVENTMESHCLUSTER = "EventMeshCluster"; public static final String EVENTMESHIP = "EventMeshIp"; public static final String EVENTMESHENV = "EventMeshEnv"; @@ -44,7 +44,7 @@ public static class EventMeshInstanceKey { } - //CLIENT <-> EventMesh 的 返回 + //return of CLIENT <-> EventMesh public static final String RETCODE = "retCode"; public static final String RETMSG = "retMsg"; public static final String RESTIME = "resTime"; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/HeartbeatRequestHeader.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/HeartbeatRequestHeader.java index 5cd07242e2..2713e8db29 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/HeartbeatRequestHeader.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/HeartbeatRequestHeader.java @@ -29,36 +29,36 @@ public class HeartbeatRequestHeader extends Header { - //请求码 + //request code private String code; - //请求方语言描述 + //requester language description private String language; - //请求方采用的协议版本, 默认1.0 + //protocol version adopted by requester, default:1.0 private ProtocolVersion version; - //请求方所在环境编号 + //the environment number of the requester private String env; - //请求方所在IDC + //the IDC of the requester private String idc; - //请求方的子系统 + //subsystem of the requester private String sys; - //请求方的进程号 + //PID of the requester private String pid; - //请求方的IP + //IP of the requester private String ip; private String producerGroup; - //请求方的USERNAME + //USERNAME of the requester private String username = "username"; - //请求方的PASSWD + //PASSWD of the requester private String passwd = "user@123"; public String getCode() { diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/RegResponseHeader.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/RegResponseHeader.java index 503a527ea1..54ca794d57 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/RegResponseHeader.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/RegResponseHeader.java @@ -26,19 +26,19 @@ public class RegResponseHeader extends Header { - //响应码, 与对应Request的code一致 + //response code, as same as the request code private int code; - //处理该次Request请求的eventMesh的集群名 + //The cluster name of the EventMesh that processes the request private String eventMeshCluster; - //处理该次Request请求的eventMesh的IP + //IP of the EventMesh that processes the request private String eventMeshIp; - //处理该次Request请求的eventMesh所在的环境编号 + //Environment number of the EventMesh that processes the request private String eventMeshEnv; - //处理该次Request请求的eventMesh所在IDC + //IDC of the EventMesh that processes the request private String eventMeshIdc; public int getCode() { diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/UnRegRequestHeader.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/UnRegRequestHeader.java index 09c3a6d378..7503c7c5ff 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/UnRegRequestHeader.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/UnRegRequestHeader.java @@ -29,34 +29,34 @@ public class UnRegRequestHeader extends Header { - //请求码 + //request code private String code; - //请求方语言描述 + //requester language description private String language; - //请求方采用的协议版本, 默认1.0 + //protocol version adopted by requester, default:1.0 private ProtocolVersion version; - //请求方所在环境编号 + //the environment number of the requester private String env; - //请求方所在IDC + //the IDC of the requester private String idc; - //请求方的子系统 + //subsystem of the requester private String sys; - //请求方的进程号 + //PID of the requester private String pid; - //请求方的IP + //IP of the requester private String ip; - //请求方的USERNAME + //USERNAME of the requester private String username = "username"; - //请求方的PASSWD + //PASSWD of the requester private String passwd = "user@123"; public static UnRegRequestHeader buildHeader(Map headerParam) { diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/PushMessageRequestHeader.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/PushMessageRequestHeader.java index 18f6cfc659..99a558796c 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/PushMessageRequestHeader.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/PushMessageRequestHeader.java @@ -29,13 +29,13 @@ public class PushMessageRequestHeader extends Header { - //请求码 + //request code private int code; - //请求方语言描述 + //requester language description private String language; - //请求方采用的协议版本, 默认1.0 + //protocol version adopted by requester, default:1.0 private ProtocolVersion version; private String eventMeshCluster; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/PushMessageResponseHeader.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/PushMessageResponseHeader.java index 2f8fc63f70..6d61b8262e 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/PushMessageResponseHeader.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/PushMessageResponseHeader.java @@ -28,34 +28,34 @@ public class PushMessageResponseHeader extends Header { - //响应码 + //response code private int code; - //请求方语言描述 + //requester language description private String language; - //请求方采用的协议版本, 默认1.0 + //protocol version adopted by requester, default:1.0 private ProtocolVersion version; - //请求方所在环境编号 + //the environment number of the requester private String env; - //请求方所在IDC + //the IDC of the requester private String idc; - //请求方的子系统 + //subsystem of the requester private String sys; - //请求方的进程号 + //PID of the requester private String pid; - //请求方的IP + //IP of the requester private String ip; - //请求方的USERNAME + //USERNAME of the requester private String username = "username"; - //请求方的PASSWD + //PASSWD of the requester private String passwd = "user@123"; public String getUsername() { diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/ReplyMessageRequestHeader.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/ReplyMessageRequestHeader.java index e7d347efa0..769fe4a470 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/ReplyMessageRequestHeader.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/ReplyMessageRequestHeader.java @@ -29,34 +29,34 @@ public class ReplyMessageRequestHeader extends Header { - //请求码 + //request code private String code; - //请求方语言描述 + //requester language description private String language; - //请求方采用的协议版本, 默认1.0 + //protocol version adopted by requester, default:1.0 private ProtocolVersion version; - //请求方所在环境编号 + //the environment number of the requester private String env; - //请求方所在IDC + //the IDC of the requester private String idc; - //请求方的子系统 + //subsystem of the requester private String sys; - //请求方的进程号 + //PID of the requester private String pid; - //请求方的IP + //IP of the requester private String ip; - //请求方的USERNAME + //USERNAME of the requester private String username = "username"; - //请求方的PASSWD + //PASSWD of the requester private String passwd = "user@123"; public String getUsername() { diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/ReplyMessageResponseHeader.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/ReplyMessageResponseHeader.java index 33fd373251..f52428e4ef 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/ReplyMessageResponseHeader.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/ReplyMessageResponseHeader.java @@ -26,19 +26,19 @@ public class ReplyMessageResponseHeader extends Header { - //响应码, 与对应Request的code一致 + //response code, as same as the request code private int code; - //处理该次Request请求的eventMesh的集群名 + //The cluster name of the EventMesh that processes the request private String eventMeshCluster; - //处理该次Request请求的eventMesh的IP + //IP of the EventMesh that processes the request private String eventMeshIp; - //处理该次Request请求的eventMesh所在的环境编号 + //Environment number of the EventMesh that processes the request private String eventMeshEnv; - //处理该次Request请求的eventMesh所在IDC + //IDC of the EventMesh that processes the request private String eventMeshIdc; public int getCode() { diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageBatchRequestHeader.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageBatchRequestHeader.java index f981ceb2bf..88d3c006ce 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageBatchRequestHeader.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageBatchRequestHeader.java @@ -30,34 +30,34 @@ public class SendMessageBatchRequestHeader extends Header { - //请求码 + //request code private String code; - //请求方语言描述 + //requester language description private String language; - //请求方采用的协议版本, 默认1.0 + //protocol version adopted by requester, default:1.0 private ProtocolVersion version; - //请求方所在环境编号 + //the environment number of the requester private String env; - //请求方所在IDC + //the IDC of the requester private String idc; - //请求方的子系统 + //subsystem of the requester private String sys; - //请求方的进程号 + //PID of the requester private String pid; - //请求方的IP + //IP of the requester private String ip; - //请求方的USERNAME + //USERNAME of the requester private String username = "username"; - //请求方的PASSWD + //PASSWD of the requester private String passwd = "user@123"; public String getUsername() { diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageBatchResponseHeader.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageBatchResponseHeader.java index d06660e5b5..8bee0b1158 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageBatchResponseHeader.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageBatchResponseHeader.java @@ -26,19 +26,19 @@ public class SendMessageBatchResponseHeader extends Header { - //响应码, 与对应Request的code一致 + //response code, as same as the request code private int code; - //处理该次Request请求的eventMesh的集群名 + //The cluster name of the EventMesh that processes the request private String eventMeshCluster; - //处理该次Request请求的eventMesh的IP + //IP of the EventMesh that processes the request private String eventMeshIp; - //处理该次Request请求的eventMesh所在的环境编号 + //Environment number of the EventMesh that processes the request private String eventMeshEnv; - //处理该次Request请求的eventMesh所在IDC + //IDC of the EventMesh that processes the request private String eventMeshIdc; public int getCode() { diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageBatchV2RequestHeader.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageBatchV2RequestHeader.java index 1e7468e45d..aabc2d68e5 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageBatchV2RequestHeader.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageBatchV2RequestHeader.java @@ -29,34 +29,34 @@ public class SendMessageBatchV2RequestHeader extends Header { - //请求码 + //request code private String code; - //请求方语言描述 + //requester language description private String language; - //请求方采用的协议版本, 默认1.0 + //protocol version adopted by requester, default:1.0 private ProtocolVersion version; - //请求方所在环境编号 + //the environment number of the requester private String env; - //请求方所在IDC + //the IDC of the requester private String idc; - //请求方的子系统 + //subsystem of the requester private String sys; - //请求方的进程号 + //PID of the requester private String pid; - //请求方的IP + //IP of the requester private String ip; - //请求方的USERNAME + //USERNAME of the requester private String username = "username"; - //请求方的PASSWD + //PASSWD of the requester private String passwd = "user@123"; public String getUsername() { diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageBatchV2ResponseHeader.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageBatchV2ResponseHeader.java index 1544381b04..ed4898fd02 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageBatchV2ResponseHeader.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageBatchV2ResponseHeader.java @@ -25,19 +25,19 @@ public class SendMessageBatchV2ResponseHeader extends Header { - //响应码, 与对应Request的code一致 + //response code, as same as the request code private int code; - //处理该次Request请求的eventMesh的集群名 + //The cluster name of the EventMesh that processes the request private String eventMeshCluster; - //处理该次Request请求的eventMesh的IP + //IP of the EventMesh that processes the request private String eventMeshIp; - //处理该次Request请求的eventMesh所在的环境编号 + //Environment number of the EventMesh that processes the request private String eventMeshEnv; - //处理该次Request请求的eventMesh所在IDC + //IDC of the EventMesh that processes the request private String eventMeshIdc; public int getCode() { diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageRequestHeader.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageRequestHeader.java index 8512529baa..8745fe157c 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageRequestHeader.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageRequestHeader.java @@ -29,34 +29,34 @@ public class SendMessageRequestHeader extends Header { - //请求码 + //request code private String code; - //请求方语言描述 + //requester language description private String language; - //请求方采用的协议版本, 默认1.0 + //protocol version adopted by requester, default:1.0 private ProtocolVersion version; - //请求方所在环境编号 + //the environment number of the requester private String env; - //请求方所在IDC + //the IDC of the requester private String idc; - //请求方的子系统 + //subsystem of the requester private String sys; - //请求方的进程号 + //PID of the requester private String pid; - //请求方的IP + //IP of the requester private String ip; - //请求方的USERNAME + //USERNAME of the requester private String username = "username"; - //请求方的PASSWD + //PASSWD of the requester private String passwd = "user@123"; public String getUsername() { diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageResponseHeader.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageResponseHeader.java index caa5eed8ad..aa10c2a8c3 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageResponseHeader.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageResponseHeader.java @@ -26,19 +26,19 @@ public class SendMessageResponseHeader extends Header { - //响应码, 与对应Request的code一致 + //response code, as same as the request code private int code; - //处理该次Request请求的eventMesh的集群名 + //The cluster name of the EventMesh that processes the request private String eventMeshCluster; - //处理该次Request请求的eventMesh的IP + //IP of the EventMesh that processes the request private String eventMeshIp; - //处理该次Request请求的eventMesh所在的环境编号 + //Environment number of the EventMesh that processes the request private String eventMeshEnv; - //处理该次Request请求的eventMesh所在IDC + //IDC of the EventMesh that processes the request private String eventMeshIdc; public int getCode() { diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Command.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Command.java index 54d75b4a61..6381f29413 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Command.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Command.java @@ -19,70 +19,70 @@ public enum Command { - //心跳 - HEARTBEAT_REQUEST(0), //client发给server的心跳包 - HEARTBEAT_RESPONSE(1), //server回复client的心跳包 - - //握手 - HELLO_REQUEST(2), //client发给server的握手请求 - HELLO_RESPONSE(3), //server回复client的握手请求 - - //断连 - CLIENT_GOODBYE_REQUEST(4), //client主动断连时通知server - CLIENT_GOODBYE_RESPONSE(5), //server回复client的主动断连通知 - SERVER_GOODBYE_REQUEST(6), //server主动断连时通知client - SERVER_GOODBYE_RESPONSE(7), //client回复server的主动断连通知 - - //订阅管理 - SUBSCRIBE_REQUEST(8), //client发给server的订阅请求 - SUBSCRIBE_RESPONSE(9), //server回复client的订阅请求 - UNSUBSCRIBE_REQUEST(10), //client发给server的取消订阅请求 - UNSUBSCRIBE_RESPONSE(11), //server回复client的取消订阅请求 - - //监听 - LISTEN_REQUEST(12), //client发给server的启动topic监听的请求 - LISTEN_RESPONSE(13), //server回复client的监听请求 + //heartbeat + HEARTBEAT_REQUEST(0), //client send heartbeat packet to server + HEARTBEAT_RESPONSE(1), //server response heartbeat packet of client + + //handshake + HELLO_REQUEST(2), //client send handshake request to server + HELLO_RESPONSE(3), //server response handshake request of client + + //disconnection + CLIENT_GOODBYE_REQUEST(4), //Notify server when client actively disconnects + CLIENT_GOODBYE_RESPONSE(5), //Server replies to client's active disconnection notification + SERVER_GOODBYE_REQUEST(6), //Notify client when server actively disconnects + SERVER_GOODBYE_RESPONSE(7), //Client replies to server's active disconnection notification + + //subscription management + SUBSCRIBE_REQUEST(8), //Subscription request sent by client to server + SUBSCRIBE_RESPONSE(9), //Server replies to client's subscription request + UNSUBSCRIBE_REQUEST(10), //Unsubscribe request from client to server + UNSUBSCRIBE_RESPONSE(11), //Server replies to client's unsubscribe request + + //monitor + LISTEN_REQUEST(12), //Request from client to server to start topic listening + LISTEN_RESPONSE(13), //The server replies to the client's listening request //RR - REQUEST_TO_SERVER(14), //client将RR请求发送给server - REQUEST_TO_CLIENT(15), //server将RR请求推送给client - REQUEST_TO_CLIENT_ACK(16), //client收到RR请求后ACK给server - RESPONSE_TO_SERVER(17), //client将RR回包发送给server - RESPONSE_TO_CLIENT(18), //server将RR回包推送给client - RESPONSE_TO_CLIENT_ACK(19), //client收到回包后ACK给server - - //异步事件 - ASYNC_MESSAGE_TO_SERVER(20), //client将异步事件发送给server - ASYNC_MESSAGE_TO_SERVER_ACK(21), //server收到异步事件后ACK给client - ASYNC_MESSAGE_TO_CLIENT(22), //server将异步事件推送给client - ASYNC_MESSAGE_TO_CLIENT_ACK(23), //client收到异步事件后ACK给server - - //广播 - BROADCAST_MESSAGE_TO_SERVER(24), //client将广播消息发送给server - BROADCAST_MESSAGE_TO_SERVER_ACK(25), //server收到广播消息后ACK给client - BROADCAST_MESSAGE_TO_CLIENT(26), //server将广播消息推送给client - BROADCAST_MESSAGE_TO_CLIENT_ACK(27), //client收到广播消息后ACK给server - - //日志上报 - SYS_LOG_TO_LOGSERVER(28), //业务日志上报 - - //RMB跟踪日志上报 - TRACE_LOG_TO_LOGSERVER(29), //RMB跟踪日志上报 - - //重定向指令 - REDIRECT_TO_CLIENT(30), //server将重定向指令推动给client - - //服务注册 - REGISTER_REQUEST(31), //client发送注册请求给server - REGISTER_RESPONSE(32), //server将注册结果给client - - //服务去注册 - UNREGISTER_REQUEST(33), //client发送去注册请求给server - UNREGISTER_RESPONSE(34), //server将去注册结果给client - - //client询问EventMesh推荐连哪个EventMesh - RECOMMEND_REQUEST(35), //client发送推荐请求给server - RECOMMEND_RESPONSE(36); //server将推荐结果给client + REQUEST_TO_SERVER(14), //The client sends the RR request to the server + REQUEST_TO_CLIENT(15), //The server pushes the RR request to the client + REQUEST_TO_CLIENT_ACK(16), //After receiving RR request, the client sends ACK to the server + RESPONSE_TO_SERVER(17), //The client sends the RR packet back to the server + RESPONSE_TO_CLIENT(18), //The server pushes the RR packet back to the client + RESPONSE_TO_CLIENT_ACK(19), //After receiving the return packet, the client sends ACK to the server + + //Asynchronous events + ASYNC_MESSAGE_TO_SERVER(20), //The client sends asynchronous events to the server + ASYNC_MESSAGE_TO_SERVER_ACK(21), //After receiving the asynchronous event, the server sends ack to the client + ASYNC_MESSAGE_TO_CLIENT(22), //The server pushes asynchronous events to the client + ASYNC_MESSAGE_TO_CLIENT_ACK(23), //After the client receives the asynchronous event, the ACK is sent to the server + + //radio broadcast + BROADCAST_MESSAGE_TO_SERVER(24), //The client sends the broadcast message to the server + BROADCAST_MESSAGE_TO_SERVER_ACK(25), //After receiving the broadcast message, the server sends ACK to the client + BROADCAST_MESSAGE_TO_CLIENT(26), //The server pushes the broadcast message to the client + BROADCAST_MESSAGE_TO_CLIENT_ACK(27), //After the client receives the broadcast message, the ACK is sent to the server + + //Log reporting + SYS_LOG_TO_LOGSERVER(28), //Business log reporting + + //RMB tracking log reporting + TRACE_LOG_TO_LOGSERVER(29), //RMB tracking log reporting + + //Redirecting instruction + REDIRECT_TO_CLIENT(30), //The server pushes the redirection instruction to the client + + //service register + REGISTER_REQUEST(31), //Client sends registration request to server + REGISTER_RESPONSE(32), //The server sends the registration result to the client + + //service unregister + UNREGISTER_REQUEST(33), //The client sends a de registration request to the server + UNREGISTER_RESPONSE(34), //The server will register the result to the client + + //The client asks which EventMesh to recommend + RECOMMEND_REQUEST(35), //Client sends recommendation request to server + RECOMMEND_RESPONSE(36); //The server will recommend the results to the client private final byte value; From 64e1331f32623e89f9eb4caec0c2daad15351d08 Mon Sep 17 00:00:00 2001 From: Roc <872364519@qq.com> Date: Mon, 12 Jul 2021 17:17:08 +0800 Subject: [PATCH 02/16] Translation improvement --- .../protocol/http/body/client/HeartbeatResponseBody.java | 2 +- .../protocol/http/body/message/ReplyMessageResponseBody.java | 4 ++-- .../http/body/message/SendMessageBatchResponseBody.java | 4 ++-- .../http/body/message/SendMessageBatchV2ResponseBody.java | 4 ++-- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/HeartbeatResponseBody.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/HeartbeatResponseBody.java index 9a9e95318e..2d213ce017 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/HeartbeatResponseBody.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/HeartbeatResponseBody.java @@ -27,7 +27,7 @@ public class HeartbeatResponseBody extends Body { - //react code + //return code private Integer retCode; //response message diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/ReplyMessageResponseBody.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/ReplyMessageResponseBody.java index 7e24f5bef4..2f9ba960d3 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/ReplyMessageResponseBody.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/ReplyMessageResponseBody.java @@ -27,10 +27,10 @@ public class ReplyMessageResponseBody extends Body { - //react code + //return code private Integer retCode; - //react message + //response message private String retMsg; //response time diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageBatchResponseBody.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageBatchResponseBody.java index a556598617..bdf8004de0 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageBatchResponseBody.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageBatchResponseBody.java @@ -27,10 +27,10 @@ public class SendMessageBatchResponseBody extends Body { - //react code + //return code private Integer retCode; - //react message + //response message private String retMsg; //response time diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageBatchV2ResponseBody.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageBatchV2ResponseBody.java index abeae82e1f..a2f69fb84f 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageBatchV2ResponseBody.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageBatchV2ResponseBody.java @@ -27,10 +27,10 @@ public class SendMessageBatchV2ResponseBody extends Body { - //react code + //return code private Integer retCode; - //react code + //response message private String retMsg; //response time From 4b82183c8b85ccdbb6a10af7245d17e6294495bf Mon Sep 17 00:00:00 2001 From: Roc <872364519@qq.com> Date: Wed, 14 Jul 2021 13:39:36 +0800 Subject: [PATCH 03/16] Translation improvement --- .../eventmesh/common/protocol/http/common/ClientRetCode.java | 1 - 1 file changed, 1 deletion(-) diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/ClientRetCode.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/ClientRetCode.java index d223412f89..47f781b940 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/ClientRetCode.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/ClientRetCode.java @@ -20,7 +20,6 @@ public enum ClientRetCode { /** - * 这个RETRY的意思是 客户端发现投递的消息它没有监听时, 告诉EventMesh 发往下一个, 重试几次以实现灰度 , 预留 * The "RETRY" means:when the client finds the delivered message and it does not listen, tell EventMesh to send next,try again several times to achieve grayscale, reserve */ From fad038172f17d8bc41bb3be38e10bfa909ef73ca Mon Sep 17 00:00:00 2001 From: Roc <872364519@qq.com> Date: Wed, 14 Jul 2021 14:27:41 +0800 Subject: [PATCH 04/16] fix Chinese annotation on runtime module --- eventmesh-runtime/bin/start.sh | 8 ++++---- eventmesh-runtime/bin/stop.sh | 6 +++--- .../core/protocol/http/consumer/ConsumerManager.java | 4 ++-- .../http/processor/ReplyMessageProcessor.java | 2 +- .../protocol/http/processor/SubscribeProcessor.java | 6 +++--- .../http/processor/UnSubscribeProcessor.java | 8 ++++---- .../group/dispatch/DownstreamDispatchStrategy.java | 2 +- .../session/push/retry/EventMeshTcpRetryer.java | 4 ++-- .../runtime/metrics/http/SummaryMetrics.java | 4 ++-- .../runtime/metrics/http/TcpSummaryMetrics.java | 2 +- .../apache/eventmesh/runtime/util/EventMeshUtil.java | 4 ++-- .../org/apache/eventmesh/runtime/util/Utils.java | 12 ++++++------ .../src/test/java/client/common/TCPClient.java | 4 ++-- .../src/test/java/client/hook/ReceiveMsgHook.java | 2 +- .../src/test/java/client/impl/PubClientImpl.java | 10 +++++----- .../src/test/java/client/impl/SubClientImpl.java | 2 +- .../src/test/java/demo/CClientDemo.java | 6 +++--- 17 files changed, 43 insertions(+), 43 deletions(-) diff --git a/eventmesh-runtime/bin/start.sh b/eventmesh-runtime/bin/start.sh index 9c0724bdd0..5be0636166 100644 --- a/eventmesh-runtime/bin/start.sh +++ b/eventmesh-runtime/bin/start.sh @@ -35,7 +35,7 @@ # Java Environment Setting #=========================================================================================== set -e -#服务器配置可能不一致,增加这些配置避免乱码问题 +#Server configuration may be inconsistent, add these configurations to avoid garbled code problems export LANG=en_US.UTF-8 export LC_CTYPE=en_US.UTF-8 export LC_ALL=en_US.UTF-8 @@ -69,13 +69,13 @@ function get_pid { ppid=$(cat ${EVENTMESH_HOME}/bin/pid.file) else if [[ $OS =~ Msys ]]; then - # 在Msys上存在可能无法kill识别出的进程的BUG + # There is a Bug on Msys that may not be able to kill the identified process ppid=`jps -v | grep -i "org.apache.eventmesh.runtime.boot.EventMeshStartup" | grep java | grep -v grep | awk -F ' ' {'print $1'}` elif [[ $OS =~ Darwin ]]; then - # 已知问题:grep java 可能无法精确识别java进程 + # Known problem: grep Java may not be able to accurately identify Java processes ppid=$(/bin/ps -o user,pid,command | grep "java" | grep -i "org.apache.eventmesh.runtime.boot.EventMeshStartup" | grep -Ev "^root" |awk -F ' ' {'print $2'}) else - #在Linux服务器上要求尽可能精确识别进程 + # It is required to identify the process as accurately as possible on Linux ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $EVENTMESH_HOME | grep -i "org.apache.eventmesh.runtime.boot.EventMeshStartup" | grep -Ev "^root" |awk -F ' ' {'print $2'}) fi fi diff --git a/eventmesh-runtime/bin/stop.sh b/eventmesh-runtime/bin/stop.sh index 4897c77612..ff5b27ccda 100644 --- a/eventmesh-runtime/bin/stop.sh +++ b/eventmesh-runtime/bin/stop.sh @@ -30,13 +30,13 @@ function get_pid { ppid=$(cat ${EVENTMESH_HOME}/bin/pid.file) else if [[ $OS =~ Msys ]]; then - # 在Msys上存在可能无法kill识别出的进程的BUG + # There is a Bug on Msys that may not be able to kill the identified process ppid=`jps -v | grep -i "org.apache.eventmesh.runtime.boot.EventMeshStartup" | grep java | grep -v grep | awk -F ' ' {'print $1'}` elif [[ $OS =~ Darwin ]]; then - # 已知问题:grep java 可能无法精确识别java进程 + # Known problem: grep Java may not be able to accurately identify Java processes ppid=$(/bin/ps -o user,pid,command | grep "java" | grep -i "org.apache.eventmesh.runtime.boot.EventMeshStartup" | grep -Ev "^root" |awk -F ' ' {'print $2'}) else - #在Linux服务器上要求尽可能精确识别进程 + # It is required to identify the process as accurately as possible on Linux ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $EVENTMESH_HOME | grep -i "org.apache.eventmesh.runtime.boot.EventMeshStartup" | grep -Ev "^root" |awk -F ' ' {'print $2'}) fi fi diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerManager.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerManager.java index 51ede0bdd8..88574811b2 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerManager.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerManager.java @@ -81,7 +81,7 @@ public void run() { boolean isChange = false; while (clientIterator.hasNext()) { Client client = clientIterator.next(); - //时间差大于3次心跳周期 + //The time difference is greater than 3 heartbeat cycles if (System.currentTimeMillis() - client.lastUpTime.getTime() > DEFAULT_UPDATE_TIME) { logger.warn("client {} lastUpdate time {} over three heartbeat cycles", JSONObject.toJSONString(client), client.lastUpTime); @@ -152,7 +152,7 @@ public void run() { } /** - * notify ConsumerManager 组级别 + * notify ConsumerManager groupLevel */ public void notifyConsumerManager(String consumerGroup, ConsumerGroupConf latestConsumerGroupConfig, ConcurrentHashMap localConsumerGroupMapping) throws Exception { diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java index 5d8745b611..6b86b2ecd4 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java @@ -80,7 +80,7 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext IPUtil.getLocalAddress(), eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEnv, eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC); - //HEADER校验 + //validate HEADER if (StringUtils.isBlank(replyMessageRequestHeader.getIdc()) || StringUtils.isBlank(replyMessageRequestHeader.getPid()) || !StringUtils.isNumeric(replyMessageRequestHeader.getPid()) diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java index e22a86c7c1..1e8daf0ee9 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java @@ -126,7 +126,7 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext } ConsumerGroupConf consumerGroupConf = eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup); if (consumerGroupConf == null) { - // 新订阅 + // new subscription consumerGroupConf = new ConsumerGroupConf(consumerGroup); ConsumerGroupTopicConf consumeTopicConfig = new ConsumerGroupTopicConf(); consumeTopicConfig.setConsumerGroup(consumerGroup); @@ -140,7 +140,7 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext map.put(subTopic.getTopic(), consumeTopicConfig); consumerGroupConf.setConsumerGroupTopicConf(map); } else { - // 已有订阅 + // already subscribed Map map = consumerGroupConf.getConsumerGroupTopicConf(); for (String key : map.keySet()) { if (StringUtils.equals(subTopic.getTopic(), key)) { @@ -163,7 +163,7 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext long startTime = System.currentTimeMillis(); try { - // 订阅关系变化通知 + // subscription relationship change notification eventMeshHTTPServer.getConsumerManager().notifyConsumerManager(consumerGroup, eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup), eventMeshHTTPServer.localConsumerGroupMapping); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java index d45781293b..e5feb7619b 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java @@ -141,7 +141,7 @@ public void onResponse(HttpCommand httpCommand) { Map> idcUrls = new HashMap<>(); Set clientUrls = new HashSet<>(); for (Client client : groupTopicClients) { - // 去除订阅的url + // remove subscribed url if (!StringUtils.equals(unSubscribeUrl, client.url)) { clientUrls.add(client.url); if (idcUrls.containsKey(client.idc)) { @@ -158,7 +158,7 @@ public void onResponse(HttpCommand httpCommand) { ConsumerGroupConf consumerGroupConf = eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup); Map map = consumerGroupConf.getConsumerGroupTopicConf(); for (String topicKey : map.keySet()) { - // 仅修改去订阅的topic + // only modify the topic to subscribe if (StringUtils.equals(unSubTopic, topicKey)) { ConsumerGroupTopicConf latestTopicConf = new ConsumerGroupTopicConf(); latestTopicConf.setConsumerGroup(consumerGroup); @@ -209,9 +209,9 @@ public void onResponse(HttpCommand httpCommand) { responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse( EventMeshRetCode.SUCCESS.getRetCode(), EventMeshRetCode.SUCCESS.getErrMsg()); asyncContext.onComplete(responseEventMeshCommand, handler); - // 清理ClientInfo + // clean ClientInfo eventMeshHTTPServer.localClientInfoMapping.keySet().removeIf(s -> StringUtils.contains(s, consumerGroup)); - // 清理ConsumerGroupInfo + // clean ConsumerGroupInfo eventMeshHTTPServer.localConsumerGroupMapping.keySet().removeIf(s -> StringUtils.equals(consumerGroup, s)); } catch (Exception e) { HttpCommand err = asyncContext.getRequest().createHttpCommandResponse( diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/dispatch/DownstreamDispatchStrategy.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/dispatch/DownstreamDispatchStrategy.java index 27169ff47a..fe0e589620 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/dispatch/DownstreamDispatchStrategy.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/dispatch/DownstreamDispatchStrategy.java @@ -24,7 +24,7 @@ public interface DownstreamDispatchStrategy { /** - * 选择一个SESSION + * select a SESSION * * @param group * @param consumeSessions diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/EventMeshTcpRetryer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/EventMeshTcpRetryer.java index 57165a9d85..aeec1dbb32 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/EventMeshTcpRetryer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/EventMeshTcpRetryer.java @@ -132,10 +132,10 @@ private void retryHandle(DownStreamMsgContext downStreamMsgContext) { if (rechoosen == null) { logger.warn("retry, found no session to downstream msg,seq:{}, retryTimes:{}, bizSeq:{}", downStreamMsgContext.seq, downStreamMsgContext.retryTimes, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt)); -// //需要手动ack掉没有下发成功的消息 +// //Need to manually ack the message that did not send a successful message // eventMeshAckMsg(downStreamMsgContext); -// //重试找不到下发session不再回发broker或者重试其它eventMesh +// //Retry cannot find the delivered session, no longer post back to the broker or retry other event Mesh // String bizSeqNo = finalDownStreamMsgContext.msgExt.getKeys(); // String uniqueId = MapUtils.getString(finalDownStreamMsgContext.msgExt.getProperties(), WeMQConstant.RMB_UNIQ_ID, ""); // if(EventMeshTCPServer.getAccessConfiguration().eventMeshTcpSendBackEnabled){ diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/SummaryMetrics.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/SummaryMetrics.java index 01ef0439cd..8a8a0e7611 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/SummaryMetrics.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/SummaryMetrics.java @@ -59,14 +59,14 @@ private float avg(LinkedList linkedList) { } //////////////////////////////////////////////////////////////////////////////// - public static final String EVENTMESH_MONITOR_FORMAT_HTTP = "{\"maxHTTPTPS\":\"%.1f\",\"avgHTTPTPS\":\"%.1f\"," + //EVENTMESH 接受外部HTTP 请求的TPS相关 + public static final String EVENTMESH_MONITOR_FORMAT_HTTP = "{\"maxHTTPTPS\":\"%.1f\",\"avgHTTPTPS\":\"%.1f\"," + //EVENTMESH tps related to accepting external http requests "\"maxHTTPCOST\":\"%s\",\"avgHTTPCOST\":\"%.1f\",\"avgHTTPBodyDecodeCost\":\"%.1f\", \"httpDiscard\":\"%s\"}"; private float wholeCost = 0f; private AtomicLong wholeRequestNum = new AtomicLong(0); - //累计值 + //cumulative value private AtomicLong httpDiscard = new AtomicLong(0); private AtomicLong maxCost = new AtomicLong(0); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/TcpSummaryMetrics.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/TcpSummaryMetrics.java index 4c69c67729..ce5944d46c 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/TcpSummaryMetrics.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/TcpSummaryMetrics.java @@ -58,7 +58,7 @@ private float avg(LinkedList linkedList) { } //////////////////////////////////////////////////////////////////////////////// - public static final String EVENTMESH_MONITOR_FORMAT_HTTP = "%15s : {\"maxHTTPTPS\":\"%.1f\",\"avgHTTPTPS\":\"%.1f\"," + //EVENTMESH 接受外部HTTP 请求的TPS相关 + public static final String EVENTMESH_MONITOR_FORMAT_HTTP = "%15s : {\"maxHTTPTPS\":\"%.1f\",\"avgHTTPTPS\":\"%.1f\"," + //EVENTMESH tps related to accepting external http requests "\"maxHTTPCOST\":\"%s\",\"avgHTTPCOST\":\"%.1f\",\"avgHTTPBodyDecodeCost\":\"%.1f\"}"; private float wholeCost = 0f; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java index eca44b9ee6..1d0cd9cb85 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java @@ -80,7 +80,7 @@ public static String buildClientGroup(String systemId) { } /** - * 自定义取堆栈 + * custom fetch stack * * @param e * @return @@ -119,7 +119,7 @@ public static ObjectMapper createJsoner() { /** - * 打印mq消息的一部分内容 + * print part of the mq message * * @param eventMeshMessage * @return diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/Utils.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/Utils.java index f3596e6a74..cf97d05a8f 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/Utils.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/Utils.java @@ -38,7 +38,7 @@ public class Utils { private final static Logger messageLogger = LoggerFactory.getLogger("message"); /** - * 用于向客户端发送消息 + * used to send messages to the client * * @param pkg * @param startTime @@ -75,7 +75,7 @@ public void operationComplete(ChannelFuture future) throws Exception { } /** - * 打印发送失败的消息流水 + * print the message flow of failed sending * * @param future * @param pkg @@ -97,7 +97,7 @@ private static void logFailedMessageFlow(Package pkg, UserAgent user, long start } /** - * 打印发送发成的消息流水 + * print the message flow of successful sending * * @param pkg * @param user @@ -147,7 +147,7 @@ public static void logSucceedMessageFlow(Package pkg, UserAgent user, long start // } /** - * 打印mq消息的一部分内容 + * print part of the mq message * * @param eventMeshMessage * @return @@ -166,7 +166,7 @@ public static String printMqMessage(EventMeshMessage eventMeshMessage) { } /** - * 打印mq消息的一部分内容 + * print part of the mq message * * @param message * @return @@ -180,7 +180,7 @@ public static String printMqMessage(EventMeshMessage eventMeshMessage) { // } /** - * 根据topic获取serviceId + * get serviceId according to topic */ public static String getServiceId(String topic) { String[] topicStrArr = topic.split("-"); diff --git a/eventmesh-runtime/src/test/java/client/common/TCPClient.java b/eventmesh-runtime/src/test/java/client/common/TCPClient.java index c0068de8eb..410cf3d4b9 100644 --- a/eventmesh-runtime/src/test/java/client/common/TCPClient.java +++ b/eventmesh-runtime/src/test/java/client/common/TCPClient.java @@ -49,8 +49,8 @@ /** - * 一个Client连一个ACCESS - * 提供最基础的连接, send能力, 不能提供断线重连能力, 该业务是具备请求依赖的,如果提供了断线重连能力,会引起业务上的无感知,即不会走业务上的重连逻辑 + * one Client connects one ACCESS + * Provides the most basic connection, send capability, and cannot provide disconnected reconnection capability, The service is request-dependent. If the disconnection and reconnection capability is provided, it will cause business insensitivity, that is, it will not follow the business reconnection logic. */ public abstract class TCPClient implements Closeable { diff --git a/eventmesh-runtime/src/test/java/client/hook/ReceiveMsgHook.java b/eventmesh-runtime/src/test/java/client/hook/ReceiveMsgHook.java index 6aca270f46..4239bcec34 100644 --- a/eventmesh-runtime/src/test/java/client/hook/ReceiveMsgHook.java +++ b/eventmesh-runtime/src/test/java/client/hook/ReceiveMsgHook.java @@ -22,7 +22,7 @@ import org.apache.eventmesh.common.protocol.tcp.Package; /** - * 业务回调钩子, 这是针对所有类型的消息都会进行的回调 + * Business callback hook, which is a callback for all types of messages */ public interface ReceiveMsgHook { void handle(Package msg, ChannelHandlerContext ctx); diff --git a/eventmesh-runtime/src/test/java/client/impl/PubClientImpl.java b/eventmesh-runtime/src/test/java/client/impl/PubClientImpl.java index ee01069dc1..a723471076 100644 --- a/eventmesh-runtime/src/test/java/client/impl/PubClientImpl.java +++ b/eventmesh-runtime/src/test/java/client/impl/PubClientImpl.java @@ -111,7 +111,7 @@ private void hello() throws Exception { } /** - * 发送RR消息 + * send RR message */ @Override public Package rr(Package msg, long timeout) throws Exception { @@ -120,7 +120,7 @@ public Package rr(Package msg, long timeout) throws Exception { } /** - * 在原本的IO基础上增加测试用例的断言 + * Add test case assertions on the basis of the original IO */ public Package dispatcher(Package request, long timeout) throws Exception { Assert.assertNotNull(request); @@ -157,7 +157,7 @@ public Package dispatcher(Package request, long timeout) throws Exception { } /** - * 发送事件消息, 有返回值是ACCESS 给了ACK + * Send an event message, the return value is ACCESS and ACK is given */ public Package publish(Package msg, long timeout) throws Exception { publogger.info("PubClientImpl|{}|publish|send|command={}|msg={}", clientNo, msg.getHeader().getCommand(), msg); @@ -165,7 +165,7 @@ public Package publish(Package msg, long timeout) throws Exception { } /** - * 发送广播消息 + * send broadcast message */ public Package broadcast(Package msg, long timeout) throws Exception { publogger.info("PubClientImpl|{}|broadcast|send|type={}|msg={}", clientNo, msg.getHeader().getCommand(), msg); @@ -187,7 +187,7 @@ protected void channelRead0(ChannelHandlerContext ctx, Package msg) throws Excep callback.handle(msg, ctx); } /** - * RR发送接受回包, 并Ack + * RR send and accept the return packet ,and Ack */ if (cmd == Command.RESPONSE_TO_CLIENT) { Package responseToClientAck = MessageUtils.responseToClientAck(msg); diff --git a/eventmesh-runtime/src/test/java/client/impl/SubClientImpl.java b/eventmesh-runtime/src/test/java/client/impl/SubClientImpl.java index e77171558c..f62f43c58b 100644 --- a/eventmesh-runtime/src/test/java/client/impl/SubClientImpl.java +++ b/eventmesh-runtime/src/test/java/client/impl/SubClientImpl.java @@ -224,7 +224,7 @@ protected void channelRead0(ChannelHandlerContext ctx, Package msg) throws Excep System.err.println("server goodby request: ---------------------------" + msg.toString()); close(); } else { - //控制指令集 + //control instruction set RequestContext context = contexts.get(RequestContext._key(msg)); if (context != null) { contexts.remove(context.getKey()); diff --git a/eventmesh-runtime/src/test/java/demo/CClientDemo.java b/eventmesh-runtime/src/test/java/demo/CClientDemo.java index a0ade78c96..c03fff972e 100644 --- a/eventmesh-runtime/src/test/java/demo/CClientDemo.java +++ b/eventmesh-runtime/src/test/java/demo/CClientDemo.java @@ -31,7 +31,7 @@ import client.impl.EventMeshClientImpl; /** - * SIMPLE客户端使用样例 + * simple client usage example */ public class CClientDemo { @@ -71,9 +71,9 @@ public void handle(Package msg, ChannelHandlerContext ctx) { }); for (int i = 0; i < 10000; i++) { // ThreadUtil.randomSleep(0,200); - //广播消息 + //broadcast message client.broadcast(MessageUtils.broadcastMessage("TEST-TOPIC-TCP-BROADCAST", i), 5000); - //异步消息 + //asynchronous message client.publish(MessageUtils.asyncMessage(ASYNC_TOPIC, i), 5000); } // From 0cab2ce2f0bee4d7dc001baedebb17fb94414372 Mon Sep 17 00:00:00 2001 From: Roc <872364519@qq.com> Date: Sun, 25 Jul 2021 17:18:09 +0800 Subject: [PATCH 05/16] Export metrics data with open telemetry and use Prometheus for visual observation --- eventmesh-runtime/build.gradle | 13 ++++- .../metrics/http/HTTPMetricsServer.java | 8 +++ .../openTelemetry/OpenTelemetryExporter.java | 56 ++++++++++++++++++ .../OpenTelemetryExporterConfiguration.java | 57 +++++++++++++++++++ .../runtime/metrics/openTelemetry/README.md | 41 +++++++++++++ .../metrics/openTelemetry/prometheus.yml | 30 ++++++++++ 6 files changed, 202 insertions(+), 3 deletions(-) create mode 100644 eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporter.java create mode 100644 eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporterConfiguration.java create mode 100644 eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/README.md create mode 100644 eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/prometheus.yml diff --git a/eventmesh-runtime/build.gradle b/eventmesh-runtime/build.gradle index e5bb065659..a4aebf066d 100644 --- a/eventmesh-runtime/build.gradle +++ b/eventmesh-runtime/build.gradle @@ -23,7 +23,14 @@ List metrics = [ "io.dropwizard.metrics:metrics-json:4.1.0" ] - +List open_telemetry = [ + "io.opentelemetry:opentelemetry-api:1.3.0", + "io.opentelemetry:opentelemetry-sdk:1.3.0", + "io.opentelemetry:opentelemetry-sdk-metrics:1.3.0-alpha", + "io.opentelemetry:opentelemetry-exporter-prometheus:1.3.0-alpha", + "io.prometheus:simpleclient:0.8.1", + "io.prometheus:simpleclient_httpserver:0.8.1" +] List open_message = [ "io.openmessaging:openmessaging-api:2.2.1-pubsub" @@ -31,6 +38,6 @@ List open_message = [ dependencies { - implementation metrics, open_message,project(":eventmesh-connector-api"),project(":eventmesh-common"),project(":eventmesh-spi") - testImplementation metrics,open_message,project(":eventmesh-common"),project(":eventmesh-connector-api"),project(":eventmesh-spi") + implementation metrics,open_telemetry,open_message,project(":eventmesh-connector-api"),project(":eventmesh-common"),project(":eventmesh-spi") + testImplementation metrics,open_telemetry,open_message,project(":eventmesh-common"),project(":eventmesh-connector-api"),project(":eventmesh-spi") } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java index 3e333a0446..f9d38e9ea2 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java @@ -26,6 +26,7 @@ import com.codahale.metrics.MetricRegistry; import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer; +import org.apache.eventmesh.runtime.metrics.openTelemetry.OpenTelemetryExporter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,6 +44,8 @@ public class HTTPMetricsServer { public GroupMetrics groupMetrics; + public OpenTelemetryExporter openTelemetryExporter; + private Logger httpLogger = LoggerFactory.getLogger("httpMonitor"); private Logger logger = LoggerFactory.getLogger(this.getClass()); @@ -56,10 +59,14 @@ public void init() throws Exception { topicMetrics = new TopicMetrics(this.eventMeshHTTPServer, this.metricRegistry); groupMetrics = new GroupMetrics(this.eventMeshHTTPServer, this.metricRegistry); healthMetrics = new HealthMetrics(this.eventMeshHTTPServer, this.metricRegistry); + + openTelemetryExporter = new OpenTelemetryExporter(summaryMetrics); + logger.info("HTTPMetricsServer inited......"); } public void start() throws Exception { + openTelemetryExporter.start(); metricsSchedule.scheduleAtFixedRate(new Runnable() { @Override public void run() { @@ -90,6 +97,7 @@ public void run() { public void shutdown() throws Exception { metricsSchedule.shutdown(); + openTelemetryExporter.shutdown(); logger.info("HTTPMetricsServer shutdown......"); } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporter.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporter.java new file mode 100644 index 0000000000..e4b4934eb5 --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporter.java @@ -0,0 +1,56 @@ +package org.apache.eventmesh.runtime.metrics.openTelemetry; + +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.api.metrics.common.Labels; +import org.apache.eventmesh.runtime.metrics.http.SummaryMetrics; + +/** + * test + */ +public class OpenTelemetryExporter { + OpenTelemetryExporterConfiguration configuration = new OpenTelemetryExporterConfiguration(); + + private SummaryMetrics summaryMetrics; + + private Meter meter; + + public OpenTelemetryExporter(SummaryMetrics summaryMetrics) { + this.summaryMetrics = summaryMetrics; + + // it is important to initialize the OpenTelemetry SDK as early as possible in your process. + MeterProvider meterProvider = configuration.initializeOpenTelemetry(); + + meter = meterProvider.get("OpenTelemetryExporter", "0.13.1"); + } + + public void start(){ + //maxHTTPTPS + meter + .doubleValueObserverBuilder("max.HTTP.TPS") + .setDescription("max TPS of HTTP") + .setUnit("HTTP") + .setUpdater(result -> result.observe(summaryMetrics.maxHTTPTPS(),Labels.empty())) + .build(); + + //maxHTTPCost + meter + .longValueObserverBuilder("max.HTTPCost") + .setDescription("max cost of HTTP") + .setUnit("HTTP") + .setUpdater(result -> result.observe(summaryMetrics.maxHTTPCost(), Labels.empty())) + .build(); + + //avgHTTPCost + meter + .doubleValueObserverBuilder("avg.HTTPCost") + .setDescription("avg cost of HTTP") + .setUnit("HTTP") + .setUpdater(result -> result.observe(summaryMetrics.avgHTTPCost(), Labels.empty())) + .build(); + } + + public void shutdown(){ + configuration.shutdownPrometheusEndpoint(); + } +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporterConfiguration.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporterConfiguration.java new file mode 100644 index 0000000000..41f96148ae --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporterConfiguration.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.metrics.openTelemetry; + +import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.exporter.prometheus.PrometheusCollector; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.prometheus.client.exporter.HTTPServer; + +import java.io.IOException; + +//ues openTelemetry to export metrics data +public class OpenTelemetryExporterConfiguration { + + private HTTPServer server;//Prometheus server + + int prometheusPort = 19090;//the endpoint to export metrics + + /** + * Initializes the Meter SDK and configures the prometheus collector with all default settings. + * + * + * @return A MeterProvider for use in instrumentation. + */ + public MeterProvider initializeOpenTelemetry() { + SdkMeterProvider meterProvider = SdkMeterProvider.builder().buildAndRegisterGlobal(); + + PrometheusCollector.builder().setMetricProducer(meterProvider).buildAndRegister(); + + try { + server = new HTTPServer(prometheusPort,true);//使用守护线程启动一个 HTTP 服务器,为默认的 Prometheus 注册表提供服务。 + } catch (IOException e) { + e.printStackTrace(); + } + + return meterProvider; + } + + public void shutdownPrometheusEndpoint() { + server.stop(); + } +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/README.md b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/README.md new file mode 100644 index 0000000000..a5e77c339e --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/README.md @@ -0,0 +1,41 @@ +# Open Telemetry exporter + +we can use Prometheus UI to see the metrics exported by openTelemetry + +# How to run Prometheus + +download Prometheus from https://prometheus.io/download/ +remember to fix the [prometheus.yml](prometheus.yml) + +--- +or use docker +Start Prometheus instance with a configuration that sets up a HTTP collection job for ```127.0.0.1:19090``` + +See [prometheus.yml](prometheus.yml) + +```shell script +docker run --network="host" --rm -it \ + --name prometheus \ + -v $(pwd)/prometheus.yml:/etc/prometheus/prometheus.yml \ + prom/prometheus + +``` + +you can run the quickstart and open the Prometheus UI: +http://localhost:9090/graph?g0.expr=max_HTTPCost&g0.tab=1&g0.stacked=0&g0.show_exemplars=0&g0.range_input=1h + + +search the key word: + +*max_HTTP_TPS* + +*max_HTTPCost* + +*avg_HTTPCost* + +## special explanation +Prometheus runs on port 9090,Open telemetry exports data to port 19090,Prometheus will collect data from port 19090 + +the exporter is exporting the data in 'SummaryMetrics'(package org.apache.eventmesh.runtime.metrics.http;) + +The export mechanism I set is to export every 3 seconds, because QuickStart only has httpcost at a short time. If the interval is set too long, it will always be 0. In practical application, I think it should be set to more than 30 seconds diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/prometheus.yml b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/prometheus.yml new file mode 100644 index 0000000000..9bedde5b93 --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/prometheus.yml @@ -0,0 +1,30 @@ +global: + scrape_interval: 15s + scrape_timeout: 10s + evaluation_interval: 15s +alerting: + alertmanagers: + - static_configs: + - targets: [] + scheme: http + timeout: 10s + api_version: v1 +scrape_configs: +- job_name: prometheus + honor_timestamps: true + scrape_interval: 15s + scrape_timeout: 10s + metrics_path: /metrics + scheme: http + static_configs: + - targets: + - localhost:9090 +- job_name: EventMesh_HTTP_export_test + honor_timestamps: true + scrape_interval: 15s + scrape_timeout: 10s + metrics_path: /metrics + scheme: http + static_configs: + - targets: + - 127.0.0.1:19090 From f5a79c2031ffe1d6a2ec20d84efd43ab542fed58 Mon Sep 17 00:00:00 2001 From: Roc <872364519@qq.com> Date: Sun, 25 Jul 2021 19:42:19 +0800 Subject: [PATCH 06/16] Export metrics data with open telemetry and use Prometheus for visual observation --- .../openTelemetry/OpenTelemetryExporter.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporter.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporter.java index e4b4934eb5..21eba556c5 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporter.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporter.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.eventmesh.runtime.metrics.openTelemetry; import io.opentelemetry.api.metrics.Meter; From 16d9bca95a45673ad518575f2328a7976552b88e Mon Sep 17 00:00:00 2001 From: Roc <872364519@qq.com> Date: Sun, 25 Jul 2021 19:48:16 +0800 Subject: [PATCH 07/16] Export metrics data with open telemetry and use Prometheus for visual observation --- .../openTelemetry/OpenTelemetryExporterConfiguration.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporterConfiguration.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporterConfiguration.java index 41f96148ae..ff9653650b 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporterConfiguration.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporterConfiguration.java @@ -43,7 +43,7 @@ public MeterProvider initializeOpenTelemetry() { PrometheusCollector.builder().setMetricProducer(meterProvider).buildAndRegister(); try { - server = new HTTPServer(prometheusPort,true);//使用守护线程启动一个 HTTP 服务器,为默认的 Prometheus 注册表提供服务。 + server = new HTTPServer(prometheusPort,true);//Use the daemon thread to start an HTTP server to serve the default Prometheus registry. } catch (IOException e) { e.printStackTrace(); } From 02df827c70d2441de5ba2534aa26a355c3742be5 Mon Sep 17 00:00:00 2001 From: Roc <872364519@qq.com> Date: Sun, 25 Jul 2021 19:54:28 +0800 Subject: [PATCH 08/16] Export metrics data with open telemetry and use Prometheus for visual observation --- .../runtime/metrics/openTelemetry/prometheus.yml | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/prometheus.yml b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/prometheus.yml index 9bedde5b93..ea48021cc2 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/prometheus.yml +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/prometheus.yml @@ -1,3 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# global: scrape_interval: 15s scrape_timeout: 10s From a7b4c08fc7862bbf96205f879cb2d5d9e6845bd2 Mon Sep 17 00:00:00 2001 From: Roc <872364519@qq.com> Date: Sun, 25 Jul 2021 23:11:08 +0800 Subject: [PATCH 09/16] Improper modification --- .../metrics/openTelemetry/OpenTelemetryExporter.java | 6 +++--- .../eventmesh/runtime/metrics/openTelemetry/README.md | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporter.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporter.java index 21eba556c5..1012ee2779 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporter.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporter.java @@ -44,7 +44,7 @@ public OpenTelemetryExporter(SummaryMetrics summaryMetrics) { public void start(){ //maxHTTPTPS meter - .doubleValueObserverBuilder("max.HTTP.TPS") + .doubleValueObserverBuilder("eventmesh.http.request.tps.elapsed.max") .setDescription("max TPS of HTTP") .setUnit("HTTP") .setUpdater(result -> result.observe(summaryMetrics.maxHTTPTPS(),Labels.empty())) @@ -52,7 +52,7 @@ public void start(){ //maxHTTPCost meter - .longValueObserverBuilder("max.HTTPCost") + .longValueObserverBuilder("eventmesh.http.request.elapsed.max") .setDescription("max cost of HTTP") .setUnit("HTTP") .setUpdater(result -> result.observe(summaryMetrics.maxHTTPCost(), Labels.empty())) @@ -60,7 +60,7 @@ public void start(){ //avgHTTPCost meter - .doubleValueObserverBuilder("avg.HTTPCost") + .doubleValueObserverBuilder("eventmesh.http.request.elapsed.avg") .setDescription("avg cost of HTTP") .setUnit("HTTP") .setUpdater(result -> result.observe(summaryMetrics.avgHTTPCost(), Labels.empty())) diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/README.md b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/README.md index a5e77c339e..dd76ce2443 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/README.md +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/README.md @@ -29,9 +29,9 @@ search the key word: *max_HTTP_TPS* -*max_HTTPCost* +*eventmesh_http_request_elapsed_max* -*avg_HTTPCost* +*eventmesh_http_request_elapsed_avg* ## special explanation Prometheus runs on port 9090,Open telemetry exports data to port 19090,Prometheus will collect data from port 19090 From 33cf1b78ffff1a4fa660829f631770c0a2f1cac4 Mon Sep 17 00:00:00 2001 From: Roc <872364519@qq.com> Date: Sun, 25 Jul 2021 23:12:44 +0800 Subject: [PATCH 10/16] Improper modification --- .../apache/eventmesh/runtime/metrics/openTelemetry/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/README.md b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/README.md index dd76ce2443..890104dcbf 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/README.md +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/README.md @@ -27,7 +27,7 @@ http://localhost:9090/graph?g0.expr=max_HTTPCost&g0.tab=1&g0.stacked=0&g0.show_e search the key word: -*max_HTTP_TPS* +*eventmesh_http_request_tps_elapsed_max* *eventmesh_http_request_elapsed_max* From da8e552fe19cafce1476021200914d1a6a12f963 Mon Sep 17 00:00:00 2001 From: Roc <872364519@qq.com> Date: Tue, 3 Aug 2021 20:09:59 +0800 Subject: [PATCH 11/16] improve --- .../eventmesh-metrics-export-design.md | 47 ++++ eventmesh-runtime/conf/eventmesh.properties | 5 +- .../openTelemetry => conf}/prometheus.yml | 0 .../EventMeshHTTPConfiguration.java | 9 + .../metrics/http/HTTPMetricsServer.java | 2 +- .../openTelemetry/OpenTelemetryExporter.java | 228 +++++++++++++++++- .../OpenTelemetryExporterConfiguration.java | 10 +- .../runtime/metrics/openTelemetry/README.md | 41 ---- 8 files changed, 291 insertions(+), 51 deletions(-) create mode 100644 docs/en/features/eventmesh-metrics-export-design.md rename eventmesh-runtime/{src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry => conf}/prometheus.yml (100%) delete mode 100644 eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/README.md diff --git a/docs/en/features/eventmesh-metrics-export-design.md b/docs/en/features/eventmesh-metrics-export-design.md new file mode 100644 index 0000000000..a8f5958f8a --- /dev/null +++ b/docs/en/features/eventmesh-metrics-export-design.md @@ -0,0 +1,47 @@ +# EventMesh Metrics (OpenTelemetry+Prometheus) + +## Introduction + +[EventMesh(incubating)](https://github.com/apache/incubator-eventmesh) is a dynamic cloud-native eventing infrastructure. + +## An overview of OpenTelemetry + +OpenTelemetry is a collection of tools, APIs, and SDKs. You can use it to instrument, generate, collect, and export telemetry data (metrics, logs, and traces) for analysis in order to understand your software's performance and behavior. + +## An overview of Prometheus + +Power your metrics and alerting with a leading open-source monitoring solution. + +- Dimensional data +- Powerful queries +- Great visualization +- Efficient storage +- Simple operation +- Precise alerting +- Many client libraries +- Many integrations + +## Requirements + +### Functional Requirements + +| Requirement ID | Requirement Description | Comments | +| :------------- | ------------------------------------------------------------ | ------------- | +| F-1 | EventMesh users should be able to observe HTTP metrics from Prometheus | Functionality | +| F-2 | EventMesh users should be able to observe TCP metrics from Prometheus | Functionality | + +## Design Details + +use the meter instrument provided by OpenTelemetry to observe the metrics exist in EventMesh then export to Prometheus. + +1、Initialize a meter instrument + +2、set the Prometheus server + +3、different metrics observer built + +## Appendix + +#### References + +https://github.com/open-telemetry/docs-cn/blob/main/QUICKSTART.md#%E5%88%9B%E5%BB%BA%E5%9F%BA%E7%A1%80Span \ No newline at end of file diff --git a/eventmesh-runtime/conf/eventmesh.properties b/eventmesh-runtime/conf/eventmesh.properties index 45fc193b4f..52e1ad27db 100644 --- a/eventmesh-runtime/conf/eventmesh.properties +++ b/eventmesh-runtime/conf/eventmesh.properties @@ -54,4 +54,7 @@ eventMesh.server.registry.fetchRegistryAddrIntervalInMills=20000 #eventMesh.server.defibus.client.comsumeTimeoutInMin=5 #connector plugin -eventMesh.connector.plugin.type=rocketmq \ No newline at end of file +eventMesh.connector.plugin.type=rocketmq + +#prometheusPort +eventMesh.metrics.prometheus.port=19090 \ No newline at end of file diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/prometheus.yml b/eventmesh-runtime/conf/prometheus.yml similarity index 100% rename from eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/prometheus.yml rename to eventmesh-runtime/conf/prometheus.yml diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java index c5ec9cd30d..dc5adefe05 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java @@ -68,6 +68,8 @@ public class EventMeshHTTPConfiguration extends CommonConfiguration { public boolean eventMeshServerUseTls = false; + public int eventMeshPrometheusPort = 19090; + public EventMeshHTTPConfiguration(ConfigurationWrapper configurationWrapper) { super(configurationWrapper); } @@ -180,6 +182,11 @@ public void init() { if (StringUtils.isNotEmpty(eventMeshServerUseTlsStr)) { eventMeshServerUseTls = Boolean.valueOf(StringUtils.deleteWhitespace(eventMeshServerUseTlsStr)); } + + String eventMeshPrometheusPortStr = configurationWrapper.getProp(ConfKeys.KEY_EVENTMESH_METRICS_PROMETHEUS_PORT); + if (StringUtils.isNotEmpty(eventMeshPrometheusPortStr)) { + eventMeshPrometheusPort = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshPrometheusPortStr)); + } } } @@ -226,5 +233,7 @@ static class ConfKeys { public static String KEY_EVENTMESH_CONSUMER_ENABLED = "eventMesh.server.consumer.enabled"; public static String KEY_EVENTMESH_HTTPS_ENABLED = "eventMesh.server.useTls.enabled"; + + public static String KEY_EVENTMESH_METRICS_PROMETHEUS_PORT = "eventMesh.metrics.prometheus.port"; } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java index f9d38e9ea2..69090fbcf5 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java @@ -60,7 +60,7 @@ public void init() throws Exception { groupMetrics = new GroupMetrics(this.eventMeshHTTPServer, this.metricRegistry); healthMetrics = new HealthMetrics(this.eventMeshHTTPServer, this.metricRegistry); - openTelemetryExporter = new OpenTelemetryExporter(summaryMetrics); + openTelemetryExporter = new OpenTelemetryExporter(summaryMetrics,this.eventMeshHTTPServer.getEventMeshHttpConfiguration()); logger.info("HTTPMetricsServer inited......"); } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporter.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporter.java index 1012ee2779..7bbe2edcd0 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporter.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporter.java @@ -20,11 +20,9 @@ import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.api.metrics.MeterProvider; import io.opentelemetry.api.metrics.common.Labels; +import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration; import org.apache.eventmesh.runtime.metrics.http.SummaryMetrics; -/** - * test - */ public class OpenTelemetryExporter { OpenTelemetryExporterConfiguration configuration = new OpenTelemetryExporterConfiguration(); @@ -32,11 +30,11 @@ public class OpenTelemetryExporter { private Meter meter; - public OpenTelemetryExporter(SummaryMetrics summaryMetrics) { + public OpenTelemetryExporter(SummaryMetrics summaryMetrics, EventMeshHTTPConfiguration eventMeshHTTPConfiguration) { this.summaryMetrics = summaryMetrics; // it is important to initialize the OpenTelemetry SDK as early as possible in your process. - MeterProvider meterProvider = configuration.initializeOpenTelemetry(); + MeterProvider meterProvider = configuration.initializeOpenTelemetry(eventMeshHTTPConfiguration); meter = meterProvider.get("OpenTelemetryExporter", "0.13.1"); } @@ -50,6 +48,14 @@ public void start(){ .setUpdater(result -> result.observe(summaryMetrics.maxHTTPTPS(),Labels.empty())) .build(); + //avgHTTPTPS + meter + .doubleValueObserverBuilder("eventmesh.http.request.tps.elapsed.avg") + .setDescription("avg TPS of HTTP") + .setUnit("HTTP") + .setUpdater(result -> result.observe(summaryMetrics.avgHTTPTPS(),Labels.empty())) + .build(); + //maxHTTPCost meter .longValueObserverBuilder("eventmesh.http.request.elapsed.max") @@ -65,6 +71,218 @@ public void start(){ .setUnit("HTTP") .setUpdater(result -> result.observe(summaryMetrics.avgHTTPCost(), Labels.empty())) .build(); + + //avgHTTPBodyDecodeCost + meter + .doubleValueObserverBuilder("eventmesh.http.body.decode.cost.elapsed.avg") + .setDescription("avg body decode cost of HTTP") + .setUnit("HTTP") + .setUpdater(result -> result.observe(summaryMetrics.avgHTTPBodyDecodeCost(), Labels.empty())) + .build(); + + //httpDiscard + meter + .longValueObserverBuilder("eventmesh.http.request.discard.elapsed") + .setDescription("http request discard") + .setUnit("HTTP") + .setUpdater(result -> result.observe(summaryMetrics.getHttpDiscard(), Labels.empty())) + .build(); + + //maxBatchSendMsgTPS + meter + .doubleValueObserverBuilder("eventmesh.batch.send.message.tps.elapsed.max") + .setDescription("max of batch send message tps") + .setUnit("HTTP") + .setUpdater(result -> result.observe(summaryMetrics.maxSendBatchMsgTPS(), Labels.empty())) + .build(); + + //avgBatchSendMsgTPS + meter + .doubleValueObserverBuilder("eventmesh.batch.send.message.tps.elapsed.avg") + .setDescription("avg of batch send message tps") + .setUnit("HTTP") + .setUpdater(result -> result.observe(summaryMetrics.avgSendBatchMsgTPS(), Labels.empty())) + .build(); + + //sum + meter + .doubleValueObserverBuilder("eventmesh.batch.send.message.elapsed.sum") + .setDescription("sum of batch send message number") + .setUnit("HTTP") + .setUpdater(result -> result.observe(summaryMetrics.getSendBatchMsgNumSum(), Labels.empty())) + .build(); + + //sumFail + meter + .doubleValueObserverBuilder("eventmesh.batch.send.message.fail.elapsed.sum") + .setDescription("sum of batch send message fail message number") + .setUnit("HTTP") + .setUpdater(result -> result.observe(summaryMetrics.getSendBatchMsgFailNumSum(), Labels.empty())) + .build(); + + //sumFailRate + meter + .doubleValueObserverBuilder("eventmesh.batch.send.message.fail.rate.elapsed") + .setDescription("send batch message fail rate") + .setUnit("HTTP") + .setUpdater(result -> result.observe(summaryMetrics.getSendBatchMsgFailRate(), Labels.empty())) + .build(); + + //discard + meter + .doubleValueObserverBuilder("eventmesh.batch.send.message.discard.elapsed.sum") + .setDescription("sum of send batch message discard number") + .setUnit("HTTP") + .setUpdater(result -> result.observe(summaryMetrics.getSendBatchMsgDiscardNumSum(), Labels.empty())) + .build(); + + //maxSendMsgTPS + meter + .doubleValueObserverBuilder("eventmesh.send.message.tps.elapsed.max") + .setDescription("max of send message tps") + .setUnit("HTTP") + .setUpdater(result -> result.observe(summaryMetrics.maxSendMsgTPS(), Labels.empty())) + .build(); + + //avgSendMsgTPS + meter + .doubleValueObserverBuilder("eventmesh.send.message.tps.elapsed.avg") + .setDescription("avg of send message tps") + .setUnit("HTTP") + .setUpdater(result -> result.observe(summaryMetrics.avgSendMsgTPS(), Labels.empty())) + .build(); + + //sum + meter + .doubleValueObserverBuilder("eventmesh.send.message.elapsed.sum") + .setDescription("sum of send message number") + .setUnit("HTTP") + .setUpdater(result -> result.observe(summaryMetrics.getSendMsgNumSum(), Labels.empty())) + .build(); + + //sumFail + meter + .doubleValueObserverBuilder("eventmesh.send.message.fail.elapsed.sum") + .setDescription("sum of send message fail number") + .setUnit("HTTP") + .setUpdater(result -> result.observe(summaryMetrics.getSendMsgFailNumSum(), Labels.empty())) + .build(); + + //sumFailRate + meter + .doubleValueObserverBuilder("eventmesh.send.message.fail.rate.elapsed") + .setDescription("send message fail rate") + .setUnit("HTTP") + .setUpdater(result -> result.observe(summaryMetrics.getSendMsgFailRate(), Labels.empty())) + .build(); + + //replyMsg + meter + .doubleValueObserverBuilder("eventmesh.reply.message.elapsed.sum") + .setDescription("sum of reply message number") + .setUnit("HTTP") + .setUpdater(result -> result.observe(summaryMetrics.getReplyMsgNumSum(), Labels.empty())) + .build(); + + //replyFail + meter + .doubleValueObserverBuilder("eventmesh.reply.message.fail.elapsed.sum") + .setDescription("sum of reply message fail number") + .setUnit("HTTP") + .setUpdater(result -> result.observe(summaryMetrics.getReplyMsgFailNumSum(), Labels.empty())) + .build(); + + //maxPushMsgTPS + meter + .doubleValueObserverBuilder("eventmesh.push.message.tps.elapsed.max") + .setDescription("max of push message tps") + .setUnit("HTTP") + .setUpdater(result -> result.observe(summaryMetrics.maxPushMsgTPS(), Labels.empty())) + .build(); + + //avgPushMsgTPS + meter + .doubleValueObserverBuilder("eventmesh.push.message.tps.elapsed.avg") + .setDescription("avg of push message tps") + .setUnit("HTTP") + .setUpdater(result -> result.observe(summaryMetrics.avgPushMsgTPS(), Labels.empty())) + .build(); + + //sum + meter + .doubleValueObserverBuilder("eventmesh.http.push.message.elapsed.sum") + .setDescription("sum of http push message number") + .setUnit("HTTP") + .setUpdater(result -> result.observe(summaryMetrics.getHttpPushMsgNumSum(), Labels.empty())) + .build(); + + //sumFail + meter + .doubleValueObserverBuilder("eventmesh.http.push.message.fail.elapsed.sum") + .setDescription("sum of http push message fail number") + .setUnit("HTTP") + .setUpdater(result -> result.observe(summaryMetrics.getHttpPushFailNumSum(), Labels.empty())) + .build(); + + //sumFailRate + meter + .doubleValueObserverBuilder("eventmesh.http.push.message.fail.rate.elapsed") + .setDescription("http push message fail rate") + .setUnit("HTTP") + .setUpdater(result -> result.observe(summaryMetrics.getHttpPushMsgFailRate(), Labels.empty())) + .build(); + + //maxClientLatency + meter + .doubleValueObserverBuilder("eventmesh.http.push,latency.elapsed.max") + .setDescription("max of http push latency") + .setUnit("HTTP") + .setUpdater(result -> result.observe(summaryMetrics.maxHTTPPushLatency(), Labels.empty())) + .build(); + + //avgClientLatency + meter + .doubleValueObserverBuilder("eventmesh.http.push,latency.elapsed.avg") + .setDescription("avg of http push latency") + .setUnit("HTTP") + .setUpdater(result -> result.observe(summaryMetrics.avgHTTPPushLatency(), Labels.empty())) + .build(); + + //batchMsgQ + + + //sendMsgQ + + + //pushMsgQ + + + //httpRetryQ + + + //batchAvgSend2MQCost + meter + .doubleValueObserverBuilder("eventmesh.batch.send.message.cost.elapsed.avg") + .setDescription("avg of batch send message cost") + .setUnit("HTTP") + .setUpdater(result -> result.observe(summaryMetrics.avgBatchSendMsgCost(), Labels.empty())) + .build(); + + //avgSend2MQCost + meter + .doubleValueObserverBuilder("eventmesh.send.message.cost.elapsed.avg") + .setDescription("avg of send message cost") + .setUnit("HTTP") + .setUpdater(result -> result.observe(summaryMetrics.avgSendMsgCost(), Labels.empty())) + .build(); + + //avgReply2MQCost + meter + .doubleValueObserverBuilder("eventmesh.reply.message.cost.elapsed.avg") + .setDescription("avg of reply message cost") + .setUnit("HTTP") + .setUpdater(result -> result.observe(summaryMetrics.avgReplyMsgCost(), Labels.empty())) + .build(); } public void shutdown(){ diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporterConfiguration.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporterConfiguration.java index ff9653650b..292407581b 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporterConfiguration.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporterConfiguration.java @@ -21,15 +21,18 @@ import io.opentelemetry.exporter.prometheus.PrometheusCollector; import io.opentelemetry.sdk.metrics.SdkMeterProvider; import io.prometheus.client.exporter.HTTPServer; +import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration; import java.io.IOException; //ues openTelemetry to export metrics data public class OpenTelemetryExporterConfiguration { - private HTTPServer server;//Prometheus server + private HTTPServer server;//Prometheus server - int prometheusPort = 19090;//the endpoint to export metrics + private EventMeshHTTPConfiguration eventMeshHTTPConfiguration; + + int prometheusPort = eventMeshHTTPConfiguration.eventMeshPrometheusPort;//the endpoint to export metrics /** * Initializes the Meter SDK and configures the prometheus collector with all default settings. @@ -37,7 +40,8 @@ public class OpenTelemetryExporterConfiguration { * * @return A MeterProvider for use in instrumentation. */ - public MeterProvider initializeOpenTelemetry() { + public MeterProvider initializeOpenTelemetry(EventMeshHTTPConfiguration eventMeshHTTPConfiguration) { + this.eventMeshHTTPConfiguration = eventMeshHTTPConfiguration; SdkMeterProvider meterProvider = SdkMeterProvider.builder().buildAndRegisterGlobal(); PrometheusCollector.builder().setMetricProducer(meterProvider).buildAndRegister(); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/README.md b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/README.md deleted file mode 100644 index 890104dcbf..0000000000 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/README.md +++ /dev/null @@ -1,41 +0,0 @@ -# Open Telemetry exporter - -we can use Prometheus UI to see the metrics exported by openTelemetry - -# How to run Prometheus - -download Prometheus from https://prometheus.io/download/ -remember to fix the [prometheus.yml](prometheus.yml) - ---- -or use docker -Start Prometheus instance with a configuration that sets up a HTTP collection job for ```127.0.0.1:19090``` - -See [prometheus.yml](prometheus.yml) - -```shell script -docker run --network="host" --rm -it \ - --name prometheus \ - -v $(pwd)/prometheus.yml:/etc/prometheus/prometheus.yml \ - prom/prometheus - -``` - -you can run the quickstart and open the Prometheus UI: -http://localhost:9090/graph?g0.expr=max_HTTPCost&g0.tab=1&g0.stacked=0&g0.show_exemplars=0&g0.range_input=1h - - -search the key word: - -*eventmesh_http_request_tps_elapsed_max* - -*eventmesh_http_request_elapsed_max* - -*eventmesh_http_request_elapsed_avg* - -## special explanation -Prometheus runs on port 9090,Open telemetry exports data to port 19090,Prometheus will collect data from port 19090 - -the exporter is exporting the data in 'SummaryMetrics'(package org.apache.eventmesh.runtime.metrics.http;) - -The export mechanism I set is to export every 3 seconds, because QuickStart only has httpcost at a short time. If the interval is set too long, it will always be 0. In practical application, I think it should be set to more than 30 seconds From 45a063ddae2415b2b9cd1d24fc59824604ec0fba Mon Sep 17 00:00:00 2001 From: Roc <872364519@qq.com> Date: Tue, 3 Aug 2021 20:36:59 +0800 Subject: [PATCH 12/16] improve --- .../runtime/metrics/openTelemetry/OpenTelemetryExporter.java | 4 ++-- .../openTelemetry/OpenTelemetryExporterConfiguration.java | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporter.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporter.java index 7bbe2edcd0..b634f9cb15 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporter.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporter.java @@ -234,7 +234,7 @@ public void start(){ //maxClientLatency meter - .doubleValueObserverBuilder("eventmesh.http.push,latency.elapsed.max") + .doubleValueObserverBuilder("eventmesh.http.push.latency.elapsed.max") .setDescription("max of http push latency") .setUnit("HTTP") .setUpdater(result -> result.observe(summaryMetrics.maxHTTPPushLatency(), Labels.empty())) @@ -242,7 +242,7 @@ public void start(){ //avgClientLatency meter - .doubleValueObserverBuilder("eventmesh.http.push,latency.elapsed.avg") + .doubleValueObserverBuilder("eventmesh.http.push.latency.elapsed.avg") .setDescription("avg of http push latency") .setUnit("HTTP") .setUpdater(result -> result.observe(summaryMetrics.avgHTTPPushLatency(), Labels.empty())) diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporterConfiguration.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporterConfiguration.java index 292407581b..ff378476c7 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporterConfiguration.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporterConfiguration.java @@ -32,7 +32,7 @@ public class OpenTelemetryExporterConfiguration { private EventMeshHTTPConfiguration eventMeshHTTPConfiguration; - int prometheusPort = eventMeshHTTPConfiguration.eventMeshPrometheusPort;//the endpoint to export metrics + int prometheusPort;//the endpoint to export metrics /** * Initializes the Meter SDK and configures the prometheus collector with all default settings. @@ -42,6 +42,7 @@ public class OpenTelemetryExporterConfiguration { */ public MeterProvider initializeOpenTelemetry(EventMeshHTTPConfiguration eventMeshHTTPConfiguration) { this.eventMeshHTTPConfiguration = eventMeshHTTPConfiguration; + prometheusPort = eventMeshHTTPConfiguration.eventMeshPrometheusPort; SdkMeterProvider meterProvider = SdkMeterProvider.builder().buildAndRegisterGlobal(); PrometheusCollector.builder().setMetricProducer(meterProvider).buildAndRegister(); From d7b6ae2e03f2c9e4881a9894fc36500c96a76c48 Mon Sep 17 00:00:00 2001 From: Roc <872364519@qq.com> Date: Wed, 4 Aug 2021 19:45:55 +0800 Subject: [PATCH 13/16] improve --- .../metrics/http/HTTPMetricsServer.java | 17 ++++++++- .../openTelemetry/OpenTelemetryExporter.java | 36 +++++++++++++++---- 2 files changed, 46 insertions(+), 7 deletions(-) diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java index 69090fbcf5..e2e41a333c 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java @@ -60,7 +60,7 @@ public void init() throws Exception { groupMetrics = new GroupMetrics(this.eventMeshHTTPServer, this.metricRegistry); healthMetrics = new HealthMetrics(this.eventMeshHTTPServer, this.metricRegistry); - openTelemetryExporter = new OpenTelemetryExporter(summaryMetrics,this.eventMeshHTTPServer.getEventMeshHttpConfiguration()); + openTelemetryExporter = new OpenTelemetryExporter(this,this.eventMeshHTTPServer.getEventMeshHttpConfiguration()); logger.info("HTTPMetricsServer inited......"); } @@ -170,6 +170,21 @@ private void logPrintServerMetrics() { summaryMetrics.send2MQStatInfoClear(); } + public int getBatchMsgQ(){ + return eventMeshHTTPServer.getBatchMsgExecutor().getQueue().size(); + } + + public int getSendMsgQ(){ + return eventMeshHTTPServer.getSendMsgExecutor().getQueue().size(); + } + + public int getPushMsgQ(){ + return eventMeshHTTPServer.getPushMsgExecutor().getQueue().size(); + } + + public int getHttpRetryQ(){ + return eventMeshHTTPServer.getHttpRetryer().size(); + } public HealthMetrics getHealthMetrics() { return healthMetrics; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporter.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporter.java index b634f9cb15..39f235d791 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporter.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporter.java @@ -21,6 +21,7 @@ import io.opentelemetry.api.metrics.MeterProvider; import io.opentelemetry.api.metrics.common.Labels; import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration; +import org.apache.eventmesh.runtime.metrics.http.HTTPMetricsServer; import org.apache.eventmesh.runtime.metrics.http.SummaryMetrics; public class OpenTelemetryExporter { @@ -28,10 +29,13 @@ public class OpenTelemetryExporter { private SummaryMetrics summaryMetrics; + private HTTPMetricsServer httpMetricsServer; + private Meter meter; - public OpenTelemetryExporter(SummaryMetrics summaryMetrics, EventMeshHTTPConfiguration eventMeshHTTPConfiguration) { - this.summaryMetrics = summaryMetrics; + public OpenTelemetryExporter(HTTPMetricsServer httpMetricsServer, EventMeshHTTPConfiguration eventMeshHTTPConfiguration) { + this.httpMetricsServer = httpMetricsServer; + summaryMetrics = httpMetricsServer.summaryMetrics; // it is important to initialize the OpenTelemetry SDK as early as possible in your process. MeterProvider meterProvider = configuration.initializeOpenTelemetry(eventMeshHTTPConfiguration); @@ -249,16 +253,36 @@ public void start(){ .build(); //batchMsgQ - + meter + .longValueObserverBuilder("eventmesh.batch.message.queue.elapsed.size") + .setDescription("size of batch message queue") + .setUnit("HTTP") + .setUpdater(result -> result.observe(httpMetricsServer.getBatchMsgQ(), Labels.empty())) + .build(); //sendMsgQ - + meter + .longValueObserverBuilder("eventmesh.send.message.queue.elapsed.size") + .setDescription("size of send message queue") + .setUnit("HTTP") + .setUpdater(result -> result.observe(httpMetricsServer.getSendMsgQ(), Labels.empty())) + .build(); //pushMsgQ - + meter + .longValueObserverBuilder("eventmesh.push.message.queue.elapsed.size") + .setDescription("size of push message queue") + .setUnit("HTTP") + .setUpdater(result -> result.observe(httpMetricsServer.getPushMsgQ(), Labels.empty())) + .build(); //httpRetryQ - + meter + .longValueObserverBuilder("eventmesh.http.retry.queue.elapsed.size") + .setDescription("size of http retry queue") + .setUnit("HTTP") + .setUpdater(result -> result.observe(httpMetricsServer.getHttpRetryQ(), Labels.empty())) + .build(); //batchAvgSend2MQCost meter From 0ce93162526d7fd706a8eb2b40865753aaaa0718 Mon Sep 17 00:00:00 2001 From: Roc <872364519@qq.com> Date: Thu, 5 Aug 2021 21:17:05 +0800 Subject: [PATCH 14/16] tcp metrics export --- .../common/config/CommonConfiguration.java | 8 ++ .../EventMeshHTTPConfiguration.java | 8 -- .../metrics/http/HTTPMetricsServer.java | 10 +- .../OpenTelemetryExporterConfiguration.java | 20 ++-- ... => OpenTelemetryHTTPMetricsExporter.java} | 10 +- .../OpenTelemetryTCPMetricsExporter.java | 113 ++++++++++++++++++ .../metrics/tcp/EventMeshTcpMonitor.java | 33 +++++ 7 files changed, 175 insertions(+), 27 deletions(-) rename eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/{OpenTelemetryExporter.java => OpenTelemetryHTTPMetricsExporter.java} (98%) create mode 100644 eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryTCPMetricsExporter.java diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java index 321ebe0dd3..42a5db1198 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java @@ -29,6 +29,7 @@ public class CommonConfiguration { public String eventMeshName = ""; public String sysID = "5477"; public String eventMeshConnectorPluginType = "rocketmq"; + public int eventMeshPrometheusPort = 19090; public String namesrvAddr = ""; public String clientUserName = "username"; @@ -75,6 +76,11 @@ public void init() { Preconditions.checkState(StringUtils.isNotEmpty(eventMeshIDCStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_IDC)); eventMeshIDC = StringUtils.deleteWhitespace(eventMeshIDCStr); + String eventMeshPrometheusPortStr = configurationWrapper.getProp(ConfKeys.KEY_EVENTMESH_METRICS_PROMETHEUS_PORT); + if (StringUtils.isNotEmpty(eventMeshPrometheusPortStr)) { + eventMeshPrometheusPort = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshPrometheusPortStr)); + } + eventMeshServerIp = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_HOST_IP); if (StringUtils.isBlank(eventMeshServerIp)) { eventMeshServerIp = IPUtil.getLocalAddress(); @@ -103,5 +109,7 @@ static class ConfKeys { public static String KEYS_EVENTMESH_SERVER_FETCH_REGISTRY_ADDR_INTERVAL = "eventMesh.server.registry.fetchRegistryAddrIntervalInMills"; public static String KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE = "eventMesh.connector.plugin.type"; + + public static String KEY_EVENTMESH_METRICS_PROMETHEUS_PORT = "eventMesh.metrics.prometheus.port"; } } \ No newline at end of file diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java index dc5adefe05..ffa6cbd9b2 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java @@ -68,8 +68,6 @@ public class EventMeshHTTPConfiguration extends CommonConfiguration { public boolean eventMeshServerUseTls = false; - public int eventMeshPrometheusPort = 19090; - public EventMeshHTTPConfiguration(ConfigurationWrapper configurationWrapper) { super(configurationWrapper); } @@ -182,11 +180,6 @@ public void init() { if (StringUtils.isNotEmpty(eventMeshServerUseTlsStr)) { eventMeshServerUseTls = Boolean.valueOf(StringUtils.deleteWhitespace(eventMeshServerUseTlsStr)); } - - String eventMeshPrometheusPortStr = configurationWrapper.getProp(ConfKeys.KEY_EVENTMESH_METRICS_PROMETHEUS_PORT); - if (StringUtils.isNotEmpty(eventMeshPrometheusPortStr)) { - eventMeshPrometheusPort = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshPrometheusPortStr)); - } } } @@ -234,6 +227,5 @@ static class ConfKeys { public static String KEY_EVENTMESH_HTTPS_ENABLED = "eventMesh.server.useTls.enabled"; - public static String KEY_EVENTMESH_METRICS_PROMETHEUS_PORT = "eventMesh.metrics.prometheus.port"; } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java index e2e41a333c..a71e42f915 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java @@ -26,7 +26,7 @@ import com.codahale.metrics.MetricRegistry; import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer; -import org.apache.eventmesh.runtime.metrics.openTelemetry.OpenTelemetryExporter; +import org.apache.eventmesh.runtime.metrics.openTelemetry.OpenTelemetryHTTPMetricsExporter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,7 +44,7 @@ public class HTTPMetricsServer { public GroupMetrics groupMetrics; - public OpenTelemetryExporter openTelemetryExporter; + public OpenTelemetryHTTPMetricsExporter openTelemetryHTTPMetricsExporter; private Logger httpLogger = LoggerFactory.getLogger("httpMonitor"); @@ -60,13 +60,13 @@ public void init() throws Exception { groupMetrics = new GroupMetrics(this.eventMeshHTTPServer, this.metricRegistry); healthMetrics = new HealthMetrics(this.eventMeshHTTPServer, this.metricRegistry); - openTelemetryExporter = new OpenTelemetryExporter(this,this.eventMeshHTTPServer.getEventMeshHttpConfiguration()); + openTelemetryHTTPMetricsExporter = new OpenTelemetryHTTPMetricsExporter(this,this.eventMeshHTTPServer.getEventMeshHttpConfiguration()); logger.info("HTTPMetricsServer inited......"); } public void start() throws Exception { - openTelemetryExporter.start(); + openTelemetryHTTPMetricsExporter.start(); metricsSchedule.scheduleAtFixedRate(new Runnable() { @Override public void run() { @@ -97,7 +97,7 @@ public void run() { public void shutdown() throws Exception { metricsSchedule.shutdown(); - openTelemetryExporter.shutdown(); + openTelemetryHTTPMetricsExporter.shutdown(); logger.info("HTTPMetricsServer shutdown......"); } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporterConfiguration.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporterConfiguration.java index ff378476c7..4afd6442bf 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporterConfiguration.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporterConfiguration.java @@ -21,6 +21,7 @@ import io.opentelemetry.exporter.prometheus.PrometheusCollector; import io.opentelemetry.sdk.metrics.SdkMeterProvider; import io.prometheus.client.exporter.HTTPServer; +import org.apache.eventmesh.common.config.CommonConfiguration; import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration; import java.io.IOException; @@ -30,23 +31,20 @@ public class OpenTelemetryExporterConfiguration { private HTTPServer server;//Prometheus server - private EventMeshHTTPConfiguration eventMeshHTTPConfiguration; - int prometheusPort;//the endpoint to export metrics + static MeterProvider meterProvider; /** * Initializes the Meter SDK and configures the prometheus collector with all default settings. * - * * @return A MeterProvider for use in instrumentation. */ - public MeterProvider initializeOpenTelemetry(EventMeshHTTPConfiguration eventMeshHTTPConfiguration) { - this.eventMeshHTTPConfiguration = eventMeshHTTPConfiguration; - prometheusPort = eventMeshHTTPConfiguration.eventMeshPrometheusPort; - SdkMeterProvider meterProvider = SdkMeterProvider.builder().buildAndRegisterGlobal(); - - PrometheusCollector.builder().setMetricProducer(meterProvider).buildAndRegister(); + public MeterProvider initializeOpenTelemetry(CommonConfiguration configuration) { + prometheusPort = configuration.eventMeshPrometheusPort; + SdkMeterProvider sdkMeterProvider = SdkMeterProvider.builder().buildAndRegisterGlobal(); + PrometheusCollector.builder().setMetricProducer(sdkMeterProvider).buildAndRegister(); + this.meterProvider = sdkMeterProvider; try { server = new HTTPServer(prometheusPort,true);//Use the daemon thread to start an HTTP server to serve the default Prometheus registry. } catch (IOException e) { @@ -56,6 +54,10 @@ public MeterProvider initializeOpenTelemetry(EventMeshHTTPConfiguration eventMes return meterProvider; } + public static MeterProvider getMeterProvider(){//for tcp to get the initialized =meterProvider + return meterProvider; + } + public void shutdownPrometheusEndpoint() { server.stop(); } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporter.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryHTTPMetricsExporter.java similarity index 98% rename from eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporter.java rename to eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryHTTPMetricsExporter.java index 39f235d791..97e0d3b3d8 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporter.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryHTTPMetricsExporter.java @@ -24,23 +24,23 @@ import org.apache.eventmesh.runtime.metrics.http.HTTPMetricsServer; import org.apache.eventmesh.runtime.metrics.http.SummaryMetrics; -public class OpenTelemetryExporter { +public class OpenTelemetryHTTPMetricsExporter { OpenTelemetryExporterConfiguration configuration = new OpenTelemetryExporterConfiguration(); + private Meter meter; + private SummaryMetrics summaryMetrics; private HTTPMetricsServer httpMetricsServer; - private Meter meter; - - public OpenTelemetryExporter(HTTPMetricsServer httpMetricsServer, EventMeshHTTPConfiguration eventMeshHTTPConfiguration) { + public OpenTelemetryHTTPMetricsExporter(HTTPMetricsServer httpMetricsServer, EventMeshHTTPConfiguration eventMeshHTTPConfiguration) { this.httpMetricsServer = httpMetricsServer; summaryMetrics = httpMetricsServer.summaryMetrics; // it is important to initialize the OpenTelemetry SDK as early as possible in your process. MeterProvider meterProvider = configuration.initializeOpenTelemetry(eventMeshHTTPConfiguration); - meter = meterProvider.get("OpenTelemetryExporter", "0.13.1"); + meter = meterProvider.get("OpenTelemetryHTTPExporter", "0.13.1"); } public void start(){ diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryTCPMetricsExporter.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryTCPMetricsExporter.java new file mode 100644 index 0000000000..82adb2cd00 --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryTCPMetricsExporter.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.metrics.openTelemetry; + +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.api.metrics.common.Labels; +import org.apache.eventmesh.common.config.CommonConfiguration; +import org.apache.eventmesh.runtime.configuration.EventMeshTCPConfiguration; +import org.apache.eventmesh.runtime.core.protocol.tcp.client.EventMeshTcpConnectionHandler; +import org.apache.eventmesh.runtime.metrics.tcp.EventMeshTcpMonitor; + +public class OpenTelemetryTCPMetricsExporter { + OpenTelemetryExporterConfiguration configuration = new OpenTelemetryExporterConfiguration(); + + private Meter meter; + + private EventMeshTcpMonitor eventMeshTcpMonitor; + + public OpenTelemetryTCPMetricsExporter(EventMeshTcpMonitor eventMeshTcpMonitor){ + this.eventMeshTcpMonitor = eventMeshTcpMonitor; + + // it is important to initialize the OpenTelemetry SDK as early as possible in your process. + MeterProvider meterProvider = OpenTelemetryExporterConfiguration.getMeterProvider(); + if (meterProvider != null){ + meter = meterProvider.get("OpenTelemetryTCPExporter", "0.13.1"); + } + } + + public void start(){ + if (meter==null){ + return; + } + //retryQueueSize + meter + .doubleValueObserverBuilder("eventmesh.tcp.retry.queue.size") + .setDescription("get size of retry queue") + .setUnit("TCP") + .setUpdater(result -> result.observe(eventMeshTcpMonitor.getEventMeshTCPServer().getEventMeshTcpRetryer().getRetrySize(), Labels.empty())) + .build(); + + //client2eventMeshTPS + meter + .doubleValueObserverBuilder("eventmesh.tcp.client2.tps") + .setDescription("get tps of client to eventMesh") + .setUnit("TCP") + .setUpdater(result -> result.observe(eventMeshTcpMonitor.getClient2eventMeshTPS(), Labels.empty())) + .build(); + + //eventMesh2mqTPS + meter + .doubleValueObserverBuilder("eventmesh.tcp.2mq.tps") + .setDescription("get tps of eventMesh to mq") + .setUnit("TCP") + .setUpdater(result -> result.observe(eventMeshTcpMonitor.getEventMesh2mqTPS(), Labels.empty())) + .build(); + + //mq2eventMeshTPS + meter + .doubleValueObserverBuilder("eventmesh.tcp.mq2.tps") + .setDescription("get tps of mq to eventMesh") + .setUnit("TCP") + .setUpdater(result -> result.observe(eventMeshTcpMonitor.getMq2eventMeshTPS(), Labels.empty())) + .build(); + + //eventMesh2clientTPS + meter + .doubleValueObserverBuilder("eventmesh.tcp.2client.tps") + .setDescription("get tps of eventMesh to client") + .setUnit("TCP") + .setUpdater(result -> result.observe(eventMeshTcpMonitor.getEventMesh2clientTPS(), Labels.empty())) + .build(); + + //allTPS + meter + .doubleValueObserverBuilder("eventmesh.tcp.all.tps") + .setDescription("get all TPS") + .setUnit("TCP") + .setUpdater(result -> result.observe(eventMeshTcpMonitor.getAllTPS(), Labels.empty())) + .build(); + + //EventMeshTcpConnectionHandler.connections + meter + .doubleValueObserverBuilder("eventmesh.tcp.connection.handler.connections") + .setDescription("EventMeshTcpConnectionHandler.connections") + .setUnit("TCP") + .setUpdater(result -> result.observe(EventMeshTcpConnectionHandler.connections.doubleValue(), Labels.empty())) + .build(); + + //subTopicNum + meter + .doubleValueObserverBuilder("eventmesh.tcp.sub.topic.num") + .setDescription("get sub topic num") + .setUnit("TCP") + .setUpdater(result -> result.observe(eventMeshTcpMonitor.getSubTopicNum(), Labels.empty())) + .build(); + } +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java index 7120218517..0de1f2efd9 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java @@ -32,6 +32,7 @@ import org.apache.eventmesh.runtime.core.protocol.tcp.client.EventMeshTcpConnectionHandler; import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session; import org.apache.eventmesh.runtime.metrics.MonitorMetricConstants; +import org.apache.eventmesh.runtime.metrics.openTelemetry.OpenTelemetryTCPMetricsExporter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +40,10 @@ public class EventMeshTcpMonitor { private EventMeshTCPServer eventMeshTCPServer; + public EventMeshTCPServer getEventMeshTCPServer() { + return eventMeshTCPServer; + } + private final Logger tcpLogger = LoggerFactory.getLogger("tcpMonitor"); private final Logger appLogger = LoggerFactory.getLogger("appMonitor"); @@ -63,6 +68,8 @@ public class EventMeshTcpMonitor { private int allTPS; private int subTopicNum; + private OpenTelemetryTCPMetricsExporter metricsExporter; + public ScheduledFuture monitorTpsTask; public ScheduledFuture monitorThreadPoolTask; @@ -76,10 +83,12 @@ public void init() throws Exception { this.eventMesh2mqMsgNum = new AtomicInteger(0); this.mq2eventMeshMsgNum = new AtomicInteger(0); this.eventMesh2clientMsgNum = new AtomicInteger(0); + this.metricsExporter = new OpenTelemetryTCPMetricsExporter(this); logger.info("EventMeshTcpMonitor inited......"); } public void start() throws Exception { + metricsExporter.start(); monitorTpsTask = eventMeshTCPServer.getScheduler().scheduleAtFixedRate((new Runnable() { @Override public void run() { @@ -166,4 +175,28 @@ public AtomicInteger getMq2EventMeshMsgNum() { public AtomicInteger getEventMesh2clientMsgNum() { return eventMesh2clientMsgNum; } + + public int getClient2eventMeshTPS() { + return client2eventMeshTPS; + } + + public int getEventMesh2clientTPS() { + return eventMesh2clientTPS; + } + + public int getEventMesh2mqTPS() { + return eventMesh2mqTPS; + } + + public int getMq2eventMeshTPS() { + return mq2eventMeshTPS; + } + + public int getAllTPS() { + return allTPS; + } + + public int getSubTopicNum() { + return subTopicNum; + } } From 5ca22bd19832e401d3c8fb52e6efa6651170ebe4 Mon Sep 17 00:00:00 2001 From: Roc <872364519@qq.com> Date: Sun, 8 Aug 2021 14:29:43 +0800 Subject: [PATCH 15/16] improve --- .../OpenTelemetryExporterConfiguration.java | 14 +++++++++----- .../OpenTelemetryHTTPMetricsExporter.java | 4 +++- .../OpenTelemetryTCPMetricsExporter.java | 11 ++++++----- .../runtime/metrics/tcp/EventMeshTcpMonitor.java | 3 ++- 4 files changed, 20 insertions(+), 12 deletions(-) diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporterConfiguration.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporterConfiguration.java index 4afd6442bf..40752eab99 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporterConfiguration.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporterConfiguration.java @@ -29,9 +29,9 @@ //ues openTelemetry to export metrics data public class OpenTelemetryExporterConfiguration { - private HTTPServer server;//Prometheus server + private static HTTPServer server;//Prometheus server - int prometheusPort;//the endpoint to export metrics + static int prometheusPort;//the endpoint to export metrics static MeterProvider meterProvider; /** @@ -40,9 +40,12 @@ public class OpenTelemetryExporterConfiguration { * @return A MeterProvider for use in instrumentation. */ public MeterProvider initializeOpenTelemetry(CommonConfiguration configuration) { + if (server!=null){//the sever already start + return meterProvider; + } + prometheusPort = configuration.eventMeshPrometheusPort; SdkMeterProvider sdkMeterProvider = SdkMeterProvider.builder().buildAndRegisterGlobal(); - PrometheusCollector.builder().setMetricProducer(sdkMeterProvider).buildAndRegister(); this.meterProvider = sdkMeterProvider; try { @@ -50,15 +53,16 @@ public MeterProvider initializeOpenTelemetry(CommonConfiguration configuration) } catch (IOException e) { e.printStackTrace(); } - return meterProvider; } - public static MeterProvider getMeterProvider(){//for tcp to get the initialized =meterProvider + public static MeterProvider getMeterProvider(){//for tcp or http to get the initialized meterProvider return meterProvider; } public void shutdownPrometheusEndpoint() { + if (server==null) + return; server.stop(); } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryHTTPMetricsExporter.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryHTTPMetricsExporter.java index 97e0d3b3d8..ff2a31e4a0 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryHTTPMetricsExporter.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryHTTPMetricsExporter.java @@ -39,11 +39,13 @@ public OpenTelemetryHTTPMetricsExporter(HTTPMetricsServer httpMetricsServer, Eve // it is important to initialize the OpenTelemetry SDK as early as possible in your process. MeterProvider meterProvider = configuration.initializeOpenTelemetry(eventMeshHTTPConfiguration); - meter = meterProvider.get("OpenTelemetryHTTPExporter", "0.13.1"); } public void start(){ + if (meter==null){ + return; + } //maxHTTPTPS meter .doubleValueObserverBuilder("eventmesh.http.request.tps.elapsed.max") diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryTCPMetricsExporter.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryTCPMetricsExporter.java index 82adb2cd00..b49cef9118 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryTCPMetricsExporter.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryTCPMetricsExporter.java @@ -32,14 +32,12 @@ public class OpenTelemetryTCPMetricsExporter { private EventMeshTcpMonitor eventMeshTcpMonitor; - public OpenTelemetryTCPMetricsExporter(EventMeshTcpMonitor eventMeshTcpMonitor){ + public OpenTelemetryTCPMetricsExporter(EventMeshTcpMonitor eventMeshTcpMonitor , EventMeshTCPConfiguration eventMeshTCPConfiguration){ this.eventMeshTcpMonitor = eventMeshTcpMonitor; // it is important to initialize the OpenTelemetry SDK as early as possible in your process. - MeterProvider meterProvider = OpenTelemetryExporterConfiguration.getMeterProvider(); - if (meterProvider != null){ - meter = meterProvider.get("OpenTelemetryTCPExporter", "0.13.1"); - } + MeterProvider meterProvider = configuration.initializeOpenTelemetry(eventMeshTCPConfiguration); + meter = meterProvider.get("OpenTelemetryTCPExporter", "0.13.1"); } public void start(){ @@ -110,4 +108,7 @@ public void start(){ .setUpdater(result -> result.observe(eventMeshTcpMonitor.getSubTopicNum(), Labels.empty())) .build(); } + public void shutdown(){ + configuration.shutdownPrometheusEndpoint(); + } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java index 0de1f2efd9..05a70b8e9a 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java @@ -83,7 +83,7 @@ public void init() throws Exception { this.eventMesh2mqMsgNum = new AtomicInteger(0); this.mq2eventMeshMsgNum = new AtomicInteger(0); this.eventMesh2clientMsgNum = new AtomicInteger(0); - this.metricsExporter = new OpenTelemetryTCPMetricsExporter(this); + this.metricsExporter = new OpenTelemetryTCPMetricsExporter(this,eventMeshTCPServer.getEventMeshTCPConfiguration()); logger.info("EventMeshTcpMonitor inited......"); } @@ -157,6 +157,7 @@ public void run() { public void shutdown() throws Exception { monitorTpsTask.cancel(true); monitorThreadPoolTask.cancel(true); + metricsExporter.shutdown(); logger.info("EventMeshTcpMonitor shutdown......"); } From e29832b9bbfe2f3e85263b6f379d6da0ee963623 Mon Sep 17 00:00:00 2001 From: Roc <872364519@qq.com> Date: Tue, 10 Aug 2021 17:51:49 +0800 Subject: [PATCH 16/16] improve --- eventmesh-runtime/build.gradle | 1 - eventmesh-runtime/conf/eventmesh.properties | 2 -- 2 files changed, 3 deletions(-) diff --git a/eventmesh-runtime/build.gradle b/eventmesh-runtime/build.gradle index fb7a093127..a74514a3a4 100644 --- a/eventmesh-runtime/build.gradle +++ b/eventmesh-runtime/build.gradle @@ -36,7 +36,6 @@ List open_message = [ "io.openmessaging:openmessaging-api:2.2.1-pubsub" ] - dependencies { implementation metrics, open_telemetry, open_message, project(":eventmesh-connector-plugin:eventmesh-connector-api") testImplementation metrics, open_telemetry,open_message, project(":eventmesh-connector-plugin:eventmesh-connector-api") diff --git a/eventmesh-runtime/conf/eventmesh.properties b/eventmesh-runtime/conf/eventmesh.properties index 52e1ad27db..486fb71097 100644 --- a/eventmesh-runtime/conf/eventmesh.properties +++ b/eventmesh-runtime/conf/eventmesh.properties @@ -52,9 +52,7 @@ eventMesh.server.registry.registerIntervalInMills=10000 eventMesh.server.registry.fetchRegistryAddrIntervalInMills=20000 #auto-ack #eventMesh.server.defibus.client.comsumeTimeoutInMin=5 - #connector plugin eventMesh.connector.plugin.type=rocketmq - #prometheusPort eventMesh.metrics.prometheus.port=19090 \ No newline at end of file