Skip to content

Commit

Permalink
[ISSUE #340]Add http trace http point (#527)
Browse files Browse the repository at this point in the history
* tracing in AbstractHTTPServer

* add licence

* the span exporter

* design docs

* fix the error on text

* delete the useless dependence

* remove the unused code

* fix the different spanExporter

* change the class name

* fix gradle -build problem

* design docs improve

* fix the gradle.build error problem

* fixed

* unsure fix

* fix the path name

* fix check error

* format code

* add javadoc

* checkstyle fix

* unversioned files

* put context into channel in advance
  • Loading branch information
Roc-00 authored Dec 16, 2021
1 parent 05f8a3c commit a9fe559
Show file tree
Hide file tree
Showing 15 changed files with 768 additions and 120 deletions.
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,8 @@ subprojects {
dependency 'io.opentelemetry:opentelemetry-exporter-prometheus:1.3.0-alpha'
dependency 'io.prometheus:simpleclient:0.8.1'
dependency 'io.prometheus:simpleclient_httpserver:0.8.1'
dependency 'io.opentelemetry:opentelemetry-exporter-zipkin:1.3.0'
dependency 'io.opentelemetry:opentelemetry-semconv:1.3.0-alpha'

dependency "io.openmessaging:openmessaging-api:2.2.1-pubsub"

Expand Down
89 changes: 89 additions & 0 deletions docs/en/features/eventmesh-trace-design.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# eventmesh-HTTP-trace-design

## Introduction

[EventMesh(incubating)](https://github.com/apache/incubator-eventmesh) is a dynamic cloud-native eventing infrastructure.

## An overview of OpenTelemetry

OpenTelemetry is a collection of tools, APIs, and SDKs. You can use it to instrument, generate, collect, and export telemetry data (metrics, logs, and traces) for analysis in order to understand your software's performance and behavior.

## Requirements

- set tracer
- different exporter
- start and end span in server

## Design Details

* SpanProcessor: BatchSpanProcessor

* Exporter: log(default), would be changed from properties

```java
// Configure the batch spans processor. This span processor exports span in batches.
BatchSpanProcessor batchSpansProcessor =
BatchSpanProcessor.builder(exporter)
.setMaxExportBatchSize(512) // set the maximum batch size to use
.setMaxQueueSize(2048) // set the queue size. This must be >= the export batch size
.setExporterTimeout(
30, TimeUnit.SECONDS) // set the max amount of time an export can run before getting
// interrupted
.setScheduleDelay(5, TimeUnit.SECONDS) // set time between two different exports
.build();
OpenTelemetrySdk.builder()
.setTracerProvider(
SdkTracerProvider.builder().addSpanProcessor(batchSpansProcessor).build())
.build();
```

1. When using the method 'init()' of the class "EventMeshHTTPServer", the class "AbstractHTTPServer” will get the tracer

```java
super.openTelemetryTraceFactory = new OpenTelemetryTraceFactory(eventMeshHttpConfiguration);
super.tracer = openTelemetryTraceFactory.getTracer(this.getClass().toString());
super.textMapPropagator = openTelemetryTraceFactory.getTextMapPropagator();
```

2. then the trace in class "AbstractHTTPServer” will work.

## Problems

#### How to set different exporter in class 'OpenTelemetryTraceFactory'?(Solved)

After I get the exporter type from properties, how to deal with it.

The 'logExporter' only needs to new it.

But the 'zipkinExporter' needs to new and use the "getZipkinExporter()" method.

## Solutions
#### Solution of different exporter
Use reflection to get an exporter.

First of all, different exporter must implement the interface 'EventMeshExporter'.

Then we get the exporter name from the configuration and reflect to the class.
```java
//different spanExporter
String exporterName = configuration.eventMeshTraceExporterType;
//use reflection to get spanExporter
String className = String.format("org.apache.eventmesh.runtime.exporter.%sExporter",exporterName);
EventMeshExporter eventMeshExporter = (EventMeshExporter) Class.forName(className).newInstance();
spanExporter = eventMeshExporter.getSpanExporter(configuration);
```

Additional, this will surround with try catch.If the specified exporter cannot be obtained successfully, the default exporter log will be used instead

#### Improvement of different exporter

SPI(To be completed)


## Appendix

#### References

https://github.com/open-telemetry/docs-cn/blob/main/QUICKSTART.md

https://github.com/open-telemetry/opentelemetry-java-instrumentation/tree/main/instrumentation/netty
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ public class CommonConfiguration {
public String eventMeshSecurityPluginType = "security";
public int eventMeshPrometheusPort = 19090;
public String eventMeshRegistryPluginType = "namesrv";
public String eventMeshTraceExporterType = "Log";
public int eventMeshTraceExporterMaxExportSize = 512;
public int eventMeshTraceExporterMaxQueueSize = 2048;
public int eventMeshTraceExporterExportTimeout = 30;
public int eventMeshTraceExporterExportInterval = 5;
public String eventMeshTraceExportZipkinIp = "localhost";
public int eventMeshTraceExportZipkinPort = 9411;

public String namesrvAddr = "";
public Integer eventMeshRegisterIntervalInMills = 10 * 1000;
Expand All @@ -48,54 +55,125 @@ public CommonConfiguration(ConfigurationWrapper configurationWrapper) {
public void init() {

if (configurationWrapper != null) {
String eventMeshEnvStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ENV);
Preconditions.checkState(StringUtils.isNotEmpty(eventMeshEnvStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ENV));
String eventMeshEnvStr =
configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ENV);
Preconditions.checkState(StringUtils.isNotEmpty(eventMeshEnvStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_ENV));
eventMeshEnv = StringUtils.deleteWhitespace(eventMeshEnvStr);

String sysIdStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_SYSID);
Preconditions.checkState(StringUtils.isNotEmpty(sysIdStr) && StringUtils.isNumeric(sysIdStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_SYSID));
String sysIdStr =
configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_SYSID);
Preconditions.checkState(StringUtils.isNotEmpty(sysIdStr) && StringUtils.isNumeric(sysIdStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_SYSID));
sysID = StringUtils.deleteWhitespace(sysIdStr);

String eventMeshClusterStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_CLUSTER);
Preconditions.checkState(StringUtils.isNotEmpty(eventMeshClusterStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_SERVER_CLUSTER));
String eventMeshClusterStr =
configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_CLUSTER);
Preconditions.checkState(StringUtils.isNotEmpty(eventMeshClusterStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_SERVER_CLUSTER));
eventMeshCluster = StringUtils.deleteWhitespace(eventMeshClusterStr);

String eventMeshNameStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_NAME);
Preconditions.checkState(StringUtils.isNotEmpty(eventMeshNameStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_SERVER_NAME));
String eventMeshNameStr =
configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_NAME);
Preconditions.checkState(StringUtils.isNotEmpty(eventMeshNameStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_SERVER_NAME));
eventMeshName = StringUtils.deleteWhitespace(eventMeshNameStr);

String eventMeshIDCStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_IDC);
Preconditions.checkState(StringUtils.isNotEmpty(eventMeshIDCStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_IDC));
eventMeshIDC = StringUtils.deleteWhitespace(eventMeshIDCStr);
String eventMeshIdcStr =
configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_IDC);
Preconditions.checkState(StringUtils.isNotEmpty(eventMeshIdcStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_IDC));
eventMeshIDC = StringUtils.deleteWhitespace(eventMeshIdcStr);

String eventMeshPrometheusPortStr = configurationWrapper.getProp(ConfKeys.KEY_EVENTMESH_METRICS_PROMETHEUS_PORT);
String eventMeshPrometheusPortStr =
configurationWrapper.getProp(ConfKeys.KEY_EVENTMESH_METRICS_PROMETHEUS_PORT);
if (StringUtils.isNotEmpty(eventMeshPrometheusPortStr)) {
eventMeshPrometheusPort = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshPrometheusPortStr));
eventMeshPrometheusPort =
Integer.valueOf(StringUtils.deleteWhitespace(eventMeshPrometheusPortStr));
}

eventMeshServerIp = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_HOST_IP);
eventMeshServerIp =
configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_HOST_IP);
if (StringUtils.isBlank(eventMeshServerIp)) {
eventMeshServerIp = IPUtils.getLocalAddress();
}

eventMeshConnectorPluginType = configurationWrapper.getProp(ConfKeys.KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE);
Preconditions.checkState(StringUtils.isNotEmpty(eventMeshConnectorPluginType), String.format("%s error", ConfKeys.KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE));
eventMeshConnectorPluginType =
configurationWrapper.getProp(ConfKeys.KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE);
Preconditions.checkState(StringUtils.isNotEmpty(eventMeshConnectorPluginType),
String.format("%s error", ConfKeys.KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE));

String eventMeshServerAclEnableStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_SECURITY_ENABLED);
String eventMeshServerAclEnableStr =
configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_SECURITY_ENABLED);
if (StringUtils.isNotBlank(eventMeshServerAclEnableStr)) {
eventMeshServerSecurityEnable = Boolean.valueOf(StringUtils.deleteWhitespace(eventMeshServerAclEnableStr));
eventMeshServerSecurityEnable =
Boolean.valueOf(StringUtils.deleteWhitespace(eventMeshServerAclEnableStr));
}

