diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/resolver/http/SendMessageRequestProtocolResolver.java b/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/resolver/http/SendMessageRequestProtocolResolver.java index b53245ffa1..215f2e38ce 100644 --- a/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/resolver/http/SendMessageRequestProtocolResolver.java +++ b/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/resolver/http/SendMessageRequestProtocolResolver.java @@ -29,6 +29,7 @@ import java.net.URI; import java.nio.charset.StandardCharsets; +import java.util.Map; import io.cloudevents.CloudEvent; import io.cloudevents.SpecVersion; @@ -64,60 +65,70 @@ public static CloudEvent buildEvent(Header header, Body body) throws ProtocolHan cloudEventBuilder = CloudEventBuilder.v1(); cloudEventBuilder = cloudEventBuilder.withId(sendMessageRequestBody.getBizSeqNo()) - .withSubject(sendMessageRequestBody.getTopic()) - .withType("eventmeshmessage") - .withSource(URI.create("/")) - .withData(content.getBytes(StandardCharsets.UTF_8)) - .withExtension(ProtocolKey.REQUEST_CODE, code) - .withExtension(ProtocolKey.ClientInstanceKey.ENV, env) - .withExtension(ProtocolKey.ClientInstanceKey.IDC, idc) - .withExtension(ProtocolKey.ClientInstanceKey.IP, ip) - .withExtension(ProtocolKey.ClientInstanceKey.PID, pid) - .withExtension(ProtocolKey.ClientInstanceKey.SYS, sys) - .withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username) - .withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd) - .withExtension(ProtocolKey.VERSION, version.getVersion()) - .withExtension(ProtocolKey.LANGUAGE, language) - .withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType) - .withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc) - .withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion) - .withExtension(SendMessageRequestBody.BIZSEQNO, sendMessageRequestBody.getBizSeqNo()) - .withExtension(SendMessageRequestBody.UNIQUEID, sendMessageRequestBody.getUniqueId()) - .withExtension(SendMessageRequestBody.PRODUCERGROUP, - sendMessageRequestBody.getProducerGroup()) - .withExtension(SendMessageRequestBody.TTL, sendMessageRequestBody.getTtl()); + .withSubject(sendMessageRequestBody.getTopic()) + .withType("eventmeshmessage") + .withSource(URI.create("/")) + .withData(content.getBytes(StandardCharsets.UTF_8)) + .withExtension(ProtocolKey.REQUEST_CODE, code) + .withExtension(ProtocolKey.ClientInstanceKey.ENV, env) + .withExtension(ProtocolKey.ClientInstanceKey.IDC, idc) + .withExtension(ProtocolKey.ClientInstanceKey.IP, ip) + .withExtension(ProtocolKey.ClientInstanceKey.PID, pid) + .withExtension(ProtocolKey.ClientInstanceKey.SYS, sys) + .withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username) + .withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd) + .withExtension(ProtocolKey.VERSION, version.getVersion()) + .withExtension(ProtocolKey.LANGUAGE, language) + .withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType) + .withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc) + .withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion) + .withExtension(SendMessageRequestBody.BIZSEQNO, sendMessageRequestBody.getBizSeqNo()) + .withExtension(SendMessageRequestBody.UNIQUEID, sendMessageRequestBody.getUniqueId()) + .withExtension(SendMessageRequestBody.PRODUCERGROUP, + sendMessageRequestBody.getProducerGroup()) + .withExtension(SendMessageRequestBody.TTL, sendMessageRequestBody.getTtl()); if (StringUtils.isNotEmpty(sendMessageRequestBody.getTag())) { cloudEventBuilder = cloudEventBuilder.withExtension(SendMessageRequestBody.TAG, sendMessageRequestBody.getTag()); } + if (sendMessageRequestBody.getExtFields() != null && sendMessageRequestBody.getExtFields().size() > 0) { + for (Map.Entry entry : sendMessageRequestBody.getExtFields().entrySet()) { + cloudEventBuilder = cloudEventBuilder.withExtension(entry.getKey(), entry.getValue()); + } + } event = cloudEventBuilder.build(); } else if (StringUtils.equals(SpecVersion.V03.toString(), protocolVersion)) { cloudEventBuilder = CloudEventBuilder.v03(); cloudEventBuilder = cloudEventBuilder.withId(sendMessageRequestBody.getBizSeqNo()) - .withSubject(sendMessageRequestBody.getTopic()) - .withType("eventmeshmessage") - .withSource(URI.create("/")) - .withData(content.getBytes(StandardCharsets.UTF_8)) - .withExtension(ProtocolKey.REQUEST_CODE, code) - .withExtension(ProtocolKey.ClientInstanceKey.ENV, env) - .withExtension(ProtocolKey.ClientInstanceKey.IDC, idc) - .withExtension(ProtocolKey.ClientInstanceKey.IP, ip) - .withExtension(ProtocolKey.ClientInstanceKey.PID, pid) - .withExtension(ProtocolKey.ClientInstanceKey.SYS, sys) - .withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username) - .withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd) - .withExtension(ProtocolKey.VERSION, version.getVersion()) - .withExtension(ProtocolKey.LANGUAGE, language) - .withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType) - .withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc) - .withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion) - .withExtension(SendMessageRequestBody.BIZSEQNO, sendMessageRequestBody.getBizSeqNo()) - .withExtension(SendMessageRequestBody.UNIQUEID, sendMessageRequestBody.getUniqueId()) - .withExtension(SendMessageRequestBody.PRODUCERGROUP, - sendMessageRequestBody.getProducerGroup()) - .withExtension(SendMessageRequestBody.TTL, sendMessageRequestBody.getTtl()); + .withSubject(sendMessageRequestBody.getTopic()) + .withType("eventmeshmessage") + .withSource(URI.create("/")) + .withData(content.getBytes(StandardCharsets.UTF_8)) + .withExtension(ProtocolKey.REQUEST_CODE, code) + .withExtension(ProtocolKey.ClientInstanceKey.ENV, env) + .withExtension(ProtocolKey.ClientInstanceKey.IDC, idc) + .withExtension(ProtocolKey.ClientInstanceKey.IP, ip) + .withExtension(ProtocolKey.ClientInstanceKey.PID, pid) + .withExtension(ProtocolKey.ClientInstanceKey.SYS, sys) + .withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username) + .withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd) + .withExtension(ProtocolKey.VERSION, version.getVersion()) + .withExtension(ProtocolKey.LANGUAGE, language) + .withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType) + .withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc) + .withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion) + .withExtension(SendMessageRequestBody.BIZSEQNO, sendMessageRequestBody.getBizSeqNo()) + .withExtension(SendMessageRequestBody.UNIQUEID, sendMessageRequestBody.getUniqueId()) + .withExtension(SendMessageRequestBody.PRODUCERGROUP, + sendMessageRequestBody.getProducerGroup()) + .withExtension(SendMessageRequestBody.TTL, sendMessageRequestBody.getTtl()); if (StringUtils.isNotEmpty(sendMessageRequestBody.getTag())) { cloudEventBuilder = cloudEventBuilder.withExtension(SendMessageRequestBody.TAG, sendMessageRequestBody.getTag()); } + if (sendMessageRequestBody.getExtFields() != null && sendMessageRequestBody.getExtFields().size() > 0) { + for (Map.Entry entry : sendMessageRequestBody.getExtFields().entrySet()) { + cloudEventBuilder = cloudEventBuilder.withExtension(entry.getKey(), entry.getValue()); + } + } event = cloudEventBuilder.build(); } return event; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java index 0d1f3846cd..86d0964f08 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java @@ -28,14 +28,15 @@ import org.apache.eventmesh.common.protocol.http.header.Header; import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.runtime.common.Pair; +import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration; import org.apache.eventmesh.runtime.constants.EventMeshConstants; import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext; import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.EventProcessor; import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor; import org.apache.eventmesh.runtime.metrics.http.HTTPMetricsServer; -import org.apache.eventmesh.runtime.trace.AttributeKeys; -import org.apache.eventmesh.runtime.trace.SpanKey; +import org.apache.eventmesh.runtime.trace.TraceUtils; import org.apache.eventmesh.runtime.util.RemotingHelper; +import org.apache.eventmesh.trace.api.common.EventMeshTraceConstants; import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.ObjectUtils; @@ -91,13 +92,6 @@ import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.opentelemetry.api.trace.Span; -import io.opentelemetry.api.trace.SpanKind; -import io.opentelemetry.api.trace.StatusCode; -import io.opentelemetry.api.trace.Tracer; -import io.opentelemetry.context.Context; -import io.opentelemetry.context.Scope; -import io.opentelemetry.context.propagation.TextMapGetter; -import io.opentelemetry.context.propagation.TextMapPropagator; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; import com.fasterxml.jackson.core.type.TypeReference; @@ -119,56 +113,40 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer { public Boolean useTrace = false; //Determine whether trace is enabled - public TextMapPropagator textMapPropagator; - - public Tracer tracer; + private EventMeshHTTPConfiguration eventMeshHttpConfiguration; public ThreadPoolExecutor asyncContextCompleteHandler = - ThreadPoolFactory.createThreadPoolExecutor(10, 10, "EventMesh-http-asyncContext-"); + ThreadPoolFactory.createThreadPoolExecutor(10, 10, "EventMesh-http-asyncContext-"); static { DiskAttribute.deleteOnExitTemporaryFile = false; } protected final Map> - processorTable = new HashMap<>(64); + processorTable = new HashMap<>(64); protected final Map> eventProcessorTable = new HashMap<>(64); - public AbstractHTTPServer(int port, boolean useTLS) { + public AbstractHTTPServer(int port, boolean useTLS, EventMeshHTTPConfiguration eventMeshHttpConfiguration) { this.port = port; this.useTLS = useTLS; + this.eventMeshHttpConfiguration = eventMeshHttpConfiguration; } public void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) { FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status); HttpHeaders responseHeaders = response.headers(); responseHeaders.add( - HttpHeaderNames.CONTENT_TYPE, String.format("text/plain; charset=%s", EventMeshConstants.DEFAULT_CHARSET) + HttpHeaderNames.CONTENT_TYPE, String.format("text/plain; charset=%s", EventMeshConstants.DEFAULT_CHARSET) ); responseHeaders.add(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes()); responseHeaders.add(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); - if (useTrace) { - Context context = ctx.channel().attr(AttributeKeys.SERVER_CONTEXT).get(); - Span span = context.get(SpanKey.SERVER_KEY); - try (Scope ignored = context.makeCurrent()) { - span.setStatus(StatusCode.ERROR); //set this span's status to ERROR - span.end(); // closing the scope does not end the span, this has to be done manually - } - } + ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } public void sendResponse(ChannelHandlerContext ctx, DefaultFullHttpResponse response) { - if (useTrace) { - Context context = ctx.channel().attr(AttributeKeys.SERVER_CONTEXT).get(); - Span span = context.get(SpanKey.SERVER_KEY); - try (Scope ignored = context.makeCurrent()) { - span.end(); - } - } - ctx.writeAndFlush(response).addListener((ChannelFutureListener) f -> { if (!f.isSuccess()) { httpLogger.warn("send response to [{}] fail, will close this channel", @@ -185,9 +163,9 @@ public void start() throws Exception { ServerBootstrap b = new ServerBootstrap(); SSLContext sslContext = useTLS ? SSLContextFactory.getSslContext() : null; b.group(this.bossGroup, this.workerGroup) - .channel(NioServerSocketChannel.class) - .childHandler(new HttpsServerInitializer(sslContext)) - .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE); + .channel(NioServerSocketChannel.class) + .childHandler(new HttpsServerInitializer(sslContext)) + .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE); try { httpServerLogger.info("HTTPServer[port={}] started......", this.port); ChannelFuture future = b.bind(this.port).sync(); @@ -238,8 +216,8 @@ private Map parseHttpHeader(HttpRequest fullReq) { Map headerParam = new HashMap<>(); for (String key : fullReq.headers().names()) { if (StringUtils.equalsIgnoreCase(HttpHeaderNames.CONTENT_TYPE.toString(), key) - || StringUtils.equalsIgnoreCase(HttpHeaderNames.ACCEPT_ENCODING.toString(), key) - || StringUtils.equalsIgnoreCase(HttpHeaderNames.CONTENT_LENGTH.toString(), key)) { + || StringUtils.equalsIgnoreCase(HttpHeaderNames.ACCEPT_ENCODING.toString(), key) + || StringUtils.equalsIgnoreCase(HttpHeaderNames.CONTENT_LENGTH.toString(), key)) { continue; } headerParam.put(key, fullReq.headers().get(key)); @@ -277,14 +255,17 @@ private HttpResponseStatus validateHttpRequest(HttpRequest httpRequest) { * @param httpRequest */ private void preProcessHttpRequestHeader(ChannelHandlerContext ctx, HttpRequest httpRequest) { + long startTime = System.currentTimeMillis(); HttpHeaders requestHeaders = httpRequest.headers(); requestHeaders.set(ProtocolKey.ClientInstanceKey.IP, - RemotingHelper.parseChannelRemoteAddr(ctx.channel())); + RemotingHelper.parseChannelRemoteAddr(ctx.channel())); String protocolVersion = httpRequest.headers().get(ProtocolKey.VERSION); if (StringUtils.isBlank(protocolVersion)) { requestHeaders.set(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion()); } + requestHeaders.set(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, startTime); + requestHeaders.set(EventMeshConstants.REQ_SEND_EVENTMESH_IP, eventMeshHttpConfiguration.eventMeshServerIp); } /** @@ -302,7 +283,7 @@ private Map parseHttpRequestBody(HttpRequest httpRequest) throws getDecoder.parameters().forEach((key, value) -> httpRequestBody.put(key, value.get(0))); } else if (HttpMethod.POST.equals(httpRequest.method())) { HttpPostRequestDecoder decoder = - new HttpPostRequestDecoder(defaultHttpDataFactory, httpRequest); + new HttpPostRequestDecoder(defaultHttpDataFactory, httpRequest); for (InterfaceHttpData parm : decoder.getBodyHttpDatas()) { if (parm.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) { Attribute data = (Attribute) parm; @@ -319,36 +300,56 @@ class HTTPHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, HttpRequest httpRequest) { - Context context = null; - Span span = null; - if (useTrace) { - //if the client injected span context,this will extract the context from httpRequest or it will be null - context = textMapPropagator.extract(Context.current(), httpRequest, new TextMapGetter() { - @Override - public Iterable keys(HttpRequest carrier) { - return carrier.headers().names(); - } + // Context context = null; + // Span span = null; + // if (useTrace) { + // + // //if the client injected span context,this will extract the context from httpRequest or it will be null + // context = textMapPropagator.extract(Context.current(), httpRequest, new TextMapGetter() { + // @Override + // public Iterable keys(HttpRequest carrier) { + // return carrier.headers().names(); + // } + // + // @Override + // public String get(HttpRequest carrier, String key) { + // return carrier.headers().get(key); + // } + // }); + // + // span = tracer.spanBuilder("HTTP " + httpRequest.method()) + // .setParent(context) + // .setSpanKind(SpanKind.SERVER) + // .startSpan(); + // //attach the span to the server context + // context = context.with(SpanKey.SERVER_KEY, span); + // //put the context in channel + // ctx.channel().attr(AttributeKeys.SERVER_CONTEXT).set(context); + // } - @Override - public String get(HttpRequest carrier, String key) { - return carrier.headers().get(key); - } - }); - span = tracer.spanBuilder("HTTP " + httpRequest.method()) - .setParent(context) - .setSpanKind(SpanKind.SERVER) - .startSpan(); - //attach the span to the server context - context = context.with(SpanKey.SERVER_KEY, span); - //put the context in channel - ctx.channel().attr(AttributeKeys.SERVER_CONTEXT).set(context); - } + + + Span span = null; + // Context context = null; + // context = EventMeshServer.getTrace().extractFrom(); + // span = EventMeshServer.getTrace().createSpan("", context); + // //attach the span to the server context + // context = context.with(SpanKey.SERVER_KEY, span); + // //put the context in channel + // ctx.channel().attr(AttributeKeys.SERVER_CONTEXT).set(context); try { preProcessHttpRequestHeader(ctx, httpRequest); + + final Map headerMap = parseHttpHeader(httpRequest); + + final HttpResponseStatus errorStatus = validateHttpRequest(httpRequest); if (errorStatus != null) { sendError(ctx, errorStatus); + + span = TraceUtils.prepareServerSpan(headerMap, EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, false); + TraceUtils.finishSpanWithException(span, headerMap, errorStatus.reasonPhrase(), null); return; } metrics.getSummaryMetrics().recordHTTPRequest(); @@ -386,11 +387,11 @@ public String get(HttpRequest carrier, String key) { requestCommand.setHttpVersion(httpRequest.protocolVersion().protocolName()); requestCommand.setRequestCode(requestCode); - if (useTrace) { - span.setAttribute(SemanticAttributes.HTTP_METHOD, httpRequest.method().name()); - span.setAttribute(SemanticAttributes.HTTP_FLAVOR, httpRequest.protocolVersion().protocolName()); - span.setAttribute(String.valueOf(SemanticAttributes.HTTP_STATUS_CODE), requestCode); - } + // if (useTrace) { + // span.setAttribute(SemanticAttributes.HTTP_METHOD, httpRequest.method().name()); + // span.setAttribute(SemanticAttributes.HTTP_FLAVOR, httpRequest.protocolVersion().protocolName()); + // span.setAttribute(String.valueOf(SemanticAttributes.HTTP_STATUS_CODE), requestCode); + // } HttpCommand responseCommand = null; @@ -400,15 +401,21 @@ public String get(HttpRequest carrier, String key) { responseCommand = requestCommand.createHttpCommandResponse(EventMeshRetCode.EVENTMESH_REQUESTCODE_INVALID); sendResponse(ctx, responseCommand.httpResponse()); + + span = TraceUtils.prepareServerSpan(headerMap, EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, false); + TraceUtils.finishSpanWithException(span, headerMap, EventMeshRetCode.EVENTMESH_REQUESTCODE_INVALID.getErrMsg(), null); return; } try { - requestCommand.setHeader(Header.buildHeader(requestCode, parseHttpHeader(httpRequest))); + requestCommand.setHeader(Header.buildHeader(requestCode, headerMap)); requestCommand.setBody(Body.buildBody(requestCode, bodyMap)); } catch (Exception e) { responseCommand = requestCommand.createHttpCommandResponse(EventMeshRetCode.EVENTMESH_RUNTIME_ERR); sendResponse(ctx, responseCommand.httpResponse()); + + span = TraceUtils.prepareServerSpan(headerMap, EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, false); + TraceUtils.finishSpanWithException(span, headerMap, EventMeshRetCode.EVENTMESH_RUNTIME_ERR.getErrMsg(), e); return; } @@ -424,13 +431,6 @@ public String get(HttpRequest carrier, String key) { } catch (Exception ex) { httpServerLogger.error("AbrstractHTTPServer.HTTPHandler.channelRead0 err", ex); - - if (useTrace) { - span.setAttribute(SemanticAttributes.EXCEPTION_MESSAGE, ex.getMessage()); - span.setStatus(StatusCode.ERROR, ex.getMessage()); //set this span's status to ERROR - span.recordException(ex); //record this exception - span.end(); // closing the scope does not end the span, this has to be done manually - } } } @@ -506,13 +506,19 @@ public void processEventMeshRequest(final ChannelHandlerContext ctx, HttpRequestProcessor processor = choosed.getObject1(); if (processor.rejectRequest()) { HttpCommand responseCommand = - request.createHttpCommandResponse(EventMeshRetCode.EVENTMESH_REJECT_BY_PROCESSOR_ERROR); + request.createHttpCommandResponse(EventMeshRetCode.EVENTMESH_REJECT_BY_PROCESSOR_ERROR); asyncContext.onComplete(responseCommand); if (asyncContext.isComplete()) { if (httpLogger.isDebugEnabled()) { httpLogger.debug("{}", asyncContext.getResponse()); } sendResponse(ctx, responseCommand.httpResponse()); + + Map traceMap = asyncContext.getRequest().getHeader().toMap(); + Span span = TraceUtils.prepareServerSpan(traceMap, EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, + false); + TraceUtils.finishSpanWithException(span, traceMap, + EventMeshRetCode.EVENTMESH_REJECT_BY_PROCESSOR_ERROR.getErrMsg(), null); } return; } @@ -523,13 +529,14 @@ public void processEventMeshRequest(final ChannelHandlerContext ctx, } metrics.getSummaryMetrics() - .recordHTTPReqResTimeCost(System.currentTimeMillis() - request.getReqTime()); + .recordHTTPReqResTimeCost(System.currentTimeMillis() - request.getReqTime()); if (httpLogger.isDebugEnabled()) { httpLogger.debug("{}", asyncContext.getResponse()); } sendResponse(ctx, asyncContext.getResponse().httpResponse()); + } catch (Exception e) { httpServerLogger.error("process error", e); } @@ -541,8 +548,12 @@ public void processEventMeshRequest(final ChannelHandlerContext ctx, metrics.getSummaryMetrics().recordHTTPReqResTimeCost(System.currentTimeMillis() - request.getReqTime()); try { sendResponse(ctx, asyncContext.getResponse().httpResponse()); + + Map traceMap = asyncContext.getRequest().getHeader().toMap(); + Span span = TraceUtils.prepareServerSpan(traceMap, EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, false); + TraceUtils.finishSpanWithException(span, traceMap, EventMeshRetCode.EVENTMESH_RUNTIME_ERR.getErrMsg(), re); } catch (Exception e) { - // ignore + httpServerLogger.error("processEventMeshRequest fail", re); } } } @@ -562,6 +573,10 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ctx.close(); } } + + Map extractFromRequest(HttpRequest httpRequest) { + return null; + } } private HttpEventWrapper parseHttpRequest(HttpRequest httpRequest) throws IOException { @@ -635,8 +650,8 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { int c = connections.incrementAndGet(); if (c > 20000) { httpServerLogger - .warn("client|http|channelActive|remoteAddress={}|msg={}", remoteAddress, - "too many client(20000) connect this eventMesh server"); + .warn("client|http|channelActive|remoteAddress={}|msg={}", remoteAddress, + "too many client(20000) connect this eventMesh server"); ctx.close(); return; } @@ -657,7 +672,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { if (event.state().equals(IdleState.ALL_IDLE)) { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); httpServerLogger.info("client|http|userEventTriggered|remoteAddress={}|msg={}", - remoteAddress, evt.getClass().getName()); + remoteAddress, evt.getClass().getName()); ctx.close(); } } @@ -685,11 +700,11 @@ protected void initChannel(SocketChannel channel) { pipeline.addFirst("ssl", new SslHandler(sslEngine)); } pipeline.addLast(new HttpRequestDecoder(), - new HttpResponseEncoder(), + new HttpResponseEncoder(), - new HttpConnectionHandler(), - new HttpObjectAggregator(Integer.MAX_VALUE), - new HTTPHandler()); + new HttpConnectionHandler(), + new HttpObjectAggregator(Integer.MAX_VALUE), + new HTTPHandler()); } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java index f73211daa7..3d372f385d 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java @@ -86,7 +86,7 @@ public class EventMeshHTTPServer extends AbstractHTTPServer { public EventMeshHTTPServer(EventMeshServer eventMeshServer, EventMeshHTTPConfiguration eventMeshHttpConfiguration) { - super(eventMeshHttpConfiguration.httpServerPort, eventMeshHttpConfiguration.eventMeshServerUseTls); + super(eventMeshHttpConfiguration.httpServerPort, eventMeshHttpConfiguration.eventMeshServerUseTls, eventMeshHttpConfiguration); this.eventMeshServer = eventMeshServer; this.eventMeshHttpConfiguration = eventMeshHttpConfiguration; this.registry = eventMeshServer.getRegistry(); @@ -253,7 +253,7 @@ public void init() throws Exception { //get the trace-plugin if (StringUtils.isNotEmpty(eventMeshHttpConfiguration.eventMeshTracePluginType) && eventMeshHttpConfiguration.eventMeshServerTraceEnable) { - super.useTrace = true; + super.useTrace = eventMeshHttpConfiguration.eventMeshServerTraceEnable; } logger.info("--------------------------EventMeshHTTPServer inited"); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java index 347e6e8609..1a63e784be 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java @@ -37,11 +37,14 @@ import org.apache.eventmesh.runtime.core.protocol.http.producer.SendMessageContext; import org.apache.eventmesh.runtime.core.protocol.http.push.HTTPMessageHandler; import org.apache.eventmesh.runtime.core.protocol.http.push.MessageHandler; +import org.apache.eventmesh.runtime.trace.TraceUtils; import org.apache.eventmesh.runtime.util.EventMeshUtil; +import org.apache.eventmesh.trace.api.common.EventMeshTraceConstants; import org.apache.commons.collections4.MapUtils; import java.util.List; +import java.util.Objects; import java.util.Properties; import java.util.concurrent.atomic.AtomicBoolean; @@ -50,6 +53,8 @@ import io.cloudevents.CloudEvent; import io.cloudevents.core.builder.CloudEventBuilder; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.Scope; public class EventMeshConsumer { @@ -89,55 +94,67 @@ public synchronized void init() throws Exception { keyValue.put("consumerGroup", consumerGroupConf.getConsumerGroup()); keyValue.put("eventMeshIDC", eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC); keyValue.put("instanceName", EventMeshUtil.buildMeshClientID(consumerGroupConf.getConsumerGroup(), - eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster)); + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster)); persistentMqConsumer.init(keyValue); EventListener cluserEventListener = new EventListener() { @Override public void consume(CloudEvent event, AsyncConsumeContext context) { - String topic = event.getSubject(); - String bizSeqNo = (String) event.getExtension(Constants.PROPERTY_MESSAGE_SEARCH_KEYS); - String uniqueId = (String) event.getExtension(Constants.RMB_UNIQ_ID); - - event = CloudEventBuilder.from(event) - .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis())) - .build(); - if (messageLogger.isDebugEnabled()) { - messageLogger.debug("message|mq2eventMesh|topic={}|event={}", topic, event); - } else { - messageLogger.info("message|mq2eventMesh|topic={}|bizSeqNo={}|uniqueId={}", topic, bizSeqNo, uniqueId); - } - - ConsumerGroupTopicConf currentTopicConfig = MapUtils.getObject(consumerGroupConf.getConsumerGroupTopicConf(), - topic, null); - EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) context; + String protocolVersion = + Objects.requireNonNull(event.getExtension(Constants.PROTOCOL_VERSION)).toString(); + + Span span = TraceUtils.prepareServerSpan( + EventMeshUtil.getCloudEventExtensionMap(protocolVersion, event), + EventMeshTraceConstants.TRACE_DOWNSTREAM_EVENTMESH_SERVER_SPAN, false); + try (Scope scope = span.makeCurrent()) { + String topic = event.getSubject(); + String bizSeqNo = (String) event.getExtension(Constants.PROPERTY_MESSAGE_SEARCH_KEYS); + String uniqueId = (String) event.getExtension(Constants.RMB_UNIQ_ID); + + event = CloudEventBuilder.from(event) + .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis())) + .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP, + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerIp) + .build(); + if (messageLogger.isDebugEnabled()) { + messageLogger.debug("message|mq2eventMesh|topic={}|event={}", topic, event); + } else { + messageLogger.info("message|mq2eventMesh|topic={}|bizSeqNo={}|uniqueId={}", topic, bizSeqNo, uniqueId); + } - if (currentTopicConfig == null) { - logger.error("no topicConfig found, consumerGroup:{} topic:{}", consumerGroupConf.getConsumerGroup(), topic); - try { - sendMessageBack(event, uniqueId, bizSeqNo); - eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage); - return; - } catch (Exception ex) { - //ignore + ConsumerGroupTopicConf currentTopicConfig = MapUtils.getObject(consumerGroupConf.getConsumerGroupTopicConf(), + topic, null); + EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) context; + + if (currentTopicConfig == null) { + logger.error("no topicConfig found, consumerGroup:{} topic:{}", consumerGroupConf.getConsumerGroup(), topic); + try { + sendMessageBack(event, uniqueId, bizSeqNo); + eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage); + return; + } catch (Exception ex) { + //ignore + } } - } - SubscriptionItem subscriptionItem = consumerGroupConf.getConsumerGroupTopicConf().get(topic).getSubscriptionItem(); - HandleMsgContext handleMsgContext = new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(), - consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this, - topic, event, subscriptionItem, eventMeshAsyncConsumeContext.getAbstractContext(), - consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig); - - if (httpMessageHandler.handle(handleMsgContext)) { - eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck); - } else { - try { - sendMessageBack(event, uniqueId, bizSeqNo); - } catch (Exception e) { - //ignore + SubscriptionItem subscriptionItem = consumerGroupConf.getConsumerGroupTopicConf().get(topic).getSubscriptionItem(); + HandleMsgContext handleMsgContext = new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(), + consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this, + topic, event, subscriptionItem, eventMeshAsyncConsumeContext.getAbstractContext(), + consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig); + + if (httpMessageHandler.handle(handleMsgContext)) { + eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck); + } else { + try { + sendMessageBack(event, uniqueId, bizSeqNo); + } catch (Exception e) { + //ignore + } + eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage); } - eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage); + } finally { + TraceUtils.finishSpan(span, event); } } }; @@ -149,60 +166,81 @@ public void consume(CloudEvent event, AsyncConsumeContext context) { broadcastKeyValue.put("consumerGroup", consumerGroupConf.getConsumerGroup()); broadcastKeyValue.put("eventMeshIDC", eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC); broadcastKeyValue.put("instanceName", EventMeshUtil.buildMeshClientID(consumerGroupConf.getConsumerGroup(), - eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster)); + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster)); broadcastMqConsumer.init(broadcastKeyValue); EventListener broadcastEventListener = new EventListener() { @Override public void consume(CloudEvent event, AsyncConsumeContext context) { - event = CloudEventBuilder.from(event) - .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, - String.valueOf(System.currentTimeMillis())) - .build(); - - String topic = event.getSubject(); - String bizSeqNo = event.getExtension(Constants.PROPERTY_MESSAGE_SEARCH_KEYS).toString(); - String uniqueId = event.getExtension(Constants.RMB_UNIQ_ID).toString(); - - if (messageLogger.isDebugEnabled()) { - messageLogger.debug("message|mq2eventMesh|topic={}|msg={}", topic, event); - } else { - messageLogger.info("message|mq2eventMesh|topic={}|bizSeqNo={}|uniqueId={}", topic, bizSeqNo, - uniqueId); - } - - ConsumerGroupTopicConf currentTopicConfig = MapUtils.getObject( - consumerGroupConf.getConsumerGroupTopicConf(), topic, null); - EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) context; + String protocolVersion = + Objects.requireNonNull(event.getExtension(Constants.PROTOCOL_VERSION)).toString(); + + Span span = TraceUtils.prepareServerSpan( + EventMeshUtil.getCloudEventExtensionMap(protocolVersion, event), + EventMeshTraceConstants.TRACE_DOWNSTREAM_EVENTMESH_SERVER_SPAN, false); + try (Scope scope = span.makeCurrent()) { + + event = CloudEventBuilder.from(event) + .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, + String.valueOf(System.currentTimeMillis())) + .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP, + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerIp) + .build(); + + String topic = event.getSubject(); + String bizSeqNo = + event.getExtension(Constants.PROPERTY_MESSAGE_SEARCH_KEYS).toString(); + String uniqueId = event.getExtension(Constants.RMB_UNIQ_ID).toString(); + + if (messageLogger.isDebugEnabled()) { + messageLogger.debug("message|mq2eventMesh|topic={}|msg={}", topic, event); + } else { + messageLogger.info("message|mq2eventMesh|topic={}|bizSeqNo={}|uniqueId={}", + topic, bizSeqNo, + uniqueId); + } - if (currentTopicConfig == null) { - logger.error("no topicConfig found, consumerGroup:{} topic:{}", - consumerGroupConf.getConsumerGroup(), topic); - try { - sendMessageBack(event, uniqueId, bizSeqNo); - eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage); - return; - } catch (Exception ex) { - //ignore + ConsumerGroupTopicConf currentTopicConfig = MapUtils.getObject( + consumerGroupConf.getConsumerGroupTopicConf(), topic, null); + EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = + (EventMeshAsyncConsumeContext) context; + + if (currentTopicConfig == null) { + logger.error("no topicConfig found, consumerGroup:{} topic:{}", + consumerGroupConf.getConsumerGroup(), topic); + try { + sendMessageBack(event, uniqueId, bizSeqNo); + eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage); + return; + } catch (Exception ex) { + //ignore + } } - } - SubscriptionItem subscriptionItem = consumerGroupConf.getConsumerGroupTopicConf().get(topic).getSubscriptionItem(); - HandleMsgContext handleMsgContext = new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(), - consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this, - topic, event, subscriptionItem, eventMeshAsyncConsumeContext.getAbstractContext(), - consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig); - - if (httpMessageHandler.handle(handleMsgContext)) { - eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck); - } else { - try { - sendMessageBack(event, uniqueId, bizSeqNo); - } catch (Exception e) { - //ignore + SubscriptionItem subscriptionItem = + consumerGroupConf.getConsumerGroupTopicConf().get(topic) + .getSubscriptionItem(); + HandleMsgContext handleMsgContext = + new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(), + consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this, + topic, event, subscriptionItem, + eventMeshAsyncConsumeContext.getAbstractContext(), + consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, + currentTopicConfig); + + if (httpMessageHandler.handle(handleMsgContext)) { + eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck); + } else { + try { + sendMessageBack(event, uniqueId, bizSeqNo); + } catch (Exception e) { + //ignore + } + eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage); } - eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage); + } finally { + TraceUtils.finishSpan(span, event); } } }; @@ -263,16 +301,16 @@ public EventMeshHTTPServer getEventMeshHTTPServer() { public void sendMessageBack(final CloudEvent event, final String uniqueId, String bizSeqNo) throws Exception { EventMeshProducer sendMessageBack - = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(consumerGroupConf.getConsumerGroup()); + = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(consumerGroupConf.getConsumerGroup()); if (sendMessageBack == null) { logger.warn("consumer:{} consume fail, sendMessageBack, bizSeqNo:{}, uniqueId:{}", - consumerGroupConf.getConsumerGroup(), bizSeqNo, uniqueId); + consumerGroupConf.getConsumerGroup(), bizSeqNo, uniqueId); return; } final SendMessageContext sendMessageBackContext = new SendMessageContext(bizSeqNo, event, sendMessageBack, - eventMeshHTTPServer); + eventMeshHTTPServer); sendMessageBack.send(sendMessageBackContext, new SendCallback() { @Override @@ -282,7 +320,7 @@ public void onSuccess(SendResult sendResult) { @Override public void onException(OnExceptionContext context) { logger.warn("consumer:{} consume fail, sendMessageBack, bizSeqno:{}, uniqueId:{}", - consumerGroupConf.getConsumerGroup(), bizSeqNo, uniqueId); + consumerGroupConf.getConsumerGroup(), bizSeqNo, uniqueId); } }); } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java index 0392e21efe..ef21aa0fb7 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java @@ -40,8 +40,10 @@ import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor; import org.apache.eventmesh.runtime.core.protocol.http.producer.EventMeshProducer; import org.apache.eventmesh.runtime.core.protocol.http.producer.SendMessageContext; +import org.apache.eventmesh.runtime.trace.TraceUtils; import org.apache.eventmesh.runtime.util.EventMeshUtil; import org.apache.eventmesh.runtime.util.RemotingHelper; +import org.apache.eventmesh.trace.api.common.EventMeshTraceConstants; import org.apache.commons.lang3.StringUtils; @@ -55,6 +57,7 @@ import io.cloudevents.CloudEvent; import io.cloudevents.core.builder.CloudEventBuilder; import io.netty.channel.ChannelHandlerContext; +import io.opentelemetry.api.trace.Span; public class SendAsyncMessageProcessor implements HttpRequestProcessor { @@ -78,34 +81,43 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext HttpCommand responseEventMeshCommand; cmdLogger.info("cmd={}|{}|client2eventMesh|from={}|to={}", RequestCode.get( - Integer.valueOf(asyncContext.getRequest().getRequestCode())), - EventMeshConstants.PROTOCOL_HTTP, - RemotingHelper.parseChannelRemoteAddr(ctx.channel()), IPUtils.getLocalAddress()); + Integer.valueOf(asyncContext.getRequest().getRequestCode())), + EventMeshConstants.PROTOCOL_HTTP, + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), IPUtils.getLocalAddress()); SendMessageRequestHeader sendMessageRequestHeader = (SendMessageRequestHeader) asyncContext.getRequest().getHeader(); SendMessageResponseHeader sendMessageResponseHeader = - SendMessageResponseHeader.buildHeader(Integer.valueOf(asyncContext.getRequest().getRequestCode()), - eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster, - IPUtils.getLocalAddress(), eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEnv, - eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC); + SendMessageResponseHeader.buildHeader(Integer.valueOf(asyncContext.getRequest().getRequestCode()), + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster, + IPUtils.getLocalAddress(), eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEnv, + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC); String protocolType = sendMessageRequestHeader.getProtocolType(); + String protocolVersin = sendMessageRequestHeader.getProtocolVersion(); ProtocolAdaptor httpCommandProtocolAdaptor = ProtocolPluginFactory.getProtocolAdaptor(protocolType); CloudEvent event = httpCommandProtocolAdaptor.toCloudEvent(asyncContext.getRequest()); + Span span = TraceUtils.prepareServerSpan(EventMeshUtil.getCloudEventExtensionMap(protocolVersin, event), + EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, true); + //validate event if (event == null - || StringUtils.isBlank(event.getId()) - || event.getSource() == null - || event.getSpecVersion() == null - || StringUtils.isBlank(event.getType()) - || StringUtils.isBlank(event.getSubject())) { + || StringUtils.isBlank(event.getId()) + || event.getSource() == null + || event.getSpecVersion() == null + || StringUtils.isBlank(event.getType()) + || StringUtils.isBlank(event.getSubject())) { responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse( - sendMessageResponseHeader, - SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getRetCode(), - EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getErrMsg())); + sendMessageResponseHeader, + SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getRetCode(), + EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getErrMsg())); asyncContext.onComplete(responseEventMeshCommand); + + Span excepSpan = TraceUtils.prepareServerSpan(EventMeshUtil.getCloudEventExtensionMap(protocolVersin, event), + EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, false); + TraceUtils.finishSpanWithException(excepSpan, EventMeshUtil.getCloudEventExtensionMap(protocolVersin, event), + EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getErrMsg(), null); return; } @@ -115,14 +127,19 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext //validate event-extension if (StringUtils.isBlank(idc) - || StringUtils.isBlank(pid) - || !StringUtils.isNumeric(pid) - || StringUtils.isBlank(sys)) { + || StringUtils.isBlank(pid) + || !StringUtils.isNumeric(pid) + || StringUtils.isBlank(sys)) { responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse( - sendMessageResponseHeader, - SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getRetCode(), - EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getErrMsg())); + sendMessageResponseHeader, + SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getRetCode(), + EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getErrMsg())); asyncContext.onComplete(responseEventMeshCommand); + + Span excepSpan = TraceUtils.prepareServerSpan(EventMeshUtil.getCloudEventExtensionMap(protocolVersin, event), + EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, false); + TraceUtils.finishSpanWithException(excepSpan, EventMeshUtil.getCloudEventExtensionMap(protocolVersin, event), + EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getErrMsg(), null); return; } @@ -133,15 +150,20 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext //validate body if (StringUtils.isBlank(bizNo) - || StringUtils.isBlank(uniqueId) - || StringUtils.isBlank(producerGroup) - || StringUtils.isBlank(topic) - || event.getData() == null) { + || StringUtils.isBlank(uniqueId) + || StringUtils.isBlank(producerGroup) + || StringUtils.isBlank(topic) + || event.getData() == null) { responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse( - sendMessageResponseHeader, - SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(), - EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getErrMsg())); + sendMessageResponseHeader, + SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(), + EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getErrMsg())); asyncContext.onComplete(responseEventMeshCommand); + + Span excepSpan = TraceUtils.prepareServerSpan(EventMeshUtil.getCloudEventExtensionMap(protocolVersin, event), + EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, false); + TraceUtils.finishSpanWithException(excepSpan, EventMeshUtil.getCloudEventExtensionMap(protocolVersin, event), + EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getErrMsg(), null); return; } @@ -158,23 +180,33 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext //String errorMsg = String.format("CLIENT HAS NO PERMISSION,send failed, topic:%s, subsys:%s, realIp:%s", topic, subsys, realIp); responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse( - sendMessageResponseHeader, - SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_ACL_ERR.getRetCode(), e.getMessage())); + sendMessageResponseHeader, + SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_ACL_ERR.getRetCode(), e.getMessage())); asyncContext.onComplete(responseEventMeshCommand); aclLogger.warn("CLIENT HAS NO PERMISSION,SendAsyncMessageProcessor send failed", e); + + Span excepSpan = TraceUtils.prepareServerSpan(EventMeshUtil.getCloudEventExtensionMap(protocolVersin, event), + EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, false); + TraceUtils.finishSpanWithException(excepSpan, EventMeshUtil.getCloudEventExtensionMap(protocolVersin, event), + EventMeshRetCode.EVENTMESH_ACL_ERR.getErrMsg(), null); return; } } // control flow rate limit if (!eventMeshHTTPServer.getMsgRateLimiter() - .tryAcquire(EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS)) { + .tryAcquire(EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS)) { responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse( - sendMessageResponseHeader, - SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getRetCode(), - EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getErrMsg())); + sendMessageResponseHeader, + SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getRetCode(), + EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getErrMsg())); eventMeshHTTPServer.metrics.getSummaryMetrics().recordHTTPDiscard(); asyncContext.onComplete(responseEventMeshCommand); + + Span excepSpan = TraceUtils.prepareServerSpan(EventMeshUtil.getCloudEventExtensionMap(protocolVersin, event), + EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, false); + TraceUtils.finishSpanWithException(excepSpan, EventMeshUtil.getCloudEventExtensionMap(protocolVersin, event), + EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getErrMsg(), null); return; } @@ -182,16 +214,22 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext if (!eventMeshProducer.getStarted().get()) { responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse( - sendMessageResponseHeader, - SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_GROUP_PRODUCER_STOPED_ERR.getRetCode(), - EventMeshRetCode.EVENTMESH_GROUP_PRODUCER_STOPED_ERR.getErrMsg())); + sendMessageResponseHeader, + SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_GROUP_PRODUCER_STOPED_ERR.getRetCode(), + EventMeshRetCode.EVENTMESH_GROUP_PRODUCER_STOPED_ERR.getErrMsg())); asyncContext.onComplete(responseEventMeshCommand); + + Span excepSpan = TraceUtils.prepareServerSpan(EventMeshUtil.getCloudEventExtensionMap(protocolVersin, event), + EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, false); + TraceUtils.finishSpanWithException(excepSpan, EventMeshUtil.getCloudEventExtensionMap(protocolVersin, event), + EventMeshRetCode.EVENTMESH_GROUP_PRODUCER_STOPED_ERR.getErrMsg(), null); + return; } String ttl = String.valueOf(EventMeshConstants.DEFAULT_MSG_TTL_MILLS); if (StringUtils.isBlank(event.getExtension(SendMessageRequestBody.TTL).toString()) - && !StringUtils.isNumeric(event.getExtension(SendMessageRequestBody.TTL).toString())) { + && !StringUtils.isNumeric(event.getExtension(SendMessageRequestBody.TTL).toString())) { event = CloudEventBuilder.from(event).withExtension(SendMessageRequestBody.TTL, ttl).build(); } @@ -205,15 +243,21 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(), "Event size exceeds the limit: " + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize)); asyncContext.onComplete(responseEventMeshCommand); + + Span excepSpan = TraceUtils.prepareServerSpan(EventMeshUtil.getCloudEventExtensionMap(protocolVersin, event), + EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, false); + TraceUtils.finishSpanWithException(excepSpan, EventMeshUtil.getCloudEventExtensionMap(protocolVersin, event), + EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getErrMsg(), null); return; } try { event = CloudEventBuilder.from(event) - .withExtension("msgtype", "persistent") - .withExtension(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis())) - .withExtension(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP, String.valueOf(System.currentTimeMillis())) - .build(); + .withExtension("msgtype", "persistent") + .withExtension(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, asyncContext.getRequest().reqTime) + .withExtension(EventMeshConstants.REQ_SEND_EVENTMESH_IP, + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerIp) + .build(); if (messageLogger.isDebugEnabled()) { messageLogger.debug("msg2MQMsg suc, bizSeqNo={}, topic={}", bizNo, topic); @@ -221,15 +265,20 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext } catch (Exception e) { messageLogger.error("msg2MQMsg err, bizSeqNo={}, topic={}", bizNo, topic, e); responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse( - sendMessageResponseHeader, - SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR.getRetCode(), - EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 2))); + sendMessageResponseHeader, + SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR.getRetCode(), + EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 2))); asyncContext.onComplete(responseEventMeshCommand); + + Span excepSpan = TraceUtils.prepareServerSpan(EventMeshUtil.getCloudEventExtensionMap(protocolVersin, event), + EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, false); + TraceUtils.finishSpanWithException(excepSpan, EventMeshUtil.getCloudEventExtensionMap(protocolVersin, event), + EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR.getErrMsg(), null); return; } final SendMessageContext sendMessageContext = new SendMessageContext(bizNo, event, eventMeshProducer, - eventMeshHTTPServer); + eventMeshHTTPServer); eventMeshHTTPServer.metrics.getSummaryMetrics().recordSendMsg(); long startTime = System.currentTimeMillis(); @@ -242,8 +291,9 @@ public void onResponse(HttpCommand httpCommand) { httpLogger.debug("{}", httpCommand); } eventMeshHTTPServer.sendResponse(ctx, httpCommand.httpResponse()); + eventMeshHTTPServer.metrics.getSummaryMetrics().recordHTTPReqResTimeCost( - System.currentTimeMillis() - asyncContext.getRequest().getReqTime()); + System.currentTimeMillis() - asyncContext.getRequest().getReqTime()); } catch (Exception ex) { //ignore } @@ -253,53 +303,73 @@ public void onResponse(HttpCommand httpCommand) { try { event = CloudEventBuilder.from(sendMessageContext.getEvent()) - .withExtension(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP, String.valueOf(System.currentTimeMillis())) - .build(); + .withExtension(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP, String.valueOf(System.currentTimeMillis())) + .build(); sendMessageContext.setEvent(event); - eventMeshProducer.send(sendMessageContext, new SendCallback() { - @Override - public void onSuccess(SendResult sendResult) { - HttpCommand succ = asyncContext.getRequest().createHttpCommandResponse( + Span clientSpan = TraceUtils.prepareClientSpan(EventMeshUtil.getCloudEventExtensionMap(protocolVersin, event), + EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_CLIENT_SPAN, false); + try { + eventMeshProducer.send(sendMessageContext, new SendCallback() { + + @Override + public void onSuccess(SendResult sendResult) { + HttpCommand succ = asyncContext.getRequest().createHttpCommandResponse( sendMessageResponseHeader, SendMessageResponseBody.buildBody(EventMeshRetCode.SUCCESS.getRetCode(), - EventMeshRetCode.SUCCESS.getErrMsg() + sendResult.toString())); - asyncContext.onComplete(succ, handler); - long endTime = System.currentTimeMillis(); - eventMeshHTTPServer.metrics.getSummaryMetrics().recordSendMsgCost(endTime - startTime); - messageLogger.info("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}", + EventMeshRetCode.SUCCESS.getErrMsg() + sendResult.toString())); + asyncContext.onComplete(succ, handler); + long endTime = System.currentTimeMillis(); + eventMeshHTTPServer.metrics.getSummaryMetrics().recordSendMsgCost(endTime - startTime); + messageLogger.info("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}", endTime - startTime, topic, bizNo, uniqueId); - } - @Override - public void onException(OnExceptionContext context) { - HttpCommand err = asyncContext.getRequest().createHttpCommandResponse( + TraceUtils.finishSpan(span, sendMessageContext.getEvent()); + } + + @Override + public void onException(OnExceptionContext context) { + HttpCommand err = asyncContext.getRequest().createHttpCommandResponse( sendMessageResponseHeader, SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getRetCode(), - EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getErrMsg() - + EventMeshUtil.stackTrace(context.getException(), 2))); - asyncContext.onComplete(err, handler); - - eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10000)); - long endTime = System.currentTimeMillis(); - eventMeshHTTPServer.metrics.getSummaryMetrics().recordSendMsgFailed(); - eventMeshHTTPServer.metrics.getSummaryMetrics().recordSendMsgCost(endTime - startTime); - messageLogger.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}", + EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getErrMsg() + + EventMeshUtil.stackTrace(context.getException(), 2))); + asyncContext.onComplete(err, handler); + + eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10000)); + long endTime = System.currentTimeMillis(); + eventMeshHTTPServer.metrics.getSummaryMetrics().recordSendMsgFailed(); + eventMeshHTTPServer.metrics.getSummaryMetrics().recordSendMsgCost(endTime - startTime); + messageLogger.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}", endTime - startTime, topic, bizNo, uniqueId, context.getException()); - } - }); + + TraceUtils.finishSpanWithException(span, + EventMeshUtil.getCloudEventExtensionMap(protocolVersin, sendMessageContext.getEvent()), + EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getErrMsg(), context.getException()); + } + }); + } finally { + TraceUtils.finishSpan(clientSpan, event); + } + + } catch (Exception ex) { HttpCommand err = asyncContext.getRequest().createHttpCommandResponse( - sendMessageResponseHeader, - SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getRetCode(), - EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getErrMsg() - + EventMeshUtil.stackTrace(ex, 2))); + sendMessageResponseHeader, + SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getRetCode(), + EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getErrMsg() + + EventMeshUtil.stackTrace(ex, 2))); asyncContext.onComplete(err); + Span excepSpan = TraceUtils.prepareServerSpan(EventMeshUtil.getCloudEventExtensionMap(protocolVersin, event), + EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, false); + TraceUtils.finishSpanWithException(excepSpan, EventMeshUtil.getCloudEventExtensionMap(protocolVersin, event), + EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getErrMsg(), null); + eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10000)); long endTime = System.currentTimeMillis(); messageLogger.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}", - endTime - startTime, topic, bizNo, uniqueId, ex); + endTime - startTime, topic, bizNo, uniqueId, ex); eventMeshHTTPServer.metrics.getSummaryMetrics().recordSendMsgFailed(); eventMeshHTTPServer.metrics.getSummaryMetrics().recordSendMsgCost(endTime - startTime); } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java index cf4fa9e0a9..5abc9932d1 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java @@ -121,6 +121,8 @@ public void tryHTTPRequest() { CloudEvent event = CloudEventBuilder.from(handleMsgContext.getEvent()) .withExtension(EventMeshConstants.REQ_EVENTMESH2C_TIMESTAMP, String.valueOf(System.currentTimeMillis())) + .withExtension(EventMeshConstants.RSP_URL, currPushUrl) + .withExtension(EventMeshConstants.RSP_GROUP, handleMsgContext.getConsumerGroup()) .build(); handleMsgContext.setEvent(event); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/HTTPMessageHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/HTTPMessageHandler.java index 1bdfd7f625..96b388c939 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/HTTPMessageHandler.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/HTTPMessageHandler.java @@ -17,13 +17,18 @@ package org.apache.eventmesh.runtime.core.protocol.http.push; +import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.ThreadPoolFactory; import org.apache.eventmesh.runtime.core.protocol.http.consumer.EventMeshConsumer; import org.apache.eventmesh.runtime.core.protocol.http.consumer.HandleMsgContext; +import org.apache.eventmesh.runtime.trace.TraceUtils; +import org.apache.eventmesh.runtime.util.EventMeshUtil; +import org.apache.eventmesh.trace.api.common.EventMeshTraceConstants; import org.apache.commons.collections4.MapUtils; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; @@ -33,6 +38,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.opentelemetry.api.trace.Span; + import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -69,22 +76,33 @@ public HTTPMessageHandler(EventMeshConsumer eventMeshConsumer) { @Override public boolean handle(final HandleMsgContext handleMsgContext) { Set waitingRequests4Group = MapUtils.getObject(waitingRequests, - handleMsgContext.getConsumerGroup(), Sets.newConcurrentHashSet()); + handleMsgContext.getConsumerGroup(), Sets.newConcurrentHashSet()); if (waitingRequests4Group.size() > CONSUMER_GROUP_WAITING_REQUEST_THRESHOLD) { logger.warn("waitingRequests is too many, so reject, this message will be send back to MQ, consumerGroup:{}, threshold:{}", - handleMsgContext.getConsumerGroup(), CONSUMER_GROUP_WAITING_REQUEST_THRESHOLD); + handleMsgContext.getConsumerGroup(), CONSUMER_GROUP_WAITING_REQUEST_THRESHOLD); return false; } try { pushExecutor.submit(() -> { - AsyncHTTPPushRequest asyncPushRequest = new AsyncHTTPPushRequest(handleMsgContext, waitingRequests); - asyncPushRequest.tryHTTPRequest(); + String protocolVersion = Objects.requireNonNull(handleMsgContext.getEvent().getExtension( + Constants.PROTOCOL_VERSION)).toString(); + + Span span = TraceUtils.prepareClientSpan(EventMeshUtil.getCloudEventExtensionMap(protocolVersion, handleMsgContext.getEvent()), + EventMeshTraceConstants.TRACE_DOWNSTREAM_EVENTMESH_CLIENT_SPAN, false); + + try { + AsyncHTTPPushRequest asyncPushRequest = new AsyncHTTPPushRequest(handleMsgContext, waitingRequests); + asyncPushRequest.tryHTTPRequest(); + } finally { + TraceUtils.finishSpan(span, handleMsgContext.getEvent()); + } + }); return true; } catch (RejectedExecutionException e) { logger.warn("pushMsgThreadPoolQueue is full, so reject, current task size {}", - handleMsgContext.getEventMeshHTTPServer().getPushMsgExecutor().getQueue().size(), e); + handleMsgContext.getEventMeshHTTPServer().getPushMsgExecutor().getQueue().size(), e); return false; } }