From d33876bbb613096b1e2ef34db4b35cd3ebcf291a Mon Sep 17 00:00:00 2001 From: ZePeng Chen <84842773+Roc-00@users.noreply.github.com> Date: Thu, 16 Dec 2021 10:50:24 +0800 Subject: [PATCH] [ISSUE #340]Add http trace http point (#527) * tracing in AbstractHTTPServer * add licence * the span exporter * design docs * fix the error on text * delete the useless dependence * remove the unused code * fix the different spanExporter * change the class name * fix gradle -build problem * design docs improve * fix the gradle.build error problem * fixed * unsure fix * fix the path name * fix check error * format code * add javadoc * checkstyle fix * unversioned files * put context into channel in advance --- build.gradle | 2 + docs/en/features/eventmesh-trace-design.md | 89 +++++++ .../common/config/CommonConfiguration.java | 146 ++++++++-- .../test/resources/configuration.properties | 4 +- eventmesh-runtime/build.gradle | 3 + eventmesh-runtime/conf/eventmesh.properties | 18 +- .../runtime/boot/AbstractHTTPServer.java | 252 +++++++++++------- .../runtime/boot/EventMeshHTTPServer.java | 5 + .../runtime/exporter/EventMeshExporter.java | 29 ++ .../runtime/exporter/LogExporter.java | 83 ++++++ .../runtime/exporter/ZipkinExporter.java | 46 ++++ .../runtime/trace/AttributeKeys.java | 44 +++ .../trace/OpenTelemetryTraceFactory.java | 109 ++++++++ .../eventmesh/runtime/trace/SpanKey.java | 52 ++++ .../known-dependencies.txt | 6 + 15 files changed, 768 insertions(+), 120 deletions(-) create mode 100644 docs/en/features/eventmesh-trace-design.md create mode 100644 eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/exporter/EventMeshExporter.java create mode 100644 eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/exporter/LogExporter.java create mode 100644 eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/exporter/ZipkinExporter.java create mode 100644 eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/trace/AttributeKeys.java create mode 100644 eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/trace/OpenTelemetryTraceFactory.java create mode 100644 eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/trace/SpanKey.java diff --git a/build.gradle b/build.gradle index 6adb1c663f..16ac6931bf 100644 --- a/build.gradle +++ b/build.gradle @@ -437,6 +437,8 @@ subprojects { dependency 'io.opentelemetry:opentelemetry-exporter-prometheus:1.3.0-alpha' dependency 'io.prometheus:simpleclient:0.8.1' dependency 'io.prometheus:simpleclient_httpserver:0.8.1' + dependency 'io.opentelemetry:opentelemetry-exporter-zipkin:1.3.0' + dependency 'io.opentelemetry:opentelemetry-semconv:1.3.0-alpha' dependency "io.openmessaging:openmessaging-api:2.2.1-pubsub" diff --git a/docs/en/features/eventmesh-trace-design.md b/docs/en/features/eventmesh-trace-design.md new file mode 100644 index 0000000000..81e6c23b8a --- /dev/null +++ b/docs/en/features/eventmesh-trace-design.md @@ -0,0 +1,89 @@ +# eventmesh-HTTP-trace-design + +## Introduction + +[EventMesh(incubating)](https://github.com/apache/incubator-eventmesh) is a dynamic cloud-native eventing infrastructure. + +## An overview of OpenTelemetry + +OpenTelemetry is a collection of tools, APIs, and SDKs. You can use it to instrument, generate, collect, and export telemetry data (metrics, logs, and traces) for analysis in order to understand your software's performance and behavior. + +## Requirements + +- set tracer +- different exporter +- start and end span in server + +## Design Details + +* SpanProcessor: BatchSpanProcessor + +* Exporter: log(default), would be changed from properties + +```java +// Configure the batch spans processor. This span processor exports span in batches. +BatchSpanProcessor batchSpansProcessor = + BatchSpanProcessor.builder(exporter) + .setMaxExportBatchSize(512) // set the maximum batch size to use + .setMaxQueueSize(2048) // set the queue size. This must be >= the export batch size + .setExporterTimeout( + 30, TimeUnit.SECONDS) // set the max amount of time an export can run before getting + // interrupted + .setScheduleDelay(5, TimeUnit.SECONDS) // set time between two different exports + .build(); +OpenTelemetrySdk.builder() + .setTracerProvider( + SdkTracerProvider.builder().addSpanProcessor(batchSpansProcessor).build()) + .build(); +``` + +1. When using the method 'init()' of the class "EventMeshHTTPServer", the class "AbstractHTTPServer” will get the tracer + +```java +super.openTelemetryTraceFactory = new OpenTelemetryTraceFactory(eventMeshHttpConfiguration); +super.tracer = openTelemetryTraceFactory.getTracer(this.getClass().toString()); +super.textMapPropagator = openTelemetryTraceFactory.getTextMapPropagator(); +``` + +2. then the trace in class "AbstractHTTPServer” will work. + +## Problems + +#### How to set different exporter in class 'OpenTelemetryTraceFactory'?(Solved) + +After I get the exporter type from properties, how to deal with it. + +The 'logExporter' only needs to new it. + +But the 'zipkinExporter' needs to new and use the "getZipkinExporter()" method. + +## Solutions +#### Solution of different exporter +Use reflection to get an exporter. + +First of all, different exporter must implement the interface 'EventMeshExporter'. + +Then we get the exporter name from the configuration and reflect to the class. +```java +//different spanExporter +String exporterName = configuration.eventMeshTraceExporterType; +//use reflection to get spanExporter +String className = String.format("org.apache.eventmesh.runtime.exporter.%sExporter",exporterName); +EventMeshExporter eventMeshExporter = (EventMeshExporter) Class.forName(className).newInstance(); +spanExporter = eventMeshExporter.getSpanExporter(configuration); +``` + +Additional, this will surround with try catch.If the specified exporter cannot be obtained successfully, the default exporter log will be used instead + +#### Improvement of different exporter + +SPI(To be completed) + + +## Appendix + +#### References + +https://github.com/open-telemetry/docs-cn/blob/main/QUICKSTART.md + +https://github.com/open-telemetry/opentelemetry-java-instrumentation/tree/main/instrumentation/netty diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java index 3e4ade1df4..33e9ce950e 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java @@ -32,6 +32,13 @@ public class CommonConfiguration { public String eventMeshSecurityPluginType = "security"; public int eventMeshPrometheusPort = 19090; public String eventMeshRegistryPluginType = "namesrv"; + public String eventMeshTraceExporterType = "Log"; + public int eventMeshTraceExporterMaxExportSize = 512; + public int eventMeshTraceExporterMaxQueueSize = 2048; + public int eventMeshTraceExporterExportTimeout = 30; + public int eventMeshTraceExporterExportInterval = 5; + public String eventMeshTraceExportZipkinIp = "localhost"; + public int eventMeshTraceExportZipkinPort = 9411; public String namesrvAddr = ""; public Integer eventMeshRegisterIntervalInMills = 10 * 1000; @@ -48,54 +55,125 @@ public CommonConfiguration(ConfigurationWrapper configurationWrapper) { public void init() { if (configurationWrapper != null) { - String eventMeshEnvStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ENV); - Preconditions.checkState(StringUtils.isNotEmpty(eventMeshEnvStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ENV)); + String eventMeshEnvStr = + configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ENV); + Preconditions.checkState(StringUtils.isNotEmpty(eventMeshEnvStr), + String.format("%s error", ConfKeys.KEYS_EVENTMESH_ENV)); eventMeshEnv = StringUtils.deleteWhitespace(eventMeshEnvStr); - String sysIdStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_SYSID); - Preconditions.checkState(StringUtils.isNotEmpty(sysIdStr) && StringUtils.isNumeric(sysIdStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_SYSID)); + String sysIdStr = + configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_SYSID); + Preconditions.checkState(StringUtils.isNotEmpty(sysIdStr) && StringUtils.isNumeric(sysIdStr), + String.format("%s error", ConfKeys.KEYS_EVENTMESH_SYSID)); sysID = StringUtils.deleteWhitespace(sysIdStr); - String eventMeshClusterStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_CLUSTER); - Preconditions.checkState(StringUtils.isNotEmpty(eventMeshClusterStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_SERVER_CLUSTER)); + String eventMeshClusterStr = + configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_CLUSTER); + Preconditions.checkState(StringUtils.isNotEmpty(eventMeshClusterStr), + String.format("%s error", ConfKeys.KEYS_EVENTMESH_SERVER_CLUSTER)); eventMeshCluster = StringUtils.deleteWhitespace(eventMeshClusterStr); - String eventMeshNameStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_NAME); - Preconditions.checkState(StringUtils.isNotEmpty(eventMeshNameStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_SERVER_NAME)); + String eventMeshNameStr = + configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_NAME); + Preconditions.checkState(StringUtils.isNotEmpty(eventMeshNameStr), + String.format("%s error", ConfKeys.KEYS_EVENTMESH_SERVER_NAME)); eventMeshName = StringUtils.deleteWhitespace(eventMeshNameStr); - String eventMeshIDCStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_IDC); - Preconditions.checkState(StringUtils.isNotEmpty(eventMeshIDCStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_IDC)); - eventMeshIDC = StringUtils.deleteWhitespace(eventMeshIDCStr); + String eventMeshIdcStr = + configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_IDC); + Preconditions.checkState(StringUtils.isNotEmpty(eventMeshIdcStr), + String.format("%s error", ConfKeys.KEYS_EVENTMESH_IDC)); + eventMeshIDC = StringUtils.deleteWhitespace(eventMeshIdcStr); - String eventMeshPrometheusPortStr = configurationWrapper.getProp(ConfKeys.KEY_EVENTMESH_METRICS_PROMETHEUS_PORT); + String eventMeshPrometheusPortStr = + configurationWrapper.getProp(ConfKeys.KEY_EVENTMESH_METRICS_PROMETHEUS_PORT); if (StringUtils.isNotEmpty(eventMeshPrometheusPortStr)) { - eventMeshPrometheusPort = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshPrometheusPortStr)); + eventMeshPrometheusPort = + Integer.valueOf(StringUtils.deleteWhitespace(eventMeshPrometheusPortStr)); } - eventMeshServerIp = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_HOST_IP); + eventMeshServerIp = + configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_HOST_IP); if (StringUtils.isBlank(eventMeshServerIp)) { eventMeshServerIp = IPUtils.getLocalAddress(); } - eventMeshConnectorPluginType = configurationWrapper.getProp(ConfKeys.KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE); - Preconditions.checkState(StringUtils.isNotEmpty(eventMeshConnectorPluginType), String.format("%s error", ConfKeys.KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE)); + eventMeshConnectorPluginType = + configurationWrapper.getProp(ConfKeys.KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE); + Preconditions.checkState(StringUtils.isNotEmpty(eventMeshConnectorPluginType), + String.format("%s error", ConfKeys.KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE)); - String eventMeshServerAclEnableStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_SECURITY_ENABLED); + String eventMeshServerAclEnableStr = + configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_SECURITY_ENABLED); if (StringUtils.isNotBlank(eventMeshServerAclEnableStr)) { - eventMeshServerSecurityEnable = Boolean.valueOf(StringUtils.deleteWhitespace(eventMeshServerAclEnableStr)); + eventMeshServerSecurityEnable = + Boolean.valueOf(StringUtils.deleteWhitespace(eventMeshServerAclEnableStr)); } - eventMeshSecurityPluginType = configurationWrapper.getProp(ConfKeys.KEYS_ENENTMESH_SECURITY_PLUGIN_TYPE); - Preconditions.checkState(StringUtils.isNotEmpty(eventMeshSecurityPluginType), String.format("%s error", ConfKeys.KEYS_ENENTMESH_SECURITY_PLUGIN_TYPE)); + eventMeshSecurityPluginType = + configurationWrapper.getProp(ConfKeys.KEYS_ENENTMESH_SECURITY_PLUGIN_TYPE); + Preconditions.checkState(StringUtils.isNotEmpty(eventMeshSecurityPluginType), + String.format("%s error", ConfKeys.KEYS_ENENTMESH_SECURITY_PLUGIN_TYPE)); - String eventMeshServerRegistryEnableStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_REGISTRY_ENABLED); + String eventMeshServerRegistryEnableStr = + configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_REGISTRY_ENABLED); if (StringUtils.isNotBlank(eventMeshServerRegistryEnableStr)) { - eventMeshServerRegistryEnable = Boolean.valueOf(StringUtils.deleteWhitespace(eventMeshServerRegistryEnableStr)); + eventMeshServerRegistryEnable = + Boolean.valueOf(StringUtils.deleteWhitespace(eventMeshServerRegistryEnableStr)); } - eventMeshRegistryPluginType = configurationWrapper.getProp(ConfKeys.KEYS_ENENTMESH_REGISTRY_PLUGIN_TYPE); - Preconditions.checkState(StringUtils.isNotEmpty(eventMeshRegistryPluginType), String.format("%s error", ConfKeys.KEYS_ENENTMESH_REGISTRY_PLUGIN_TYPE)); + eventMeshRegistryPluginType = + configurationWrapper.getProp(ConfKeys.KEYS_ENENTMESH_REGISTRY_PLUGIN_TYPE); + Preconditions.checkState(StringUtils.isNotEmpty(eventMeshRegistryPluginType), + String.format("%s error", ConfKeys.KEYS_ENENTMESH_REGISTRY_PLUGIN_TYPE)); + + String eventMeshTraceExporterTypeStr = + configurationWrapper.getProp(ConfKeys.KEYS_ENENTMESH_TRACE_EXPORTER_TYPE); + Preconditions.checkState(StringUtils.isNotEmpty(eventMeshTraceExporterTypeStr), + String.format("%s error", ConfKeys.KEYS_ENENTMESH_TRACE_EXPORTER_TYPE)); + eventMeshTraceExporterType = + StringUtils.deleteWhitespace(eventMeshTraceExporterTypeStr); + + String eventMeshTraceExporterMaxExportSizeStr = + configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_TRACE_EXPORTER_MAX_EXPORT_SIZE); + if (StringUtils.isNotEmpty(eventMeshTraceExporterMaxExportSizeStr)) { + eventMeshTraceExporterMaxExportSize = + Integer.valueOf(StringUtils.deleteWhitespace(eventMeshTraceExporterMaxExportSizeStr)); + } + + String eventMeshTraceExporterMaxQueueSizeStr = + configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_TRACE_EXPORTER_MAX_QUEUE_SIZE); + if (StringUtils.isNotEmpty(eventMeshTraceExporterMaxQueueSizeStr)) { + eventMeshTraceExporterMaxQueueSize = + Integer.valueOf(StringUtils.deleteWhitespace(eventMeshTraceExporterMaxQueueSizeStr)); + } + + String eventMeshTraceExporterExportTimeoutStr = + configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_TRACE_EXPORTER_EXPORT_TIMEOUT); + if (StringUtils.isNotEmpty(eventMeshTraceExporterExportTimeoutStr)) { + eventMeshTraceExporterExportTimeout = + Integer.valueOf(StringUtils.deleteWhitespace(eventMeshTraceExporterExportTimeoutStr)); + } + + String eventMeshTraceExporterExportIntervalStr = + configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_TRACE_EXPORTER_EXPORT_INTERVAL); + if (StringUtils.isNotEmpty(eventMeshTraceExporterExportIntervalStr)) { + eventMeshTraceExporterExportInterval = + Integer.valueOf(StringUtils.deleteWhitespace(eventMeshTraceExporterExportIntervalStr)); + } + + String eventMeshTraceExportZipkinIpStr = + configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_TRACE_EXPORT_ZIPKIN_IP); + Preconditions.checkState(StringUtils.isNotEmpty(eventMeshTraceExportZipkinIpStr), + String.format("%s error", ConfKeys.KEYS_EVENTMESH_TRACE_EXPORT_ZIPKIN_IP)); + eventMeshTraceExportZipkinIp = StringUtils.deleteWhitespace(eventMeshTraceExportZipkinIpStr); + + String eventMeshTraceExportZipkinPortStr = + configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_TRACE_EXPORT_ZIPKIN_PORT); + if (StringUtils.isNotEmpty(eventMeshTraceExportZipkinPortStr)) { + eventMeshTraceExportZipkinPort = + Integer.valueOf(StringUtils.deleteWhitespace(eventMeshTraceExportZipkinPortStr)); + } } } @@ -112,9 +190,11 @@ static class ConfKeys { public static String KEYS_EVENTMESH_SERVER_HOST_IP = "eventMesh.server.hostIp"; - public static String KEYS_EVENTMESH_SERVER_REGISTER_INTERVAL = "eventMesh.server.registry.registerIntervalInMills"; + public static String KEYS_EVENTMESH_SERVER_REGISTER_INTERVAL = + "eventMesh.server.registry.registerIntervalInMills"; - public static String KEYS_EVENTMESH_SERVER_FETCH_REGISTRY_ADDR_INTERVAL = "eventMesh.server.registry.fetchRegistryAddrIntervalInMills"; + public static String KEYS_EVENTMESH_SERVER_FETCH_REGISTRY_ADDR_INTERVAL = + "eventMesh.server.registry.fetchRegistryAddrIntervalInMills"; public static String KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE = "eventMesh.connector.plugin.type"; @@ -127,5 +207,19 @@ static class ConfKeys { public static String KEYS_EVENTMESH_REGISTRY_ENABLED = "eventMesh.server.registry.enabled"; public static String KEYS_ENENTMESH_REGISTRY_PLUGIN_TYPE = "eventMesh.registry.plugin.type"; + + public static String KEYS_ENENTMESH_TRACE_EXPORTER_TYPE = "eventmesh.trace.exporter.type"; + + public static String KEYS_EVENTMESH_TRACE_EXPORTER_MAX_EXPORT_SIZE = "eventmesh.trace.exporter.max.export.size"; + + public static String KEYS_EVENTMESH_TRACE_EXPORTER_MAX_QUEUE_SIZE = "eventmesh.trace.exporter.max.queue.size"; + + public static String KEYS_EVENTMESH_TRACE_EXPORTER_EXPORT_TIMEOUT = "eventmesh.trace.exporter.export.timeout"; + + public static String KEYS_EVENTMESH_TRACE_EXPORTER_EXPORT_INTERVAL = "eventmesh.trace.exporter.export.interval"; + + public static String KEYS_EVENTMESH_TRACE_EXPORT_ZIPKIN_IP = "eventmesh.trace.export.zipkin.ip"; + + public static String KEYS_EVENTMESH_TRACE_EXPORT_ZIPKIN_PORT = "eventmesh.trace.export.zipkin.port"; } } \ No newline at end of file diff --git a/eventmesh-common/src/test/resources/configuration.properties b/eventmesh-common/src/test/resources/configuration.properties index b8bf913d37..a5e0828ca3 100644 --- a/eventmesh-common/src/test/resources/configuration.properties +++ b/eventmesh-common/src/test/resources/configuration.properties @@ -23,4 +23,6 @@ eventMesh.server.name=value5 eventMesh.server.hostIp=value6 eventMesh.connector.plugin.type=rocketmq eventMesh.security.plugin.type=acl -eventMesh.registry.plugin.type=namesrv \ No newline at end of file +eventMesh.registry.plugin.type=namesrv +eventmesh.trace.export.zipkin.ip=localhost +eventmesh.trace.exporter.type=Zipkin \ No newline at end of file diff --git a/eventmesh-runtime/build.gradle b/eventmesh-runtime/build.gradle index fdca886ccb..6c34c51405 100644 --- a/eventmesh-runtime/build.gradle +++ b/eventmesh-runtime/build.gradle @@ -23,6 +23,9 @@ dependencies { implementation 'io.prometheus:simpleclient' implementation 'io.prometheus:simpleclient_httpserver' implementation 'io.cloudevents:cloudevents-core' + implementation 'io.opentelemetry:opentelemetry-exporter-zipkin' + implementation 'io.opentelemetry:opentelemetry-semconv' + implementation "org.apache.httpcomponents:httpclient" implementation 'io.netty:netty-all' diff --git a/eventmesh-runtime/conf/eventmesh.properties b/eventmesh-runtime/conf/eventmesh.properties index 30cdf4dccf..755eeb3fc2 100644 --- a/eventmesh-runtime/conf/eventmesh.properties +++ b/eventmesh-runtime/conf/eventmesh.properties @@ -72,4 +72,20 @@ eventMesh.server.registry.enabled=false eventMesh.registry.plugin.type=namesrv #prometheusPort -eventMesh.metrics.prometheus.port=19090 \ No newline at end of file +eventMesh.metrics.prometheus.port=19090 + +#trace exporter +eventmesh.trace.exporter.type=Zipkin + +#set the maximum batch size to use +eventmesh.trace.exporter.max.export.size=512 +#set the queue size. This must be >= the export batch size +eventmesh.trace.exporter.max.queue.size=2048 +#set the max amount of time an export can run before getting(TimeUnit=SECONDS) +eventmesh.trace.exporter.export.timeout=30 +#set time between two different exports(TimeUnit=SECONDS) +eventmesh.trace.exporter.export.interval=5 + +#zipkin +eventmesh.trace.export.zipkin.ip=localhost +eventmesh.trace.export.zipkin.port=9411 \ No newline at end of file diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java index e64882b7d7..097ed1416c 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java @@ -30,6 +30,9 @@ import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext; import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor; import org.apache.eventmesh.runtime.metrics.http.HTTPMetricsServer; +import org.apache.eventmesh.runtime.trace.AttributeKeys; +import org.apache.eventmesh.runtime.trace.OpenTelemetryTraceFactory; +import org.apache.eventmesh.runtime.trace.SpanKey; import org.apache.eventmesh.runtime.util.RemotingHelper; import org.apache.commons.collections4.MapUtils; @@ -84,6 +87,15 @@ import io.netty.handler.ssl.SslHandler; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.context.propagation.TextMapGetter; +import io.opentelemetry.context.propagation.TextMapPropagator; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; public abstract class AbstractHTTPServer extends AbstractRemotingServer { @@ -99,6 +111,14 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer { private boolean useTLS; + private Boolean useTrace = true; //Determine whether trace is enabled + + public TextMapPropagator textMapPropagator; + + public OpenTelemetryTraceFactory openTelemetryTraceFactory; + + public Tracer tracer; + public ThreadPoolExecutor asyncContextCompleteHandler = ThreadPoolFactory.createThreadPoolExecutor(10, 10, "EventMesh-http-asyncContext-"); @@ -122,12 +142,26 @@ public void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) { ); responseHeaders.add(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes()); responseHeaders.add(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); - // todo server span end with error, record status, we should get channel here to get span in channel's context in async call.. + if (useTrace) { + Context context = ctx.channel().attr(AttributeKeys.SERVER_CONTEXT).get(); + Span span = context.get(SpanKey.SERVER_KEY); + try (Scope ignored = context.makeCurrent()) { + span.setStatus(StatusCode.ERROR); //set this span's status to ERROR + span.end(); // closing the scope does not end the span, this has to be done manually + } + } ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } public void sendResponse(ChannelHandlerContext ctx, DefaultFullHttpResponse response) { - // todo end server span, we should get channel here to get span in channel's context in async call. + if (useTrace) { + Context context = ctx.channel().attr(AttributeKeys.SERVER_CONTEXT).get(); + Span span = context.get(SpanKey.SERVER_KEY); + try (Scope ignored = context.makeCurrent()) { + span.end(); + } + } + ctx.writeAndFlush(response).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) { @@ -161,6 +195,7 @@ public void start() throws Exception { } catch (Exception e1) { httpServerLogger.error("HTTPServer shutdown Err!", e); } + return; } }; @@ -188,14 +223,119 @@ public void registerProcessor(Integer requestCode, HttpRequestProcessor processo this.processorTable.put(requestCode.toString(), pair); } + private Map parseHttpHeader(HttpRequest fullReq) { + Map headerParam = new HashMap<>(); + for (String key : fullReq.headers().names()) { + if (StringUtils.equalsIgnoreCase(HttpHeaderNames.CONTENT_TYPE.toString(), key) + || StringUtils.equalsIgnoreCase(HttpHeaderNames.ACCEPT_ENCODING.toString(), key) + || StringUtils.equalsIgnoreCase(HttpHeaderNames.CONTENT_LENGTH.toString(), key)) { + continue; + } + headerParam.put(key, fullReq.headers().get(key)); + } + return headerParam; + } + + /** + * Validate request, return error status. + * + * @param httpRequest + * @return if request is validated return null else return error status + */ + private HttpResponseStatus validateHttpRequest(HttpRequest httpRequest) { + if (!started.get()) { + return HttpResponseStatus.SERVICE_UNAVAILABLE; + } + if (!httpRequest.decoderResult().isSuccess()) { + return HttpResponseStatus.BAD_REQUEST; + } + if (!HttpMethod.GET.equals(httpRequest.method()) && !HttpMethod.POST.equals(httpRequest.method())) { + return HttpResponseStatus.METHOD_NOT_ALLOWED; + } + final String protocolVersion = httpRequest.headers().get(ProtocolKey.VERSION); + if (!ProtocolVersion.contains(protocolVersion)) { + return HttpResponseStatus.BAD_REQUEST; + } + return null; + } + + /** + * Inject ip and protocol version, if the protocol version is empty, set default to {@link ProtocolVersion#V1}. + * + * @param ctx + * @param httpRequest + */ + private void preProcessHttpRequestHeader(ChannelHandlerContext ctx, HttpRequest httpRequest) { + HttpHeaders requestHeaders = httpRequest.headers(); + requestHeaders.set(ProtocolKey.ClientInstanceKey.IP, + RemotingHelper.parseChannelRemoteAddr(ctx.channel())); + + String protocolVersion = httpRequest.headers().get(ProtocolKey.VERSION); + if (StringUtils.isBlank(protocolVersion)) { + requestHeaders.set(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion()); + } + } + + /** + * Parse request body to map + * + * @param httpRequest + * @return + */ + private Map parseHttpRequestBody(HttpRequest httpRequest) throws IOException { + final long bodyDecodeStart = System.currentTimeMillis(); + Map httpRequestBody = new HashMap<>(); + + if (HttpMethod.GET.equals(httpRequest.method())) { + QueryStringDecoder getDecoder = new QueryStringDecoder(httpRequest.uri()); + getDecoder.parameters().forEach((key, value) -> httpRequestBody.put(key, value.get(0))); + } else if (HttpMethod.POST.equals(httpRequest.method())) { + HttpPostRequestDecoder decoder = + new HttpPostRequestDecoder(defaultHttpDataFactory, httpRequest); + for (InterfaceHttpData parm : decoder.getBodyHttpDatas()) { + if (parm.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) { + Attribute data = (Attribute) parm; + httpRequestBody.put(data.getName(), data.getValue()); + } + } + decoder.destroy(); + } + metrics.summaryMetrics.recordDecodeTimeCost(System.currentTimeMillis() - bodyDecodeStart); + return httpRequestBody; + } + class HTTPHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, HttpRequest httpRequest) { - // todo start server span, we should get channel here to put span in channel's context in async call. + Context context = null; + Span span = null; + if (useTrace) { + //if the client injected span context,this will extract the context from httpRequest or it will be null + context = textMapPropagator.extract(Context.current(), httpRequest, new TextMapGetter() { + @Override + public Iterable keys(HttpRequest carrier) { + return carrier.headers().names(); + } + + @Override + public String get(HttpRequest carrier, String key) { + return carrier.headers().get(key); + } + }); + span = tracer.spanBuilder("HTTP " + httpRequest.method()) + .setParent(context) + .setSpanKind(SpanKind.SERVER) + .startSpan(); + //attach the span to the server context + context = context.with(SpanKey.SERVER_KEY, span); + //put the context in channel + ctx.channel().attr(AttributeKeys.SERVER_CONTEXT).set(context); + } + try { - preProcessHTTPRequestHeader(ctx, httpRequest); - final HttpResponseStatus errorStatus = validateHTTPRequest(httpRequest); + preProcessHttpRequestHeader(ctx, httpRequest); + final HttpResponseStatus errorStatus = validateHttpRequest(httpRequest); if (errorStatus != null) { sendError(ctx, errorStatus); return; @@ -203,11 +343,9 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpRequest httpRequest) metrics.summaryMetrics.recordHTTPRequest(); final HttpCommand requestCommand = new HttpCommand(); - // todo record command opaque in span. final Map bodyMap = parseHttpRequestBody(httpRequest); - // todo: split get and post, use different submethod to process String requestCode = (httpRequest.method() == HttpMethod.POST) ? httpRequest.headers().get(ProtocolKey.REQUEST_CODE) @@ -216,7 +354,12 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpRequest httpRequest) requestCommand.setHttpMethod(httpRequest.method().name()); requestCommand.setHttpVersion(httpRequest.protocolVersion().protocolName()); requestCommand.setRequestCode(requestCode); - // todo record command method, version and requestCode in span. + + if (useTrace) { + span.setAttribute(SemanticAttributes.HTTP_METHOD, httpRequest.method().name()); + span.setAttribute(SemanticAttributes.HTTP_FLAVOR, httpRequest.protocolVersion().protocolName()); + span.setAttribute(String.valueOf(SemanticAttributes.HTTP_STATUS_CODE), requestCode); + } HttpCommand responseCommand = null; @@ -230,7 +373,7 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpRequest httpRequest) } try { - requestCommand.setHeader(Header.buildHeader(requestCode, parseHTTPHeader(httpRequest))); + requestCommand.setHeader(Header.buildHeader(requestCode, parseHttpHeader(httpRequest))); requestCommand.setBody(Body.buildBody(requestCode, bodyMap)); } catch (Exception e) { responseCommand = requestCommand.createHttpCommandResponse(EventMeshRetCode.EVENTMESH_RUNTIME_ERR); @@ -246,8 +389,14 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpRequest httpRequest) new AsyncContext<>(requestCommand, responseCommand, asyncContextCompleteHandler); processEventMeshRequest(ctx, asyncContext); } catch (Exception ex) { - httpServerLogger.error("AbstractHTTPServer.HTTPHandler.channelRead0 err", ex); - // todo span end with exception. + httpServerLogger.error("AbrstractHTTPServer.HTTPHandler.channelRead0 err", ex); + + if (useTrace) { + span.setAttribute(SemanticAttributes.EXCEPTION_MESSAGE, ex.getMessage()); + span.setStatus(StatusCode.ERROR, ex.getMessage()); //set this span's status to ERROR + span.recordException(ex); //record this exception + span.end(); // closing the scope does not end the span, this has to be done manually + } } } @@ -395,85 +544,4 @@ protected void initChannel(SocketChannel channel) { } } - private Map parseHTTPHeader(HttpRequest fullReq) { - Map headerParam = new HashMap<>(); - for (String key : fullReq.headers().names()) { - if (StringUtils.equalsIgnoreCase(HttpHeaderNames.CONTENT_TYPE.toString(), key) - || StringUtils.equalsIgnoreCase(HttpHeaderNames.ACCEPT_ENCODING.toString(), key) - || StringUtils.equalsIgnoreCase(HttpHeaderNames.CONTENT_LENGTH.toString(), key)) { - continue; - } - headerParam.put(key, fullReq.headers().get(key)); - } - return headerParam; - } - - /** - * Validate request, return error status. - * - * @param httpRequest - * @return if request is validated return null else return error status - */ - private HttpResponseStatus validateHTTPRequest(HttpRequest httpRequest) { - if (!started.get()) { - return HttpResponseStatus.SERVICE_UNAVAILABLE; - } - if (!httpRequest.decoderResult().isSuccess()) { - return HttpResponseStatus.BAD_REQUEST; - } - if (!HttpMethod.GET.equals(httpRequest.method()) && !HttpMethod.POST.equals(httpRequest.method())) { - return HttpResponseStatus.METHOD_NOT_ALLOWED; - } - final String protocolVersion = httpRequest.headers().get(ProtocolKey.VERSION); - if (!ProtocolVersion.contains(protocolVersion)) { - return HttpResponseStatus.BAD_REQUEST; - } - return null; - } - - /** - * Inject ip and protocol version, if the protocol version is empty, set default to {@link ProtocolVersion#V1}. - * - * @param ctx - * @param httpRequest - */ - private void preProcessHTTPRequestHeader(ChannelHandlerContext ctx, HttpRequest httpRequest) { - HttpHeaders requestHeaders = httpRequest.headers(); - requestHeaders.set(ProtocolKey.ClientInstanceKey.IP, - RemotingHelper.parseChannelRemoteAddr(ctx.channel())); - - String protocolVersion = httpRequest.headers().get(ProtocolKey.VERSION); - if (StringUtils.isBlank(protocolVersion)) { - requestHeaders.set(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion()); - } - } - - /** - * Parse request body to map - * - * @param httpRequest - * @return - */ - private Map parseHttpRequestBody(HttpRequest httpRequest) throws IOException { - final long bodyDecodeStart = System.currentTimeMillis(); - Map httpRequestBody = new HashMap<>(); - - if (HttpMethod.GET.equals(httpRequest.method())) { - QueryStringDecoder getDecoder = new QueryStringDecoder(httpRequest.uri()); - getDecoder.parameters().forEach((key, value) -> httpRequestBody.put(key, value.get(0))); - } else if (HttpMethod.POST.equals(httpRequest.method())) { - HttpPostRequestDecoder decoder = - new HttpPostRequestDecoder(defaultHttpDataFactory, httpRequest); - for (InterfaceHttpData parm : decoder.getBodyHttpDatas()) { - if (parm.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) { - Attribute data = (Attribute) parm; - httpRequestBody.put(data.getName(), data.getValue()); - } - } - decoder.destroy(); - } - metrics.summaryMetrics.recordDecodeTimeCost(System.currentTimeMillis() - bodyDecodeStart); - return httpRequestBody; - } - } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java index ce3e2cc10d..8e223f0ee3 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java @@ -37,6 +37,7 @@ import org.apache.eventmesh.runtime.core.protocol.http.push.AbstractHTTPPushRequest; import org.apache.eventmesh.runtime.core.protocol.http.retry.HttpRetryer; import org.apache.eventmesh.runtime.metrics.http.HTTPMetricsServer; +import org.apache.eventmesh.runtime.trace.OpenTelemetryTraceFactory; import java.util.List; import java.util.concurrent.BlockingQueue; @@ -203,6 +204,10 @@ public void init() throws Exception { registerHTTPRequestProcessor(); + super.openTelemetryTraceFactory = new OpenTelemetryTraceFactory(eventMeshHttpConfiguration); + super.tracer = openTelemetryTraceFactory.getTracer(this.getClass().toString()); + super.textMapPropagator = openTelemetryTraceFactory.getTextMapPropagator(); + logger.info("--------------------------EventMeshHTTPServer inited"); } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/exporter/EventMeshExporter.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/exporter/EventMeshExporter.java new file mode 100644 index 0000000000..abc9cf9fd6 --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/exporter/EventMeshExporter.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.exporter; + +import org.apache.eventmesh.common.config.CommonConfiguration; + +import io.opentelemetry.sdk.trace.export.SpanExporter; + +/** + * different exporters should implement this interface. + */ +public interface EventMeshExporter { + public SpanExporter getSpanExporter(CommonConfiguration configuration); +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/exporter/LogExporter.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/exporter/LogExporter.java new file mode 100644 index 0000000000..b4b11a486d --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/exporter/LogExporter.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.exporter; + +import java.util.Collection; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SpanExporter; + +/** + * Because the class 'LoggingSpanExporter' in openTelemetry exported garbled code in eventMesh's startUp, + * I override the 'LoggingSpanExporter'. + */ +public class LogExporter implements SpanExporter { + private static final Logger logger = LoggerFactory.getLogger(LogExporter.class); + + @Override + public CompletableResultCode export(Collection spans) { + // We always have 32 + 16 + name + several whitespace, 60 seems like an OK initial guess. + StringBuilder sb = new StringBuilder(60); + for (SpanData span : spans) { + sb.setLength(0); + InstrumentationLibraryInfo instrumentationLibraryInfo = span.getInstrumentationLibraryInfo(); + sb.append("'") + .append(span.getName()) + .append("' : ") + .append(span.getTraceId()) + .append(" ") + .append(span.getSpanId()) + .append(" ") + .append(span.getKind()) + .append(" [tracer: ") + .append(instrumentationLibraryInfo.getName()) + .append(":") + .append( + instrumentationLibraryInfo.getVersion() == null + ? "" + : instrumentationLibraryInfo.getVersion()) + .append("] ") + .append(span.getAttributes()); + logger.info(sb.toString()); + } + return CompletableResultCode.ofSuccess(); + } + + /** + * Flushes the data. + * (i guess it is not necessary for slf4j's log) + * + * @return the result of the operation + */ + @Override + public CompletableResultCode flush() { + CompletableResultCode resultCode = new CompletableResultCode(); + return resultCode.succeed(); + } + + @Override + public CompletableResultCode shutdown() { + return flush(); + } + +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/exporter/ZipkinExporter.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/exporter/ZipkinExporter.java new file mode 100644 index 0000000000..c5b0284be0 --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/exporter/ZipkinExporter.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.exporter; + +import org.apache.eventmesh.common.config.CommonConfiguration; + +import io.opentelemetry.exporter.zipkin.ZipkinSpanExporter; + +/** + * an exporter to export traced data to zipkin. + */ +public class ZipkinExporter implements EventMeshExporter { + private String ip = "localhost"; + + private int port = 9411; + + // Zipkin API Endpoints for uploading spans + private static final String ENDPOINT_V2_SPANS = "/api/v2/spans"; + + private ZipkinSpanExporter zipkinExporter; + + @Override + public ZipkinSpanExporter getSpanExporter(CommonConfiguration configuration) { + ip = configuration.eventMeshServerIp; + port = configuration.eventMeshTraceExportZipkinPort; + String httpUrl = String.format("http://%s:%s", ip, port); + zipkinExporter = + ZipkinSpanExporter.builder().setEndpoint(httpUrl + ENDPOINT_V2_SPANS).build(); + return zipkinExporter; + } +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/trace/AttributeKeys.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/trace/AttributeKeys.java new file mode 100644 index 0000000000..10214a8aa1 --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/trace/AttributeKeys.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.trace; + +import io.netty.util.AttributeKey; +import io.opentelemetry.context.Context; + +/** + * keys. + */ +public final class AttributeKeys { + public static final AttributeKey WRITE_CONTEXT = + AttributeKey.valueOf(AttributeKeys.class, "passed-context"); + + // this is the context that has the server span + // + // note: this attribute key is also used by ratpack instrumentation + public static final AttributeKey SERVER_CONTEXT = + AttributeKey.valueOf(AttributeKeys.class, "server-span"); + + public static final AttributeKey CLIENT_CONTEXT = + AttributeKey.valueOf(AttributeKeys.class, "client-context"); + + public static final AttributeKey CLIENT_PARENT_CONTEXT = + AttributeKey.valueOf(AttributeKeys.class, "client-parent-context"); + + private AttributeKeys() { + } +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/trace/OpenTelemetryTraceFactory.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/trace/OpenTelemetryTraceFactory.java new file mode 100644 index 0000000000..ae4ce41d91 --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/trace/OpenTelemetryTraceFactory.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.trace; + +import static io.opentelemetry.api.common.AttributeKey.stringKey; + +import org.apache.eventmesh.common.config.CommonConfiguration; +import org.apache.eventmesh.runtime.exporter.EventMeshExporter; +import org.apache.eventmesh.runtime.exporter.LogExporter; + +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.context.propagation.TextMapPropagator; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.SpanProcessor; +import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; +import io.opentelemetry.sdk.trace.export.SpanExporter; + +/** + * create tracer. + */ +public class OpenTelemetryTraceFactory { + private static final Logger logger = LoggerFactory.getLogger(OpenTelemetryTraceFactory.class); + // Name of the service(using the instrumentationName) + private final String serviceName = "eventmesh_trace"; + private OpenTelemetry openTelemetry; + private SpanExporter spanExporter; + private SpanExporter defaultExporter = new LogExporter(); + private SpanProcessor spanProcessor; + + public OpenTelemetryTraceFactory(CommonConfiguration configuration) { + try { + //different spanExporter + String exporterName = configuration.eventMeshTraceExporterType; + //use reflection to get spanExporter + String className = String.format("org.apache.eventmesh.runtime.exporter.%sExporter", exporterName); + EventMeshExporter eventMeshExporter = (EventMeshExporter) Class.forName(className).newInstance(); + spanExporter = eventMeshExporter.getSpanExporter(configuration); + } catch (Exception ex) { + logger.error("fail to set tracer's exporter,due to {}", ex.getMessage()); + //fail to set the exporter in configuration, changing to use the default Exporter + spanExporter = defaultExporter; + logger.info("change to use the default exporter {}", defaultExporter.getClass()); + } + + // Configure the batch spans processor. This span processor exports span in batches. + spanProcessor = BatchSpanProcessor.builder(spanExporter) + // set the maximum batch size to use + .setMaxExportBatchSize(configuration.eventMeshTraceExporterMaxExportSize) + // set the queue size. This must be >= the export batch size + .setMaxQueueSize(configuration.eventMeshTraceExporterMaxQueueSize) + // set the max amount of time an export can run before getting + .setExporterTimeout(configuration.eventMeshTraceExporterExportTimeout, TimeUnit.SECONDS) + // set time between two different exports + .setScheduleDelay(configuration.eventMeshTraceExporterExportInterval, TimeUnit.SECONDS) + .build(); + + //set the trace service's name + Resource serviceNameResource = + Resource.create(Attributes.of(stringKey("service.name"), serviceName)); + + SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder() + .addSpanProcessor(spanProcessor) + .setResource(Resource.getDefault().merge(serviceNameResource)) + .build(); + + openTelemetry = OpenTelemetrySdk.builder() + .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance())) + .setTracerProvider(sdkTracerProvider) + .build(); + + Runtime.getRuntime().addShutdownHook(new Thread(sdkTracerProvider::close)); + } + + //Gets or creates a named tracer instance + public Tracer getTracer(String instrumentationName) { + return openTelemetry.getTracer(instrumentationName); + } + + //to inject or extract span context + public TextMapPropagator getTextMapPropagator() { + return openTelemetry.getPropagators().getTextMapPropagator(); + } +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/trace/SpanKey.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/trace/SpanKey.java new file mode 100644 index 0000000000..3fb285d14d --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/trace/SpanKey.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.trace; + +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.ContextKey; + +/** + * Makes span keys for specific instrumentation accessible to enrich and suppress spans. + */ +public final class SpanKey { + // server span key + public static final ContextKey SERVER_KEY = + ContextKey.named("opentelemetry-traces-span-key-server"); + + // client span keys + public static final ContextKey HTTP_CLIENT_KEY = + ContextKey.named("opentelemetry-traces-span-key-http"); + public static final ContextKey RPC_CLIENT_KEY = + ContextKey.named("opentelemetry-traces-span-key-rpc"); + public static final ContextKey DB_CLIENT_KEY = + ContextKey.named("opentelemetry-traces-span-key-db"); + + // this is used instead of above, depending on the configuration value for + // otel.instrumentation.experimental.outgoing-span-suppression-by-type + public static final ContextKey CLIENT_KEY = + ContextKey.named("opentelemetry-traces-span-key-client"); + + // producer & consumer (messaging) span keys + public static final ContextKey PRODUCER_KEY = + ContextKey.named("opentelemetry-traces-span-key-producer"); + public static final ContextKey CONSUMER_RECEIVE_KEY = + ContextKey.named("opentelemetry-traces-span-key-consumer-receive"); + public static final ContextKey CONSUMER_PROCESS_KEY = + ContextKey.named("opentelemetry-traces-span-key-consumer-process"); +} + diff --git a/tools/third-party-dependencies/known-dependencies.txt b/tools/third-party-dependencies/known-dependencies.txt index c504e5a61b..1e56dd31fd 100644 --- a/tools/third-party-dependencies/known-dependencies.txt +++ b/tools/third-party-dependencies/known-dependencies.txt @@ -43,11 +43,14 @@ metrics-healthchecks-4.1.0.jar metrics-json-4.1.0.jar netty-all-4.1.49.Final.jar netty-tcnative-boringssl-static-1.1.33.Fork26.jar +okhttp-3.14.9.jar +okio-1.17.2.jar openmessaging-api-2.2.1-pubsub.jar opentelemetry-api-1.3.0.jar opentelemetry-api-metrics-1.3.0-alpha.jar opentelemetry-context-1.3.0.jar opentelemetry-exporter-prometheus-1.3.0-alpha.jar +opentelemetry-exporter-zipkin-1.3.0.jar opentelemetry-sdk-1.3.0.jar opentelemetry-sdk-common-1.3.0.jar opentelemetry-sdk-metrics-1.3.0-alpha.jar @@ -72,3 +75,6 @@ slf4j-api-1.7.30.jar snakeyaml-1.19.jar system-rules-1.16.1.jar truth-0.30.jar +zipkin-2.23.2.jar +zipkin-reporter-2.16.3.jar +zipkin-sender-okhttp3-2.16.3.jar \ No newline at end of file