eventMeshSecurityPluginType = configurationWrapper.getProp(ConfKeys.KEYS_ENENTMESH_SECURITY_PLUGIN_TYPE);
Preconditions.checkState(StringUtils.isNotEmpty(eventMeshSecurityPluginType), String.format("%s error", ConfKeys.KEYS_ENENTMESH_SECURITY_PLUGIN_TYPE));
eventMeshSecurityPluginType =
configurationWrapper.getProp(ConfKeys.KEYS_ENENTMESH_SECURITY_PLUGIN_TYPE);
Preconditions.checkState(StringUtils.isNotEmpty(eventMeshSecurityPluginType),
String.format("%s error", ConfKeys.KEYS_ENENTMESH_SECURITY_PLUGIN_TYPE));

String eventMeshServerRegistryEnableStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_REGISTRY_ENABLED);
String eventMeshServerRegistryEnableStr =
configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_REGISTRY_ENABLED);
if (StringUtils.isNotBlank(eventMeshServerRegistryEnableStr)) {
eventMeshServerRegistryEnable = Boolean.valueOf(StringUtils.deleteWhitespace(eventMeshServerRegistryEnableStr));
eventMeshServerRegistryEnable =
Boolean.valueOf(StringUtils.deleteWhitespace(eventMeshServerRegistryEnableStr));
}

eventMeshRegistryPluginType = configurationWrapper.getProp(ConfKeys.KEYS_ENENTMESH_REGISTRY_PLUGIN_TYPE);
Preconditions.checkState(StringUtils.isNotEmpty(eventMeshRegistryPluginType), String.format("%s error", ConfKeys.KEYS_ENENTMESH_REGISTRY_PLUGIN_TYPE));
eventMeshRegistryPluginType =
configurationWrapper.getProp(ConfKeys.KEYS_ENENTMESH_REGISTRY_PLUGIN_TYPE);
Preconditions.checkState(StringUtils.isNotEmpty(eventMeshRegistryPluginType),
String.format("%s error", ConfKeys.KEYS_ENENTMESH_REGISTRY_PLUGIN_TYPE));

String eventMeshTraceExporterTypeStr =
configurationWrapper.getProp(ConfKeys.KEYS_ENENTMESH_TRACE_EXPORTER_TYPE);
Preconditions.checkState(StringUtils.isNotEmpty(eventMeshTraceExporterTypeStr),
String.format("%s error", ConfKeys.KEYS_ENENTMESH_TRACE_EXPORTER_TYPE));
eventMeshTraceExporterType =
StringUtils.deleteWhitespace(eventMeshTraceExporterTypeStr);

String eventMeshTraceExporterMaxExportSizeStr =
configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_TRACE_EXPORTER_MAX_EXPORT_SIZE);
if (StringUtils.isNotEmpty(eventMeshTraceExporterMaxExportSizeStr)) {
eventMeshTraceExporterMaxExportSize =
Integer.valueOf(StringUtils.deleteWhitespace(eventMeshTraceExporterMaxExportSizeStr));
}

String eventMeshTraceExporterMaxQueueSizeStr =
configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_TRACE_EXPORTER_MAX_QUEUE_SIZE);
if (StringUtils.isNotEmpty(eventMeshTraceExporterMaxQueueSizeStr)) {
eventMeshTraceExporterMaxQueueSize =
Integer.valueOf(StringUtils.deleteWhitespace(eventMeshTraceExporterMaxQueueSizeStr));
}

String eventMeshTraceExporterExportTimeoutStr =
configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_TRACE_EXPORTER_EXPORT_TIMEOUT);
if (StringUtils.isNotEmpty(eventMeshTraceExporterExportTimeoutStr)) {
eventMeshTraceExporterExportTimeout =
Integer.valueOf(StringUtils.deleteWhitespace(eventMeshTraceExporterExportTimeoutStr));
}

String eventMeshTraceExporterExportIntervalStr =
configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_TRACE_EXPORTER_EXPORT_INTERVAL);
if (StringUtils.isNotEmpty(eventMeshTraceExporterExportIntervalStr)) {
eventMeshTraceExporterExportInterval =
Integer.valueOf(StringUtils.deleteWhitespace(eventMeshTraceExporterExportIntervalStr));
}

String eventMeshTraceExportZipkinIpStr =
configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_TRACE_EXPORT_ZIPKIN_IP);
Preconditions.checkState(StringUtils.isNotEmpty(eventMeshTraceExportZipkinIpStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_TRACE_EXPORT_ZIPKIN_IP));
eventMeshTraceExportZipkinIp = StringUtils.deleteWhitespace(eventMeshTraceExportZipkinIpStr);

