From dff703139a9e54d81842d52e762d03f0355dd4a1 Mon Sep 17 00:00:00 2001 From: lrhkobe Date: Wed, 22 Jun 2022 20:01:44 +0800 Subject: [PATCH 1/3] add trace buried point for EventmeshTcpServer in eventmesh-runtime --- .../runtime/boot/EventMeshServer.java | 50 ++-- .../runtime/constants/EventMeshConstants.java | 11 + .../client/EventMeshTcpMessageDispatcher.java | 69 ++++- .../tcp/client/group/ClientGroupWrapper.java | 241 ++++++++++-------- .../client/session/push/SessionPusher.java | 48 ++-- .../client/session/send/SessionSender.java | 75 ++++-- .../tcp/client/task/MessageTransferTask.java | 187 +++++++++----- .../runtime/trace/AttributeKeys.java | 13 +- .../apache/eventmesh/runtime/trace/Trace.java | 236 +++++++++++++++++ .../eventmesh/runtime/trace/TraceUtils.java | 127 +++++++++ .../util/EventMeshCloudEventWriter.java | 43 ++++ .../eventmesh/runtime/util/EventMeshUtil.java | 19 ++ 12 files changed, 886 insertions(+), 233 deletions(-) create mode 100644 eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/trace/Trace.java create mode 100644 eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/trace/TraceUtils.java create mode 100644 eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshCloudEventWriter.java diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java index 695cd776ac..82c355af50 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java @@ -26,6 +26,7 @@ import org.apache.eventmesh.runtime.connector.ConnectorResource; import org.apache.eventmesh.runtime.constants.EventMeshConstants; import org.apache.eventmesh.runtime.registry.Registry; +import org.apache.eventmesh.runtime.trace.Trace; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,42 +45,44 @@ public class EventMeshServer { private EventMeshHTTPConfiguration eventMeshHttpConfiguration; - private EventMeshTCPConfiguration eventMeshTcpConfiguration; + private EventMeshTCPConfiguration eventMeshTCPConfiguration; private Acl acl; private Registry registry; + private static Trace trace; + private ConnectorResource connectorResource; private ServiceState serviceState; public EventMeshServer(EventMeshHTTPConfiguration eventMeshHttpConfiguration, - EventMeshTCPConfiguration eventMeshTcpConfiguration, + EventMeshTCPConfiguration eventMeshTCPConfiguration, EventMeshGrpcConfiguration eventMeshGrpcConfiguration) { this.eventMeshHttpConfiguration = eventMeshHttpConfiguration; - this.eventMeshTcpConfiguration = eventMeshTcpConfiguration; + this.eventMeshTCPConfiguration = eventMeshTCPConfiguration; this.eventMeshGrpcConfiguration = eventMeshGrpcConfiguration; this.acl = new Acl(); this.registry = new Registry(); + this.trace = new Trace(eventMeshHttpConfiguration.eventMeshServerTraceEnable); this.connectorResource = new ConnectorResource(); - ConfigurationContextUtil.putIfAbsent(ConfigurationContextUtil.TCP, eventMeshTcpConfiguration); + ConfigurationContextUtil.putIfAbsent(ConfigurationContextUtil.TCP, eventMeshTCPConfiguration); ConfigurationContextUtil.putIfAbsent(ConfigurationContextUtil.GRPC, eventMeshGrpcConfiguration); ConfigurationContextUtil.putIfAbsent(ConfigurationContextUtil.HTTP, eventMeshHttpConfiguration); } public void init() throws Exception { - if (eventMeshHttpConfiguration != null && eventMeshHttpConfiguration.eventMeshServerSecurityEnable) { acl.init(eventMeshHttpConfiguration.eventMeshSecurityPluginType); } // registry init - if (eventMeshTcpConfiguration != null - && eventMeshTcpConfiguration.eventMeshTcpServerEnabled - && eventMeshTcpConfiguration.eventMeshServerRegistryEnable) { - registry.init(eventMeshTcpConfiguration.eventMeshRegistryPluginType); + if (eventMeshTCPConfiguration != null + && eventMeshTCPConfiguration.eventMeshTcpServerEnabled + && eventMeshTCPConfiguration.eventMeshServerRegistryEnable) { + registry.init(eventMeshTCPConfiguration.eventMeshRegistryPluginType); } if (eventMeshGrpcConfiguration != null && eventMeshGrpcConfiguration.eventMeshServerRegistryEnable) { @@ -90,6 +93,10 @@ public void init() throws Exception { registry.init(eventMeshHttpConfiguration.eventMeshRegistryPluginType); } + if (eventMeshHttpConfiguration != null && eventMeshHttpConfiguration.eventMeshServerTraceEnable) { + trace.init(eventMeshHttpConfiguration.eventMeshTracePluginType); + } + connectorResource.init(eventMeshHttpConfiguration.eventMeshConnectorPluginType); // server init @@ -103,9 +110,9 @@ public void init() throws Exception { eventMeshHTTPServer.init(); } - if (eventMeshTcpConfiguration != null) { - eventMeshTCPServer = new EventMeshTCPServer(this, eventMeshTcpConfiguration, registry); - if (eventMeshTcpConfiguration.eventMeshTcpServerEnabled) { + if (eventMeshTCPConfiguration != null) { + eventMeshTCPServer = new EventMeshTCPServer(this, eventMeshTCPConfiguration, registry); + if (eventMeshTCPConfiguration.eventMeshTcpServerEnabled) { eventMeshTCPServer.init(); } } @@ -123,9 +130,9 @@ public void start() throws Exception { acl.start(); } // registry start - if (eventMeshTcpConfiguration != null - && eventMeshTcpConfiguration.eventMeshTcpServerEnabled - && eventMeshTcpConfiguration.eventMeshServerRegistryEnable) { + if (eventMeshTCPConfiguration != null + && eventMeshTCPConfiguration.eventMeshTcpServerEnabled + && eventMeshTCPConfiguration.eventMeshServerRegistryEnable) { registry.start(); } if (eventMeshHttpConfiguration != null && eventMeshHttpConfiguration.eventMeshServerRegistryEnable) { @@ -142,7 +149,7 @@ public void start() throws Exception { if (eventMeshHttpConfiguration != null) { eventMeshHTTPServer.start(); } - if (eventMeshTcpConfiguration != null && eventMeshTcpConfiguration.eventMeshTcpServerEnabled) { + if (eventMeshTCPConfiguration != null && eventMeshTCPConfiguration.eventMeshTcpServerEnabled) { eventMeshTCPServer.start(); } serviceState = ServiceState.RUNNING; @@ -153,7 +160,7 @@ public void shutdown() throws Exception { serviceState = ServiceState.STOPING; logger.info("server state:{}", serviceState); eventMeshHTTPServer.shutdown(); - if (eventMeshTcpConfiguration != null && eventMeshTcpConfiguration.eventMeshTcpServerEnabled) { + if (eventMeshTCPConfiguration != null && eventMeshTCPConfiguration.eventMeshTcpServerEnabled) { eventMeshTCPServer.shutdown(); } @@ -167,6 +174,11 @@ public void shutdown() throws Exception { acl.shutdown(); } + if (eventMeshHttpConfiguration != null && eventMeshHttpConfiguration.eventMeshServerTraceEnable) { + trace.shutdown(); + } + + ConfigurationContextUtil.clear(); serviceState = ServiceState.STOPED; logger.info("server state:{}", serviceState); @@ -184,6 +196,10 @@ public EventMeshTCPServer getEventMeshTCPServer() { return eventMeshTCPServer; } + public static Trace getTrace() { + return trace; + } + public ServiceState getServiceState() { return serviceState; } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/constants/EventMeshConstants.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/constants/EventMeshConstants.java index 64166a1ce1..72a2e49d07 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/constants/EventMeshConstants.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/constants/EventMeshConstants.java @@ -53,6 +53,17 @@ public class EventMeshConstants { public static final String RSP_SEND_EVENTMESH_IP = "rspsendeventmeship"; public static final String RSP_RECEIVE_EVENTMESH_IP = "rspreceiveeventmeship"; + public static final String RSP_SYS = "rsp0sys"; + public static final String RSP_IP = "rsp0ip"; + public static final String RSP_IDC = "rsp0idc"; + public static final String RSP_GROUP = "rsp0group"; + public static final String RSP_URL = "rsp0url"; + + public static final String REQ_SYS = "req0sys"; + public static final String REQ_IP = "req0ip"; + public static final String REQ_IDC = "req0idc"; + public static final String REQ_GROUP = "req0group"; + //default TTL 4 hours public static final Integer DEFAULT_MSG_TTL_MILLS = 14400000; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcpMessageDispatcher.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcpMessageDispatcher.java index 9b39b65c80..29594ab56e 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcpMessageDispatcher.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcpMessageDispatcher.java @@ -23,6 +23,8 @@ import org.apache.eventmesh.common.protocol.tcp.OPStatus; import org.apache.eventmesh.common.protocol.tcp.Package; import org.apache.eventmesh.runtime.boot.EventMeshTCPServer; +import org.apache.eventmesh.runtime.constants.EventMeshConstants; +import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session; import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.SessionState; import org.apache.eventmesh.runtime.core.protocol.tcp.client.task.GoodbyeTask; import org.apache.eventmesh.runtime.core.protocol.tcp.client.task.HeartBeatTask; @@ -33,13 +35,18 @@ import org.apache.eventmesh.runtime.core.protocol.tcp.client.task.RecommendTask; import org.apache.eventmesh.runtime.core.protocol.tcp.client.task.SubscribeTask; import org.apache.eventmesh.runtime.core.protocol.tcp.client.task.UnSubscribeTask; +import org.apache.eventmesh.runtime.trace.TraceUtils; import org.apache.eventmesh.runtime.util.EventMeshUtil; +import org.apache.eventmesh.trace.api.common.EventMeshTraceConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; +import io.opentelemetry.api.trace.Span; + +import java.util.concurrent.TimeUnit; public class EventMeshTcpMessageDispatcher extends SimpleChannelInboundHandler { @@ -55,11 +62,27 @@ public EventMeshTcpMessageDispatcher(EventMeshTCPServer eventMeshTCPServer) { protected void channelRead0(ChannelHandlerContext ctx, Package pkg) throws Exception { long startTime = System.currentTimeMillis(); validateMsg(pkg); - eventMeshTCPServer.getEventMeshTcpMonitor().getTcpSummaryMetrics().getClient2eventMeshMsgNum().incrementAndGet(); - Command cmd = null; + + eventMeshTCPServer.getEventMeshTcpMonitor().getTcpSummaryMetrics() + .getClient2eventMeshMsgNum().incrementAndGet(); + + Command cmd = pkg.getHeader().getCmd(); try { Runnable task; - cmd = pkg.getHeader().getCmd(); + + if (isNeedTrace(cmd)) { + pkg.getHeader().getProperties() + .put(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, startTime); + pkg.getHeader().getProperties().put(EventMeshConstants.REQ_SEND_EVENTMESH_IP, + eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerIp); + Session session = eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx); + + pkg.getHeader().getProperties().put(EventMeshConstants.REQ_SYS, session.getClient().getSubsystem()); + pkg.getHeader().getProperties().put(EventMeshConstants.REQ_IP, session.getClient().getHost()); + pkg.getHeader().getProperties().put(EventMeshConstants.REQ_IDC, session.getClient().getIdc()); + pkg.getHeader().getProperties().put(EventMeshConstants.REQ_GROUP, session.getClient().getGroup()); + } + if (cmd.equals(Command.RECOMMEND_REQUEST)) { messageLogger.info("pkg|c2eventMesh|cmd={}|pkg={}", cmd, pkg); task = new RecommendTask(pkg, ctx, startTime, eventMeshTCPServer); @@ -80,22 +103,43 @@ protected void channelRead0(ChannelHandlerContext ctx, Package pkg) throws Excep logMessageFlow(ctx, pkg, cmd); - if (eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx).getSessionState() == SessionState.CLOSED) { - throw new Exception("this eventMesh tcp session will be closed, may be reboot or version change!"); + if (eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx) + .getSessionState() == SessionState.CLOSED) { + throw new Exception( + "this eventMesh tcp session will be closed, may be reboot or version change!"); } dispatch(ctx, pkg, startTime, cmd); } catch (Exception e) { logger.error("exception occurred while pkg|cmd={}|pkg={}", cmd, pkg, e); + + if (isNeedTrace(cmd)) { + Span span = TraceUtils.prepareServerSpan(pkg.getHeader().getProperties(), + EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, startTime, + TimeUnit.MILLISECONDS, false); + TraceUtils.finishSpanWithException(span, pkg.getHeader().getProperties(), + "exception occurred while dispatch pkg", e); + } + writeToClient(cmd, pkg, ctx, e); } } + private boolean isNeedTrace(Command cmd) { + if (eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerTraceEnable + && cmd != null && (Command.REQUEST_TO_SERVER == cmd + || Command.ASYNC_MESSAGE_TO_SERVER == cmd + || Command.BROADCAST_MESSAGE_TO_SERVER == cmd)) { + return true; + } + return false; + } + private void writeToClient(Command cmd, Package pkg, ChannelHandlerContext ctx, Exception e) { try { Package res = new Package(); - res.setHeader(new Header(getReplyCommand(cmd), OPStatus.FAIL.getCode(), e.toString(), pkg.getHeader() - .getSeq())); + res.setHeader(new Header(getReplyCommand(cmd), OPStatus.FAIL.getCode(), e.toString(), + pkg.getHeader().getSeq())); ctx.writeAndFlush(res); } catch (Exception ex) { logger.warn("writeToClient failed", ex); @@ -131,11 +175,12 @@ private Command getReplyCommand(Command cmd) { private void logMessageFlow(ChannelHandlerContext ctx, Package pkg, Command cmd) { if (pkg.getBody() instanceof EventMeshMessage) { - messageLogger.info("pkg|c2eventMesh|cmd={}|Msg={}|user={}", cmd, EventMeshUtil.printMqMessage((EventMeshMessage) pkg - .getBody()), eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx).getClient()); + messageLogger.info("pkg|c2eventMesh|cmd={}|Msg={}|user={}", cmd, + EventMeshUtil.printMqMessage((EventMeshMessage) pkg.getBody()), + eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx).getClient()); } else { messageLogger.info("pkg|c2eventMesh|cmd={}|pkg={}|user={}", cmd, pkg, - eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx).getClient()); + eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx).getClient()); } } @@ -153,8 +198,8 @@ private void validateMsg(Package pkg) throws Exception { } } - private void dispatch(ChannelHandlerContext ctx, Package pkg, long startTime, Command cmd) throws - Exception { + private void dispatch(ChannelHandlerContext ctx, Package pkg, long startTime, Command cmd) + throws Exception { Runnable task; switch (cmd) { case HEARTBEAT_REQUEST: diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java index e5bfc1d62a..5da0acac21 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java @@ -24,6 +24,7 @@ import org.apache.eventmesh.api.SendCallback; import org.apache.eventmesh.api.SendResult; import org.apache.eventmesh.api.exception.OnExceptionContext; +import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.protocol.SubscriptionItem; import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.common.utils.JsonUtils; @@ -38,8 +39,10 @@ import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.retry.EventMeshTcpRetryer; import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send.UpStreamMsgContext; import org.apache.eventmesh.runtime.metrics.tcp.EventMeshTcpMonitor; +import org.apache.eventmesh.runtime.trace.TraceUtils; import org.apache.eventmesh.runtime.util.EventMeshUtil; import org.apache.eventmesh.runtime.util.HttpTinyClient; +import org.apache.eventmesh.trace.api.common.EventMeshTraceConstants; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -61,6 +64,8 @@ import io.cloudevents.CloudEvent; import io.cloudevents.core.builder.CloudEventBuilder; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.Scope; import com.google.common.base.Preconditions; @@ -181,7 +186,8 @@ public MQProducerWrapper getMqProducerWrapper() { return mqProducerWrapper; } - public boolean addSubscription(SubscriptionItem subscriptionItem, Session session) throws Exception { + public boolean addSubscription(SubscriptionItem subscriptionItem, Session session) + throws Exception { if (subscriptionItem == null) { logger.error("addSubscription param error,subscriptionItem is null", session); return false; @@ -416,75 +422,86 @@ public synchronized void initClientGroupPersistentConsumer() throws Exception { persistentMsgConsumer.init(keyValue); EventListener listener = (event, context) -> { - eventMeshTcpMonitor.getTcpSummaryMetrics().getMq2eventMeshMsgNum() - .incrementAndGet(); - event = CloudEventBuilder.from(event) - .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, - String.valueOf(System.currentTimeMillis())) - .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP, - eventMeshTCPConfiguration.eventMeshServerIp).build(); - String topic = event.getSubject(); - - EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = - (EventMeshAsyncConsumeContext) context; - Session session = downstreamDispatchStrategy - .select(group, topic, groupConsumerSessions); - String bizSeqNo = EventMeshUtil.getMessageBizSeq(event); - if (session == null) { - try { - Integer sendBackTimes = 0; - String sendBackFromEventMeshIp = ""; - if (StringUtils.isNotBlank(Objects.requireNonNull(event.getExtension( - EventMeshConstants.EVENTMESH_SEND_BACK_TIMES)).toString())) { - sendBackTimes = (Integer) event.getExtension( - EventMeshConstants.EVENTMESH_SEND_BACK_TIMES); - } - if (StringUtils.isNotBlank(Objects.requireNonNull(event.getExtension( - EventMeshConstants.EVENTMESH_SEND_BACK_IP)).toString())) { - sendBackFromEventMeshIp = (String) event.getExtension( - EventMeshConstants.EVENTMESH_SEND_BACK_IP); - } - - logger.error( - "found no session to downstream msg,groupName:{}, topic:{}, " - + "bizSeqNo:{}, sendBackTimes:{}, sendBackFromEventMeshIp:{}", - group, topic, bizSeqNo, sendBackTimes, - sendBackFromEventMeshIp); + String protocolVersion = + Objects.requireNonNull(event.getExtension(Constants.PROTOCOL_VERSION)).toString(); + + Span span = TraceUtils.prepareServerSpan( + EventMeshUtil.getCloudEventExtensionMap(protocolVersion, event), + EventMeshTraceConstants.TRACE_DOWNSTREAM_EVENTMESH_SERVER_SPAN, false); + + try (Scope scope = span.makeCurrent()) { + eventMeshTcpMonitor.getTcpSummaryMetrics().getMq2eventMeshMsgNum() + .incrementAndGet(); + event = CloudEventBuilder.from(event) + .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, + String.valueOf(System.currentTimeMillis())) + .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP, + eventMeshTCPConfiguration.eventMeshServerIp).build(); + String topic = event.getSubject(); + + EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = + (EventMeshAsyncConsumeContext) context; + Session session = downstreamDispatchStrategy + .select(group, topic, groupConsumerSessions); + String bizSeqNo = EventMeshUtil.getMessageBizSeq(event); + if (session == null) { + try { + Integer sendBackTimes = 0; + String sendBackFromEventMeshIp = ""; + if (StringUtils.isNotBlank(Objects.requireNonNull(event.getExtension( + EventMeshConstants.EVENTMESH_SEND_BACK_TIMES)).toString())) { + sendBackTimes = (Integer) event.getExtension( + EventMeshConstants.EVENTMESH_SEND_BACK_TIMES); + } + if (StringUtils.isNotBlank(Objects.requireNonNull(event.getExtension( + EventMeshConstants.EVENTMESH_SEND_BACK_IP)).toString())) { + sendBackFromEventMeshIp = (String) event.getExtension( + EventMeshConstants.EVENTMESH_SEND_BACK_IP); + } - if (sendBackTimes >= eventMeshTCPServer - .getEventMeshTCPConfiguration().eventMeshTcpSendBackMaxTimes) { logger.error( - "sendBack to broker over max times:{}, groupName:{}, topic:{}, " - + "bizSeqNo:{}", eventMeshTCPServer - .getEventMeshTCPConfiguration() - .eventMeshTcpSendBackMaxTimes, - group, topic, bizSeqNo); - } else { - sendBackTimes++; - event = CloudEventBuilder.from(event) - .withExtension(EventMeshConstants.EVENTMESH_SEND_BACK_TIMES, - sendBackTimes.toString()) - .withExtension(EventMeshConstants.EVENTMESH_SEND_BACK_IP, - eventMeshTCPConfiguration.eventMeshServerIp).build(); - sendMsgBackToBroker(event, bizSeqNo); + "found no session to downstream msg,groupName:{}, topic:{}, " + + "bizSeqNo:{}, sendBackTimes:{}, sendBackFromEventMeshIp:{}", + group, topic, bizSeqNo, sendBackTimes, + sendBackFromEventMeshIp); + + if (sendBackTimes >= eventMeshTCPServer + .getEventMeshTCPConfiguration().eventMeshTcpSendBackMaxTimes) { + logger.error( + "sendBack to broker over max times:{}, groupName:{}, topic:{}, " + + "bizSeqNo:{}", eventMeshTCPServer + .getEventMeshTCPConfiguration() + .eventMeshTcpSendBackMaxTimes, + group, topic, bizSeqNo); + } else { + sendBackTimes++; + event = CloudEventBuilder.from(event) + .withExtension(EventMeshConstants.EVENTMESH_SEND_BACK_TIMES, + sendBackTimes.toString()) + .withExtension(EventMeshConstants.EVENTMESH_SEND_BACK_IP, + eventMeshTCPConfiguration.eventMeshServerIp).build(); + sendMsgBackToBroker(event, bizSeqNo); + } + } catch (Exception e) { + logger.warn("handle msg exception when no session found", e); } - } catch (Exception e) { - logger.warn("handle msg exception when no session found", e); + + eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage); + return; } - eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage); - return; + SubscriptionItem subscriptionItem = subscriptions.get(topic); + DownStreamMsgContext downStreamMsgContext = + new DownStreamMsgContext(event, session, persistentMsgConsumer, + eventMeshAsyncConsumeContext.getAbstractContext(), false, + subscriptionItem); + //msg put in eventmesh,waiting client ack + session.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext); + session.downstreamMsg(downStreamMsgContext); + eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck); + } finally { + TraceUtils.finishSpan(span, event); } - - SubscriptionItem subscriptionItem = subscriptions.get(topic); - DownStreamMsgContext downStreamMsgContext = - new DownStreamMsgContext(event, session, persistentMsgConsumer, - eventMeshAsyncConsumeContext.getAbstractContext(), false, - subscriptionItem); - //msg put in eventmesh,waiting client ack - session.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext); - session.downstreamMsg(downStreamMsgContext); - eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck); }; persistentMsgConsumer.registerEventListener(listener); @@ -515,57 +532,67 @@ public synchronized void initClientGroupBroadcastConsumer() throws Exception { broadCastMsgConsumer.init(keyValue); EventListener listener = (event, context) -> { + String protocolVersion = + Objects.requireNonNull(event.getExtension(Constants.PROTOCOL_VERSION)).toString(); + + Span span = TraceUtils.prepareServerSpan( + EventMeshUtil.getCloudEventExtensionMap(protocolVersion, event), + EventMeshTraceConstants.TRACE_DOWNSTREAM_EVENTMESH_SERVER_SPAN, false); + try (Scope scope = span.makeCurrent()) { + eventMeshTcpMonitor.getTcpSummaryMetrics().getMq2eventMeshMsgNum() + .incrementAndGet(); + event = CloudEventBuilder.from(event) + .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, + String.valueOf(System.currentTimeMillis())) + .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP, + eventMeshTCPConfiguration.eventMeshServerIp).build(); + String topic = event.getSubject(); + + EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = + (EventMeshAsyncConsumeContext) context; + if (CollectionUtils.isEmpty(groupConsumerSessions)) { + logger.warn("found no session to downstream broadcast msg"); + eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage); + return; + } - eventMeshTcpMonitor.getTcpSummaryMetrics().getMq2eventMeshMsgNum() - .incrementAndGet(); - event = CloudEventBuilder.from(event) - .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, - String.valueOf(System.currentTimeMillis())) - .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP, - eventMeshTCPConfiguration.eventMeshServerIp).build(); - String topic = event.getSubject(); - - EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = - (EventMeshAsyncConsumeContext) context; - if (CollectionUtils.isEmpty(groupConsumerSessions)) { - logger.warn("found no session to downstream broadcast msg"); - eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage); - return; - } + Iterator sessionsItr = groupConsumerSessions.iterator(); - Iterator sessionsItr = groupConsumerSessions.iterator(); + SubscriptionItem subscriptionItem = subscriptions.get(topic); + DownStreamMsgContext downStreamMsgContext = + new DownStreamMsgContext(event, null, broadCastMsgConsumer, + eventMeshAsyncConsumeContext.getAbstractContext(), false, + subscriptionItem); - SubscriptionItem subscriptionItem = subscriptions.get(topic); - DownStreamMsgContext downStreamMsgContext = - new DownStreamMsgContext(event, null, broadCastMsgConsumer, - eventMeshAsyncConsumeContext.getAbstractContext(), false, - subscriptionItem); + while (sessionsItr.hasNext()) { + Session session = sessionsItr.next(); - while (sessionsItr.hasNext()) { - Session session = sessionsItr.next(); + if (!session.isAvailable(topic)) { + logger + .warn("downstream broadcast msg,session is not available,client:{}", + session.getClient()); + continue; + } - if (!session.isAvailable(topic)) { - logger - .warn("downstream broadcast msg,session is not available,client:{}", - session.getClient()); - continue; + downStreamMsgContext.session = session; + + //downstream broadcast msg asynchronously + eventMeshTCPServer.getBroadcastMsgDownstreamExecutorService() + .submit(new Runnable() { + @Override + public void run() { + //msg put in eventmesh,waiting client ack + session.getPusher() + .unAckMsg(downStreamMsgContext.seq, downStreamMsgContext); + session.downstreamMsg(downStreamMsgContext); + } + }); } - downStreamMsgContext.session = session; - - //downstream broadcast msg asynchronously - eventMeshTCPServer.getBroadcastMsgDownstreamExecutorService() - .submit(new Runnable() { - @Override - public void run() { - //msg put in eventmesh,waiting client ack - session.getPusher() - .unAckMsg(downStreamMsgContext.seq, downStreamMsgContext); - session.downstreamMsg(downStreamMsgContext); - } - }); + eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck); + } finally { + TraceUtils.finishSpan(span, event); } - eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck); }; broadCastMsgConsumer.registerEventListener(listener); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java index 1ac0a2a95a..8668d6b9fa 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java @@ -29,7 +29,9 @@ import org.apache.eventmesh.protocol.api.ProtocolPluginFactory; import org.apache.eventmesh.runtime.constants.EventMeshConstants; import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session; +import org.apache.eventmesh.runtime.trace.TraceUtils; import org.apache.eventmesh.runtime.util.EventMeshUtil; +import org.apache.eventmesh.trace.api.common.EventMeshTraceConstants; import org.apache.commons.collections4.CollectionUtils; @@ -43,6 +45,7 @@ import io.cloudevents.core.builder.CloudEventBuilder; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; +import io.opentelemetry.api.trace.Span; public class SessionPusher { @@ -65,12 +68,12 @@ public SessionPusher(Session session) { @Override public String toString() { return "SessionPusher{" - + - "deliveredMsgsCount=" + deliveredMsgsCount.longValue() - + - ",deliverFailCount=" + deliverFailMsgsCount.longValue() - + - ",unAckMsg=" + CollectionUtils.size(downStreamMap) + '}'; + + + "deliveredMsgsCount=" + deliveredMsgsCount.longValue() + + + ",deliverFailCount=" + deliverFailMsgsCount.longValue() + + + ",unAckMsg=" + CollectionUtils.size(downStreamMap) + '}'; } public void push(final DownStreamMsgContext downStreamMsgContext) { @@ -90,8 +93,12 @@ public void push(final DownStreamMsgContext downStreamMsgContext) { Package pkg = new Package(); downStreamMsgContext.event = CloudEventBuilder.from(downStreamMsgContext.event) - .withExtension(EventMeshConstants.REQ_EVENTMESH2C_TIMESTAMP, String.valueOf(System.currentTimeMillis())) - .build(); + .withExtension(EventMeshConstants.REQ_EVENTMESH2C_TIMESTAMP, String.valueOf(System.currentTimeMillis())) + .withExtension(EventMeshConstants.RSP_SYS, session.getClient().getSubsystem()) + .withExtension(EventMeshConstants.RSP_GROUP, session.getClient().getGroup()) + .withExtension(EventMeshConstants.RSP_IDC, session.getClient().getIdc()) + .withExtension(EventMeshConstants.RSP_IP, session.getClient().getHost()) + .build(); EventMeshMessage body = null; int retCode = 0; String retMsg = null; @@ -107,31 +114,38 @@ public void push(final DownStreamMsgContext downStreamMsgContext) { } finally { session.getClientGroupWrapper().get().getEventMeshTcpMonitor().getTcpSummaryMetrics().getEventMesh2clientMsgNum().incrementAndGet(); - session.getContext().writeAndFlush(pkg).addListener( + //TODO uploadTrace + String protocolVersion = Objects.requireNonNull(downStreamMsgContext.event.getExtension(Constants.PROTOCOL_VERSION)).toString(); + + Span span = TraceUtils.prepareClientSpan(EventMeshUtil.getCloudEventExtensionMap(protocolVersion, downStreamMsgContext.event), + EventMeshTraceConstants.TRACE_DOWNSTREAM_EVENTMESH_CLIENT_SPAN, false); + + try { + session.getContext().writeAndFlush(pkg).addListener( new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { logger.error("downstreamMsg fail,seq:{}, retryTimes:{}, event:{}", downStreamMsgContext.seq, - downStreamMsgContext.retryTimes, downStreamMsgContext.event); + downStreamMsgContext.retryTimes, downStreamMsgContext.event); deliverFailMsgsCount.incrementAndGet(); //how long to isolate client when push fail long isolateTime = System.currentTimeMillis() - + session.getEventMeshTCPConfiguration().eventMeshTcpPushFailIsolateTimeInMills; + + session.getEventMeshTCPConfiguration().eventMeshTcpPushFailIsolateTimeInMills; session.setIsolateTime(isolateTime); logger.warn("isolate client:{},isolateTime:{}", session.getClient(), isolateTime); //retry long delayTime = SubscriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType()) - ? session.getEventMeshTCPConfiguration().eventMeshTcpMsgRetrySyncDelayInMills - : session.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryAsyncDelayInMills; + ? session.getEventMeshTCPConfiguration().eventMeshTcpMsgRetrySyncDelayInMills + : session.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryAsyncDelayInMills; downStreamMsgContext.delay(delayTime); session.getClientGroupWrapper().get().getEventMeshTcpRetryer().pushRetry(downStreamMsgContext); } else { deliveredMsgsCount.incrementAndGet(); logger.info("downstreamMsg success,seq:{}, retryTimes:{}, bizSeq:{}", downStreamMsgContext.seq, - downStreamMsgContext.retryTimes, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.event)); + downStreamMsgContext.retryTimes, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.event)); if (session.isIsolated()) { logger.info("cancel isolated,client:{}", session.getClient()); @@ -140,7 +154,11 @@ public void operationComplete(ChannelFuture future) throws Exception { } } } - ); + ); + } finally { + TraceUtils.finishSpan(span, downStreamMsgContext.event); + } + } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/SessionSender.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/SessionSender.java index 7ff0d6a446..b3191fe326 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/SessionSender.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/SessionSender.java @@ -26,9 +26,18 @@ import org.apache.eventmesh.common.protocol.tcp.Package; import org.apache.eventmesh.protocol.api.ProtocolAdaptor; import org.apache.eventmesh.protocol.api.ProtocolPluginFactory; +import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException; +import org.apache.eventmesh.runtime.boot.EventMeshServer; import org.apache.eventmesh.runtime.constants.EventMeshConstants; import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session; +import org.apache.eventmesh.runtime.trace.TraceUtils; +import org.apache.eventmesh.runtime.util.EventMeshCloudEventWriter; +import org.apache.eventmesh.runtime.util.EventMeshUtil; import org.apache.eventmesh.runtime.util.Utils; +import org.apache.eventmesh.trace.api.EventMeshSpan; +import org.apache.eventmesh.trace.api.EventMeshTraceContext; +import org.apache.eventmesh.trace.api.common.EventMeshTraceConstants; +import org.apache.eventmesh.trace.api.propagation.EventMeshContextCarrier; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DateFormatUtils; @@ -42,7 +51,16 @@ import org.slf4j.LoggerFactory; import io.cloudevents.CloudEvent; +import io.cloudevents.SpecVersion; import io.cloudevents.core.builder.CloudEventBuilder; +import io.cloudevents.core.v03.CloudEventV03; +import io.cloudevents.core.v1.CloudEventV1; +import io.netty.channel.ChannelHandlerContext; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; public class SessionSender { @@ -62,12 +80,12 @@ public class SessionSender { @Override public String toString() { return "SessionSender{upstreamBuff=" + upstreamBuff.availablePermits() - + - ",upMsgs=" + upMsgs.longValue() - + - ",failMsgCount=" + failMsgCount.longValue() - + - ",createTime=" + DateFormatUtils.format(createTime, EventMeshConstants.DATE_FORMAT) + '}'; + + + ",upMsgs=" + upMsgs.longValue() + + + ",failMsgCount=" + failMsgCount.longValue() + + + ",createTime=" + DateFormatUtils.format(createTime, EventMeshConstants.DATE_FORMAT) + '}'; } public Semaphore getUpstreamBuff() { @@ -87,16 +105,27 @@ public EventMeshTcpSendResult send(Header header, CloudEvent event, SendCallback upMsgs.incrementAndGet(); UpStreamMsgContext upStreamMsgContext = null; Command cmd = header.getCmd(); + + String protocolVersion = header.getProperty(Constants.PROTOCOL_VERSION).toString(); + long ttl = EventMeshConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS; if (Command.REQUEST_TO_SERVER == cmd) { if (event.getExtension(EventMeshConstants.PROPERTY_MESSAGE_TTL) != null) { ttl = Long.parseLong((String) Objects.requireNonNull( - event.getExtension(EventMeshConstants.PROPERTY_MESSAGE_TTL))); + event.getExtension(EventMeshConstants.PROPERTY_MESSAGE_TTL))); } upStreamMsgContext = new UpStreamMsgContext(session, event, header, startTime, taskExecuteTime); - session.getClientGroupWrapper().get().request(upStreamMsgContext, initSyncRRCallback(header, - startTime, taskExecuteTime), ttl); - upstreamBuff.release(); + + Span span = TraceUtils.prepareClientSpan(EventMeshUtil.getCloudEventExtensionMap(protocolVersion, event), + EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_CLIENT_SPAN, false); + try { + session.getClientGroupWrapper().get() + .request(upStreamMsgContext, initSyncRRCallback(header, + startTime, taskExecuteTime, event), ttl); + upstreamBuff.release(); + } finally { + TraceUtils.finishSpan(span, event); + } } else if (Command.RESPONSE_TO_SERVER == cmd) { String cluster = (String) event.getExtension(EventMeshConstants.PROPERTY_MESSAGE_CLUSTER); if (!StringUtils.isEmpty(cluster)) { @@ -110,7 +139,15 @@ public EventMeshTcpSendResult send(Header header, CloudEvent event, SendCallback upstreamBuff.release(); } else { upStreamMsgContext = new UpStreamMsgContext(session, event, header, startTime, taskExecuteTime); - session.getClientGroupWrapper().get().send(upStreamMsgContext, sendCallback); + + Span span = TraceUtils.prepareClientSpan(EventMeshUtil.getCloudEventExtensionMap(protocolVersion, event), + EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_CLIENT_SPAN, false); + try { + session.getClientGroupWrapper().get() + .send(upStreamMsgContext, sendCallback); + } finally { + TraceUtils.finishSpan(span, event); + } } session.getClientGroupWrapper().get().getEventMeshTcpMonitor().getTcpSummaryMetrics().getEventMesh2mqMsgNum().incrementAndGet(); @@ -129,16 +166,16 @@ public EventMeshTcpSendResult send(Header header, CloudEvent event, SendCallback return new EventMeshTcpSendResult(header.getSeq(), EventMeshTcpSendStatus.SUCCESS, EventMeshTcpSendStatus.SUCCESS.name()); } - private RequestReplyCallback initSyncRRCallback(Header header, long startTime, long taskExecuteTime) { + private RequestReplyCallback initSyncRRCallback(Header header, long startTime, long taskExecuteTime, CloudEvent cloudEvent) { return new RequestReplyCallback() { @Override public void onSuccess(CloudEvent event) { String seq = header.getSeq(); // TODO: How to assign values here event = CloudEventBuilder.from(event) - .withExtension(EventMeshConstants.RSP_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis())) - .withExtension(EventMeshConstants.RSP_RECEIVE_EVENTMESH_IP, session.getEventMeshTCPConfiguration().eventMeshServerIp) - .build(); + .withExtension(EventMeshConstants.RSP_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis())) + .withExtension(EventMeshConstants.RSP_RECEIVE_EVENTMESH_IP, session.getEventMeshTCPConfiguration().eventMeshServerIp) + .build(); session.getClientGroupWrapper().get().getEventMeshTcpMonitor().getTcpSummaryMetrics().getMq2eventMeshMsgNum().incrementAndGet(); Command cmd; @@ -149,8 +186,8 @@ public void onSuccess(CloudEvent event) { return; } event = CloudEventBuilder.from(event) - .withExtension(EventMeshConstants.RSP_EVENTMESH2C_TIMESTAMP, String.valueOf(System.currentTimeMillis())) - .build(); + .withExtension(EventMeshConstants.RSP_EVENTMESH2C_TIMESTAMP, String.valueOf(System.currentTimeMillis())) + .build(); String protocolType = Objects.requireNonNull(event.getExtension(Constants.PROTOCOL_TYPE)).toString(); ProtocolAdaptor protocolAdaptor = ProtocolPluginFactory.getProtocolAdaptor(protocolType); @@ -165,12 +202,16 @@ public void onSuccess(CloudEvent event) { pkg.setHeader(new Header(cmd, OPStatus.FAIL.getCode(), null, seq)); } finally { Utils.writeAndFlush(pkg, startTime, taskExecuteTime, session.getContext(), session); + + TraceUtils.finishSpan(session.getContext(), event); } } @Override public void onException(Throwable e) { messageLogger.error("exception occur while sending RR message|user={}", session.getClient(), new Exception(e)); + + TraceUtils.finishSpanWithException(session.getContext(), cloudEvent, "exception occur while sending RR message", e); } }; } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java index af4d26a250..944b2cb699 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java @@ -31,13 +31,18 @@ import org.apache.eventmesh.protocol.api.ProtocolAdaptor; import org.apache.eventmesh.protocol.api.ProtocolPluginFactory; import org.apache.eventmesh.runtime.acl.Acl; +import org.apache.eventmesh.runtime.boot.EventMeshServer; import org.apache.eventmesh.runtime.boot.EventMeshTCPServer; import org.apache.eventmesh.runtime.constants.EventMeshConstants; import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send.EventMeshTcpSendResult; import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send.EventMeshTcpSendStatus; import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send.UpStreamMsgContext; +import org.apache.eventmesh.runtime.trace.AttributeKeys; +import org.apache.eventmesh.runtime.trace.SpanKey; +import org.apache.eventmesh.runtime.trace.TraceUtils; import org.apache.eventmesh.runtime.util.RemotingHelper; import org.apache.eventmesh.runtime.util.Utils; +import org.apache.eventmesh.trace.api.common.EventMeshTraceConstants; import org.apache.commons.lang3.StringUtils; @@ -52,6 +57,9 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.context.Context; public class MessageTransferTask extends AbstractTask { @@ -68,21 +76,39 @@ public MessageTransferTask(Package pkg, ChannelHandlerContext ctx, long startTim public void run() { long taskExecuteTime = System.currentTimeMillis(); Command cmd = pkg.getHeader().getCmd(); - Command replyCmd = getReplyCmd(cmd); - String protocolType = "EventMeshMessage"; - if (pkg.getHeader().getProperties() != null && pkg.getHeader().getProperty(Constants.PROTOCOL_TYPE) != null) { - protocolType = (String) pkg.getHeader().getProperty(Constants.PROTOCOL_TYPE); + + try { + if (eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerTraceEnable && !RESPONSE_TO_SERVER.equals(cmd)) { + //attach the span to the server context + Span span = TraceUtils.prepareServerSpan(pkg.getHeader().getProperties(), + EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, + startTime, TimeUnit.MILLISECONDS, true); + Context context = Context.current().with(SpanKey.SERVER_KEY, span); + //put the context in channel + ctx.channel().attr(AttributeKeys.SERVER_CONTEXT).set(context); + } + } catch (Throwable ex) { + logger.warn("upload trace fail in MessageTransferTask[server-span-start]", ex); } - ProtocolAdaptor protocolAdaptor = - ProtocolPluginFactory.getProtocolAdaptor(protocolType); + + + Command replyCmd = getReplyCmd(cmd); Package msg = new Package(); - //EventMeshMessage eventMeshMessage = (EventMeshMessage) pkg.getBody(); int retCode = 0; EventMeshTcpSendResult sendStatus; CloudEvent event = null; + try { + String protocolType = "EventMeshMessage"; + if (pkg.getHeader().getProperties() != null + && pkg.getHeader().getProperty(Constants.PROTOCOL_TYPE) != null) { + protocolType = (String) pkg.getHeader().getProperty(Constants.PROTOCOL_TYPE); + } + ProtocolAdaptor protocolAdaptor = + ProtocolPluginFactory.getProtocolAdaptor(protocolType); event = protocolAdaptor.toCloudEvent(pkg); + if (event == null) { throw new Exception("event is null"); } @@ -96,49 +122,66 @@ public void run() { //do acl check in sending msg if (eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerSecurityEnable) { String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); - Acl.doAclCheckInTcpSend(remoteAddr, session.getClient(), event.getSubject(), cmd.value()); + Acl.doAclCheckInTcpSend(remoteAddr, session.getClient(), event.getSubject(), + cmd.value()); } - if (eventMeshTCPServer.getRateLimiter().tryAcquire(TRY_PERMIT_TIME_OUT, TimeUnit.MILLISECONDS)) { - synchronized (session) { - long sendTime = System.currentTimeMillis(); - event = addTimestamp(event, cmd, sendTime); + if (!eventMeshTCPServer.getRateLimiter() + .tryAcquire(TRY_PERMIT_TIME_OUT, TimeUnit.MILLISECONDS)) { - sendStatus = session - .upstreamMsg(pkg.getHeader(), event, createSendCallback(replyCmd, taskExecuteTime, event), - startTime, taskExecuteTime); - - if (StringUtils.equals(EventMeshTcpSendStatus.SUCCESS.name(), sendStatus.getSendStatus().name())) { - messageLogger.info("pkg|eventMesh2mq|cmd={}|Msg={}|user={}|wait={}ms|cost={}ms", cmd, event, - session.getClient(), taskExecuteTime - startTime, sendTime - startTime); - } else { - throw new Exception(sendStatus.getDetail()); - } - } - } else { - msg.setHeader(new Header(replyCmd, OPStatus.FAIL.getCode(), "Tps overload, global flow control", - pkg.getHeader().getSeq())); + msg.setHeader(new Header(replyCmd, OPStatus.FAIL.getCode(), + "Tps overload, global flow control", + pkg.getHeader().getSeq())); ctx.writeAndFlush(msg).addListener( - new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - Utils.logSucceedMessageFlow(msg, session.getClient(), startTime, taskExecuteTime); - } + new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + Utils.logSucceedMessageFlow(msg, session.getClient(), startTime, + taskExecuteTime); } + } ); - logger.warn("======Tps overload, global flow control, rate:{}! PLEASE CHECK!========", - eventMeshTCPServer.getRateLimiter().getRate()); + + TraceUtils.finishSpanWithException(ctx, event, "Tps overload, global flow control", + null); + + logger.warn( + "======Tps overload, global flow control, rate:{}! PLEASE CHECK!========", + eventMeshTCPServer.getRateLimiter().getRate()); return; } + + synchronized (session) { + long sendTime = System.currentTimeMillis(); + event = addTimestamp(event, cmd, sendTime); + + sendStatus = session + .upstreamMsg(pkg.getHeader(), event, + createSendCallback(replyCmd, taskExecuteTime, event), + startTime, taskExecuteTime); + + if (StringUtils.equals(EventMeshTcpSendStatus.SUCCESS.name(), + sendStatus.getSendStatus().name())) { + messageLogger.info("pkg|eventMesh2mq|cmd={}|Msg={}|user={}|wait={}ms|cost={}ms", + cmd, event, + session.getClient(), taskExecuteTime - startTime, sendTime - startTime); + } else { + throw new Exception(sendStatus.getDetail()); + } + } } catch (Exception e) { - logger - .error("MessageTransferTask failed|cmd={}|event={}|user={}|errMsg={}", cmd, event, session.getClient(), - e); + logger.error("MessageTransferTask failed|cmd={}|event={}|user={}", cmd, event, + session.getClient(), + e); + if (!cmd.equals(RESPONSE_TO_SERVER)) { msg.setHeader( - new Header(replyCmd, OPStatus.FAIL.getCode(), e.getStackTrace().toString(), pkg.getHeader() - .getSeq())); + new Header(replyCmd, OPStatus.FAIL.getCode(), e.toString(), + pkg.getHeader() + .getSeq())); Utils.writeAndFlush(msg, startTime, taskExecuteTime, session.getContext(), session); + + TraceUtils.finishSpanWithException(ctx, event, "MessageTransferTask failed", e); } } } @@ -146,18 +189,22 @@ public void operationComplete(ChannelFuture future) throws Exception { private CloudEvent addTimestamp(CloudEvent event, Command cmd, long sendTime) { if (cmd.equals(RESPONSE_TO_SERVER)) { event = CloudEventBuilder.from(event) - .withExtension(EventMeshConstants.RSP_C2EVENTMESH_TIMESTAMP, String.valueOf(startTime)) - .withExtension(EventMeshConstants.RSP_EVENTMESH2MQ_TIMESTAMP, String.valueOf(sendTime)) - .withExtension(EventMeshConstants.RSP_SEND_EVENTMESH_IP, - eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerIp) - .build(); + .withExtension(EventMeshConstants.RSP_C2EVENTMESH_TIMESTAMP, + String.valueOf(startTime)) + .withExtension(EventMeshConstants.RSP_EVENTMESH2MQ_TIMESTAMP, + String.valueOf(sendTime)) + .withExtension(EventMeshConstants.RSP_SEND_EVENTMESH_IP, + eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerIp) + .build(); } else { event = CloudEventBuilder.from(event) - .withExtension(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, String.valueOf(startTime)) - .withExtension(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP, String.valueOf(sendTime)) - .withExtension(EventMeshConstants.REQ_SEND_EVENTMESH_IP, - eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerIp) - .build(); + .withExtension(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, + String.valueOf(startTime)) + .withExtension(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP, + String.valueOf(sendTime)) + .withExtension(EventMeshConstants.REQ_SEND_EVENTMESH_IP, + eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerIp) + .build(); } return event; } @@ -175,7 +222,8 @@ private Command getReplyCmd(Command cmd) { } } - protected SendCallback createSendCallback(Command replyCmd, long taskExecuteTime, CloudEvent event) { + protected SendCallback createSendCallback(Command replyCmd, long taskExecuteTime, + CloudEvent event) { final long createTime = System.currentTimeMillis(); Package msg = new Package(); @@ -183,14 +231,20 @@ protected SendCallback createSendCallback(Command replyCmd, long taskExecuteTime @Override public void onSuccess(SendResult sendResult) { session.getSender().getUpstreamBuff().release(); - messageLogger.info("upstreamMsg message success|user={}|callback cost={}", session.getClient(), - System.currentTimeMillis() - createTime); - if (replyCmd.equals(Command.BROADCAST_MESSAGE_TO_SERVER_ACK) || replyCmd.equals(Command - .ASYNC_MESSAGE_TO_SERVER_ACK)) { - msg.setHeader(new Header(replyCmd, OPStatus.SUCCESS.getCode(), OPStatus.SUCCESS.getDesc(), + messageLogger.info("upstreamMsg message success|user={}|callback cost={}", + session.getClient(), + System.currentTimeMillis() - createTime); + if (replyCmd.equals(Command.BROADCAST_MESSAGE_TO_SERVER_ACK) + || replyCmd.equals(Command.ASYNC_MESSAGE_TO_SERVER_ACK)) { + msg.setHeader( + new Header(replyCmd, OPStatus.SUCCESS.getCode(), OPStatus.SUCCESS.getDesc(), pkg.getHeader().getSeq())); msg.setBody(event); - Utils.writeAndFlush(msg, startTime, taskExecuteTime, session.getContext(), session); + Utils.writeAndFlush(msg, startTime, taskExecuteTime, session.getContext(), + session); + + //async request need finish span when callback, rr request will finish span when rrCallback + TraceUtils.finishSpan(ctx, event); } } @@ -200,20 +254,31 @@ public void onException(OnExceptionContext context) { // retry UpStreamMsgContext upStreamMsgContext = new UpStreamMsgContext( - session, event, pkg.getHeader(), startTime, taskExecuteTime); + session, event, pkg.getHeader(), startTime, taskExecuteTime); upStreamMsgContext.delay(10000); - session.getClientGroupWrapper().get().getEventMeshTcpRetryer().pushRetry(upStreamMsgContext); + session.getClientGroupWrapper().get().getEventMeshTcpRetryer() + .pushRetry(upStreamMsgContext); session.getSender().failMsgCount.incrementAndGet(); messageLogger - .error("upstreamMsg mq message error|user={}|callback cost={}, errMsg={}", session.getClient(), - (System.currentTimeMillis() - createTime), new Exception(context.getException())); - msg.setHeader(new Header(replyCmd, OPStatus.FAIL.getCode(), context.getException().toString(), + .error("upstreamMsg mq message error|user={}|callback cost={}, errMsg={}", + session.getClient(), + (System.currentTimeMillis() - createTime), + new Exception(context.getException())); + msg.setHeader( + new Header(replyCmd, OPStatus.FAIL.getCode(), context.getException().toString(), pkg.getHeader().getSeq())); msg.setBody(event); Utils.writeAndFlush(msg, startTime, taskExecuteTime, session.getContext(), session); - } + //both rr request and async request need finish span when reqeust fail + if (!replyCmd.equals(RESPONSE_TO_SERVER)) { + //upload trace + TraceUtils.finishSpanWithException(ctx, event, + "upload trace fail in MessageTransferTask.createSendCallback.onException", + context.getException()); + } + } }; } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/trace/AttributeKeys.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/trace/AttributeKeys.java index 10214a8aa1..018e367522 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/trace/AttributeKeys.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/trace/AttributeKeys.java @@ -20,24 +20,29 @@ import io.netty.util.AttributeKey; import io.opentelemetry.context.Context; +import org.apache.eventmesh.trace.api.EventMeshTraceContext; + /** * keys. */ public final class AttributeKeys { public static final AttributeKey WRITE_CONTEXT = - AttributeKey.valueOf(AttributeKeys.class, "passed-context"); + AttributeKey.valueOf(AttributeKeys.class, "passed-context"); // this is the context that has the server span // // note: this attribute key is also used by ratpack instrumentation public static final AttributeKey SERVER_CONTEXT = - AttributeKey.valueOf(AttributeKeys.class, "server-span"); + AttributeKey.valueOf(AttributeKeys.class, "server-span"); public static final AttributeKey CLIENT_CONTEXT = - AttributeKey.valueOf(AttributeKeys.class, "client-context"); + AttributeKey.valueOf(AttributeKeys.class, "client-context"); public static final AttributeKey CLIENT_PARENT_CONTEXT = - AttributeKey.valueOf(AttributeKeys.class, "client-parent-context"); + AttributeKey.valueOf(AttributeKeys.class, "client-parent-context"); + + public static final AttributeKey EVENTMESH_SERVER_CONTEXT = + AttributeKey.valueOf(AttributeKeys.class, "server-span"); private AttributeKeys() { } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/trace/Trace.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/trace/Trace.java new file mode 100644 index 0000000000..f1d94cde70 --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/trace/Trace.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.trace; + +import org.apache.eventmesh.trace.api.EventMeshTraceService; +import org.apache.eventmesh.trace.api.TracePluginFactory; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.cloudevents.CloudEvent; +import io.netty.channel.ChannelHandlerContext; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.context.Context; + +public class Trace { + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private boolean useTrace = false; + private EventMeshTraceService eventMeshTraceService; + + public Trace(boolean useTrace) { + this.useTrace = useTrace; + } + + public void init(String tracePluginType) throws Exception { + if (useTrace) { + eventMeshTraceService = TracePluginFactory.getEventMeshTraceService(tracePluginType); + eventMeshTraceService.init(); + } + } + + public Span createSpan(String spanName, SpanKind spanKind, long startTime, TimeUnit timeUnit, + Context context, boolean isSpanFinishInOtherThread) { + if (!useTrace) { + return null; + } + return eventMeshTraceService.createSpan(spanName, spanKind, startTime, timeUnit, context, + isSpanFinishInOtherThread); + } + + public Span createSpan(String spanName, SpanKind spanKind, Context context, + boolean isSpanFinishInOtherThread) { + if (!useTrace) { + return null; + } + return eventMeshTraceService.createSpan(spanName, spanKind, context, + isSpanFinishInOtherThread); + } + + public Context extractFrom(Context context, Map map) { + if (!useTrace) { + return null; + } + if (map == null) { + return context; + } + return eventMeshTraceService.extractFrom(context, map); + } + + public void inject(Context context, Map map) { + if (!useTrace) { + return; + } + if (context == null || map == null) { + return; + } + eventMeshTraceService.inject(context, map); + } + + public Span addTraceInfoToSpan(ChannelHandlerContext ctx, CloudEvent cloudEvent) { + if (!useTrace) { + return null; + } + Context context = ctx.channel().attr(AttributeKeys.SERVER_CONTEXT).get(); + Span span = context != null ? context.get(SpanKey.SERVER_KEY) : null; + + if (span == null) { + logger.warn("span is null when finishSpan"); + return null; + } + + //add trace info + for (String entry : cloudEvent.getExtensionNames()) { + span.setAttribute(entry, cloudEvent.getExtension(entry).toString()); + } + return span; + } + + public Span addTraceInfoToSpan(Span span, CloudEvent cloudEvent) { + if (!useTrace) { + return null; + } + + if (span == null) { + logger.warn("span is null when finishSpan"); + return null; + } + + if (cloudEvent == null) { + return span; + } + + for (String entry : cloudEvent.getExtensionNames()) { + span.setAttribute(entry, cloudEvent.getExtension(entry).toString()); + } + return span; + } + + public Span addTraceInfoToSpan(Span span, Map map) { + if (!useTrace) { + return null; + } + + if (span == null) { + logger.warn("span is null when finishSpan"); + return null; + } + + if (map == null || map.size() < 1) { + return span; + } + + for (Map.Entry entry : map.entrySet()) { + span.setAttribute(entry.getKey(), entry.getValue().toString()); + } + return span; + } + + public void finishSpan(ChannelHandlerContext ctx, StatusCode statusCode) { + try { + if (useTrace) { + Context context = ctx.channel().attr(AttributeKeys.SERVER_CONTEXT).get(); + Span span = context != null ? context.get(SpanKey.SERVER_KEY) : null; + + if (span == null) { + logger.warn("span is null when finishSpan"); + return; + } + if (statusCode != null) { + span.setStatus(statusCode); + } + span.end(); + } + } catch (Exception e) { + logger.warn("finishSpan occur exception,", e); + } + } + + public void finishSpan(Span span, StatusCode statusCode) { + try { + if (useTrace) { + if (span == null) { + logger.warn("span is null when finishSpan"); + return; + } + if (statusCode != null) { + span.setStatus(statusCode); + } + span.end(); + } + } catch (Exception e) { + logger.warn("finishSpan occur exception,", e); + } + } + + public void finishSpan(Span span, StatusCode statusCode, String errMsg, Throwable throwable) { + try { + if (useTrace) { + if (span == null) { + logger.warn("span is null when finishSpan"); + return; + } + if (statusCode != null) { + span.setStatus(statusCode, errMsg); + } + if (throwable != null) { + span.recordException(throwable); + } + span.end(); + } + } catch (Exception e) { + logger.warn("finishSpan occur exception,", e); + } + } + + public void finishSpan(ChannelHandlerContext ctx, StatusCode statusCode, String errMsg, + Throwable throwable) { + try { + if (useTrace) { + Context context = ctx.channel().attr(AttributeKeys.SERVER_CONTEXT).get(); + Span span = context != null ? context.get(SpanKey.SERVER_KEY) : null; + + if (span == null) { + logger.warn("span is null when finishSpan"); + return; + } + if (statusCode != null) { + span.setStatus(statusCode, errMsg); + } + if (throwable != null) { + span.recordException(throwable); + } + span.end(); + } + } catch (Exception e) { + logger.warn("finishSpan occur exception,", e); + } + } + + public void shutdown() throws Exception { + if (useTrace) { + eventMeshTraceService.shutdown(); + } + } +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/trace/TraceUtils.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/trace/TraceUtils.java new file mode 100644 index 0000000000..b8133c3a19 --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/trace/TraceUtils.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.trace; + +import org.apache.eventmesh.runtime.boot.EventMeshServer; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.cloudevents.CloudEvent; +import io.netty.channel.ChannelHandlerContext; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.context.Context; + +public class TraceUtils { + private static Logger logger = LoggerFactory.getLogger(TraceUtils.class); + + public static Span prepareClientSpan(Map map, String spanName, + boolean isSpanFinishInOtherThread) { + Span span = null; + try { + span = EventMeshServer.getTrace().createSpan( + spanName, SpanKind.CLIENT, Context.current(), isSpanFinishInOtherThread); + EventMeshServer.getTrace().inject(Context.current(), map); + } catch (Throwable ex) { + logger.warn("upload trace fail when prepareSpan", ex); + } + return span; + } + + public static Span prepareServerSpan(Map map, String spanName, + boolean isSpanFinishInOtherThread) { + Span span = null; + try { + Context traceContext = EventMeshServer.getTrace().extractFrom(Context.current(), map); + span = EventMeshServer.getTrace() + .createSpan(spanName, SpanKind.SERVER, traceContext, isSpanFinishInOtherThread); + } catch (Throwable ex) { + logger.warn("upload trace fail when prepareSpan", ex); + } + return span; + } + + public static Span prepareServerSpan(Map map, String spanName, long startTime, + TimeUnit timeUnit, boolean isSpanFinishInOtherThread) { + Span span = null; + try { + Context traceContext = EventMeshServer.getTrace().extractFrom(Context.current(), map); + if (startTime > 0) { + span = EventMeshServer.getTrace() + .createSpan(spanName, SpanKind.SERVER, startTime, timeUnit, traceContext, + isSpanFinishInOtherThread); + } else { + span = EventMeshServer.getTrace() + .createSpan(spanName, SpanKind.SERVER, traceContext, isSpanFinishInOtherThread); + } + } catch (Throwable ex) { + logger.warn("upload trace fail when prepareSpan", ex); + } + return span; + } + + + public static void finishSpan(Span span, CloudEvent event) { + try { + logger.debug("finishSpan with event:{}", event); + EventMeshServer.getTrace().addTraceInfoToSpan(span, event); + EventMeshServer.getTrace().finishSpan(span, StatusCode.OK); + } catch (Throwable ex) { + logger.warn("upload trace fail when finishSpan", ex); + } + + } + + public static void finishSpan(ChannelHandlerContext ctx, CloudEvent event) { + try { + logger.debug("finishSpan with event:{}", event); + EventMeshServer.getTrace().addTraceInfoToSpan(ctx, event); + EventMeshServer.getTrace().finishSpan(ctx, StatusCode.OK); + } catch (Throwable ex) { + logger.warn("upload trace fail when finishSpan", ex); + } + + } + + public static void finishSpanWithException(ChannelHandlerContext ctx, CloudEvent event, + String errMsg, Throwable e) { + try { + logger.debug("finishSpanWithException with event:{}", event); + EventMeshServer.getTrace().addTraceInfoToSpan(ctx, event); + EventMeshServer.getTrace().finishSpan(ctx, StatusCode.ERROR, errMsg, e); + } catch (Throwable ex) { + logger.warn("upload trace fail when finishSpanWithException", ex); + } + } + + public static void finishSpanWithException(Span span, Map map, String errMsg, + Throwable e) { + try { + logger.debug("finishSpanWithException with map:{}", map); + EventMeshServer.getTrace().addTraceInfoToSpan(span, map); + EventMeshServer.getTrace().finishSpan(span, StatusCode.ERROR, errMsg, e); + } catch (Throwable ex) { + logger.warn("upload trace fail when finishSpanWithException", ex); + } + } +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshCloudEventWriter.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshCloudEventWriter.java new file mode 100644 index 0000000000..e939ac25e5 --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshCloudEventWriter.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.util; + +import java.util.HashMap; +import java.util.Map; + +import io.cloudevents.rw.CloudEventContextWriter; +import io.cloudevents.rw.CloudEventRWException; + +public class EventMeshCloudEventWriter implements CloudEventContextWriter { + private Map extensionMap = null; + + public EventMeshCloudEventWriter() { + extensionMap = new HashMap(); + } + + @Override + public CloudEventContextWriter withContextAttribute(String key, String value) + throws CloudEventRWException { + extensionMap.put(key, value); + return this; + } + + public Map getExtensionMap() { + return extensionMap; + } +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java index 5d5fc1af9f..df78227c35 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java @@ -48,6 +48,9 @@ import org.slf4j.LoggerFactory; import io.cloudevents.CloudEvent; +import io.cloudevents.SpecVersion; +import io.cloudevents.core.v03.CloudEventV03; +import io.cloudevents.core.v1.CloudEventV1; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.databind.DeserializationFeature; @@ -285,4 +288,20 @@ public static T cloneObject(T object) throws IOException, ClassNotFoundExcep ObjectInputStream inputStream = new ObjectInputStream(byIn); return (T) inputStream.readObject(); } + + public static Map getCloudEventExtensionMap(String protocolVersion, + CloudEvent cloudEvent) { + try { + EventMeshCloudEventWriter eventMeshCloudEventWriter = new EventMeshCloudEventWriter(); + if (StringUtils.equals(SpecVersion.V1.toString(), protocolVersion)) { + ((CloudEventV1) cloudEvent).readContext(eventMeshCloudEventWriter); + } else if (StringUtils.equals(SpecVersion.V03.toString(), protocolVersion)) { + ((CloudEventV03) cloudEvent).readContext(eventMeshCloudEventWriter); + } + return eventMeshCloudEventWriter.getExtensionMap(); + } catch (Throwable e) { + logger.warn("getCloudEventExtensionMap fail", e); + return null; + } + } } From cf06a4816b81d79162ce94a256764ad8683a45ba Mon Sep 17 00:00:00 2001 From: lrhkobe Date: Thu, 23 Jun 2022 20:13:16 +0800 Subject: [PATCH 2/3] fix checkMainStyle problem --- .../tcp/client/EventMeshTcpMessageDispatcher.java | 4 ++-- .../tcp/client/session/send/SessionSender.java | 14 -------------- .../eventmesh/runtime/trace/AttributeKeys.java | 5 ----- 3 files changed, 2 insertions(+), 21 deletions(-) diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcpMessageDispatcher.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcpMessageDispatcher.java index 29594ab56e..0623d282cf 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcpMessageDispatcher.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcpMessageDispatcher.java @@ -39,6 +39,8 @@ import org.apache.eventmesh.runtime.util.EventMeshUtil; import org.apache.eventmesh.trace.api.common.EventMeshTraceConstants; +import java.util.concurrent.TimeUnit; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,8 +48,6 @@ import io.netty.channel.SimpleChannelInboundHandler; import io.opentelemetry.api.trace.Span; -import java.util.concurrent.TimeUnit; - public class EventMeshTcpMessageDispatcher extends SimpleChannelInboundHandler { private final Logger logger = LoggerFactory.getLogger(this.getClass()); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/SessionSender.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/SessionSender.java index b3191fe326..b9e4e81758 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/SessionSender.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/SessionSender.java @@ -26,18 +26,12 @@ import org.apache.eventmesh.common.protocol.tcp.Package; import org.apache.eventmesh.protocol.api.ProtocolAdaptor; import org.apache.eventmesh.protocol.api.ProtocolPluginFactory; -import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException; -import org.apache.eventmesh.runtime.boot.EventMeshServer; import org.apache.eventmesh.runtime.constants.EventMeshConstants; import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session; import org.apache.eventmesh.runtime.trace.TraceUtils; -import org.apache.eventmesh.runtime.util.EventMeshCloudEventWriter; import org.apache.eventmesh.runtime.util.EventMeshUtil; import org.apache.eventmesh.runtime.util.Utils; -import org.apache.eventmesh.trace.api.EventMeshSpan; -import org.apache.eventmesh.trace.api.EventMeshTraceContext; import org.apache.eventmesh.trace.api.common.EventMeshTraceConstants; -import org.apache.eventmesh.trace.api.propagation.EventMeshContextCarrier; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DateFormatUtils; @@ -51,16 +45,8 @@ import org.slf4j.LoggerFactory; import io.cloudevents.CloudEvent; -import io.cloudevents.SpecVersion; import io.cloudevents.core.builder.CloudEventBuilder; -import io.cloudevents.core.v03.CloudEventV03; -import io.cloudevents.core.v1.CloudEventV1; -import io.netty.channel.ChannelHandlerContext; import io.opentelemetry.api.trace.Span; -import io.opentelemetry.api.trace.SpanKind; -import io.opentelemetry.api.trace.StatusCode; -import io.opentelemetry.context.Context; -import io.opentelemetry.context.Scope; public class SessionSender { diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/trace/AttributeKeys.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/trace/AttributeKeys.java index 018e367522..ac151fda89 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/trace/AttributeKeys.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/trace/AttributeKeys.java @@ -20,8 +20,6 @@ import io.netty.util.AttributeKey; import io.opentelemetry.context.Context; -import org.apache.eventmesh.trace.api.EventMeshTraceContext; - /** * keys. */ @@ -41,9 +39,6 @@ public final class AttributeKeys { public static final AttributeKey CLIENT_PARENT_CONTEXT = AttributeKey.valueOf(AttributeKeys.class, "client-parent-context"); - public static final AttributeKey EVENTMESH_SERVER_CONTEXT = - AttributeKey.valueOf(AttributeKeys.class, "server-span"); - private AttributeKeys() { } } From db3bd5451d1f6052e899ae50dc528a3afc919d51 Mon Sep 17 00:00:00 2001 From: lrhkobe Date: Fri, 24 Jun 2022 19:34:26 +0800 Subject: [PATCH 3/3] fix variable format --- .../runtime/boot/EventMeshServer.java | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java index 82c355af50..a36a48abcb 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java @@ -45,7 +45,7 @@ public class EventMeshServer { private EventMeshHTTPConfiguration eventMeshHttpConfiguration; - private EventMeshTCPConfiguration eventMeshTCPConfiguration; + private EventMeshTCPConfiguration eventMeshTcpConfiguration; private Acl acl; @@ -58,17 +58,17 @@ public class EventMeshServer { private ServiceState serviceState; public EventMeshServer(EventMeshHTTPConfiguration eventMeshHttpConfiguration, - EventMeshTCPConfiguration eventMeshTCPConfiguration, + EventMeshTCPConfiguration eventMeshTcpConfiguration, EventMeshGrpcConfiguration eventMeshGrpcConfiguration) { this.eventMeshHttpConfiguration = eventMeshHttpConfiguration; - this.eventMeshTCPConfiguration = eventMeshTCPConfiguration; + this.eventMeshTcpConfiguration = eventMeshTcpConfiguration; this.eventMeshGrpcConfiguration = eventMeshGrpcConfiguration; this.acl = new Acl(); this.registry = new Registry(); this.trace = new Trace(eventMeshHttpConfiguration.eventMeshServerTraceEnable); this.connectorResource = new ConnectorResource(); - ConfigurationContextUtil.putIfAbsent(ConfigurationContextUtil.TCP, eventMeshTCPConfiguration); + ConfigurationContextUtil.putIfAbsent(ConfigurationContextUtil.TCP, eventMeshTcpConfiguration); ConfigurationContextUtil.putIfAbsent(ConfigurationContextUtil.GRPC, eventMeshGrpcConfiguration); ConfigurationContextUtil.putIfAbsent(ConfigurationContextUtil.HTTP, eventMeshHttpConfiguration); } @@ -79,10 +79,10 @@ public void init() throws Exception { } // registry init - if (eventMeshTCPConfiguration != null - && eventMeshTCPConfiguration.eventMeshTcpServerEnabled - && eventMeshTCPConfiguration.eventMeshServerRegistryEnable) { - registry.init(eventMeshTCPConfiguration.eventMeshRegistryPluginType); + if (eventMeshTcpConfiguration != null + && eventMeshTcpConfiguration.eventMeshTcpServerEnabled + && eventMeshTcpConfiguration.eventMeshServerRegistryEnable) { + registry.init(eventMeshTcpConfiguration.eventMeshRegistryPluginType); } if (eventMeshGrpcConfiguration != null && eventMeshGrpcConfiguration.eventMeshServerRegistryEnable) { @@ -110,9 +110,9 @@ public void init() throws Exception { eventMeshHTTPServer.init(); } - if (eventMeshTCPConfiguration != null) { - eventMeshTCPServer = new EventMeshTCPServer(this, eventMeshTCPConfiguration, registry); - if (eventMeshTCPConfiguration.eventMeshTcpServerEnabled) { + if (eventMeshTcpConfiguration != null) { + eventMeshTCPServer = new EventMeshTCPServer(this, eventMeshTcpConfiguration, registry); + if (eventMeshTcpConfiguration.eventMeshTcpServerEnabled) { eventMeshTCPServer.init(); } } @@ -130,9 +130,9 @@ public void start() throws Exception { acl.start(); } // registry start - if (eventMeshTCPConfiguration != null - && eventMeshTCPConfiguration.eventMeshTcpServerEnabled - && eventMeshTCPConfiguration.eventMeshServerRegistryEnable) { + if (eventMeshTcpConfiguration != null + && eventMeshTcpConfiguration.eventMeshTcpServerEnabled + && eventMeshTcpConfiguration.eventMeshServerRegistryEnable) { registry.start(); } if (eventMeshHttpConfiguration != null && eventMeshHttpConfiguration.eventMeshServerRegistryEnable) { @@ -149,7 +149,7 @@ public void start() throws Exception { if (eventMeshHttpConfiguration != null) { eventMeshHTTPServer.start(); } - if (eventMeshTCPConfiguration != null && eventMeshTCPConfiguration.eventMeshTcpServerEnabled) { + if (eventMeshTcpConfiguration != null && eventMeshTcpConfiguration.eventMeshTcpServerEnabled) { eventMeshTCPServer.start(); } serviceState = ServiceState.RUNNING; @@ -160,7 +160,7 @@ public void shutdown() throws Exception { serviceState = ServiceState.STOPING; logger.info("server state:{}", serviceState); eventMeshHTTPServer.shutdown(); - if (eventMeshTCPConfiguration != null && eventMeshTCPConfiguration.eventMeshTcpServerEnabled) { + if (eventMeshTcpConfiguration != null && eventMeshTcpConfiguration.eventMeshTcpServerEnabled) { eventMeshTCPServer.shutdown(); }