String eventMeshTraceExportZipkinPortStr =
configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_TRACE_EXPORT_ZIPKIN_PORT);
if (StringUtils.isNotEmpty(eventMeshTraceExportZipkinPortStr)) {
eventMeshTraceExportZipkinPort =
Integer.valueOf(StringUtils.deleteWhitespace(eventMeshTraceExportZipkinPortStr));
}
}
}

Expand All @@ -112,9 +190,11 @@ static class ConfKeys {

public static String KEYS_EVENTMESH_SERVER_HOST_IP = "eventMesh.server.hostIp";

public static String KEYS_EVENTMESH_SERVER_REGISTER_INTERVAL = "eventMesh.server.registry.registerIntervalInMills";
public static String KEYS_EVENTMESH_SERVER_REGISTER_INTERVAL =
"eventMesh.server.registry.registerIntervalInMills";

public static String KEYS_EVENTMESH_SERVER_FETCH_REGISTRY_ADDR_INTERVAL = "eventMesh.server.registry.fetchRegistryAddrIntervalInMills";
public static String KEYS_EVENTMESH_SERVER_FETCH_REGISTRY_ADDR_INTERVAL =
"eventMesh.server.registry.fetchRegistryAddrIntervalInMills";

public static String KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE = "eventMesh.connector.plugin.type";

Expand All @@ -127,5 +207,19 @@ static class ConfKeys {
public static String KEYS_EVENTMESH_REGISTRY_ENABLED = "eventMesh.server.registry.enabled";

public static String KEYS_ENENTMESH_REGISTRY_PLUGIN_TYPE = "eventMesh.registry.plugin.type";

public static String KEYS_ENENTMESH_TRACE_EXPORTER_TYPE = "eventmesh.trace.exporter.type";

public static String KEYS_EVENTMESH_TRACE_EXPORTER_MAX_EXPORT_SIZE = "eventmesh.trace.exporter.max.export.size";

public static String KEYS_EVENTMESH_TRACE_EXPORTER_MAX_QUEUE_SIZE = "eventmesh.trace.exporter.max.queue.size";

public static String KEYS_EVENTMESH_TRACE_EXPORTER_EXPORT_TIMEOUT = "eventmesh.trace.exporter.export.timeout";

public static String KEYS_EVENTMESH_TRACE_EXPORTER_EXPORT_INTERVAL = "eventmesh.trace.exporter.export.interval";

public static String KEYS_EVENTMESH_TRACE_EXPORT_ZIPKIN_IP = "eventmesh.trace.export.zipkin.ip";

public static String KEYS_EVENTMESH_TRACE_EXPORT_ZIPKIN_PORT = "eventmesh.trace.export.zipkin.port";
}
}
4 changes: 3 additions & 1 deletion eventmesh-common/src/test/resources/configuration.properties
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,6 @@ eventMesh.server.name=value5
eventMesh.server.hostIp=value6
eventMesh.connector.plugin.type=rocketmq
eventMesh.security.plugin.type=acl
eventMesh.registry.plugin.type=namesrv
eventMesh.registry.plugin.type=namesrv
eventmesh.trace.export.zipkin.ip=localhost
eventmesh.trace.exporter.type=Zipkin
3 changes: 3 additions & 0 deletions eventmesh-runtime/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ dependencies {
implementation 'io.prometheus:simpleclient'
implementation 'io.prometheus:simpleclient_httpserver'
implementation 'io.cloudevents:cloudevents-core'
implementation 'io.opentelemetry:opentelemetry-exporter-zipkin'
implementation 'io.opentelemetry:opentelemetry-semconv'


implementation "org.apache.httpcomponents:httpclient"
implementation 'io.netty:netty-all'
Expand Down
18 changes: 17 additions & 1 deletion eventmesh-runtime/conf/eventmesh.properties
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,20 @@ eventMesh.server.registry.enabled=false
eventMesh.registry.plugin.type=namesrv

#prometheusPort
eventMesh.metrics.prometheus.port=19090
eventMesh.metrics.prometheus.port=19090

#trace exporter
eventmesh.trace.exporter.type=Zipkin

#set the maximum batch size to use
eventmesh.trace.exporter.max.export.size=512
#set the queue size. This must be >= the export batch size
eventmesh.trace.exporter.max.queue.size=2048
#set the max amount of time an export can run before getting(TimeUnit=SECONDS)
eventmesh.trace.exporter.export.timeout=30
#set time between two different exports(TimeUnit=SECONDS)
eventmesh.trace.exporter.export.interval=5

#zipkin
eventmesh.trace.export.zipkin.ip=localhost
eventmesh.trace.export.zipkin.port=9411
Loading

0 comments on commit a9fe559

Please sign in to comment.