Skip to content

Commit

Permalink
[Feature #562] Implement CloudEvents protocol adaptor (#595)
Browse files Browse the repository at this point in the history
* [Feature #564] Support CloudEvents protocols for pub/sub in EventMesh-feature design

* support cloudevents api in eventmesh-connector-api module

* fix checkStyle

* fix checkStyle

* fix checkStyle

* 1.support LifeCycle.java
2.update Consumer and Producer

* fix remove the extra blank line

* support cloudEvents

* Add files via upload

* Update README.md

* support cloudEvents

* support cloudEvents

* [ISSUE #580] Add checkstyle gradle plugin (#581)

* Add checkstyle gradle plugin, change plugin package

* skip check in ci

* support cloudEvents

* support cloudevents

* update wechat-official qr code

* update mesh-helper qr code

* Add files via upload

* update README.md

* update README.md

* Update .asf.yaml

* support cloudEvents

* support cloudEvents

* [ISSUE #588] Fix typo in README.md (#589)

close #588

* support cloudEvents

* [Bug #590] Consumer subscription topic is invalid (#590) (#592)

* [Bug #590] Consumer subscription topic is invalid (#590)

* [Bug #590] Consumer subscription topic is invalid (#590)

close #590

* support cloudEvents adaptor

* [Feature #562] Implement CloudEvents adaptor

Co-authored-by: Eason Chen <qqeasonchen@gmail.com>
Co-authored-by: Wenjun Ruan <wenjun@apache.org>
Co-authored-by: Nicholas Zhan <zhan_nicholas@outlook.com>
Co-authored-by: hagsyn <44764414+hagsyn@users.noreply.github.com>
  • Loading branch information
5 people authored Nov 18, 2021
1 parent d3a36b5 commit 7c68241
Show file tree
Hide file tree
Showing 40 changed files with 651 additions and 115 deletions.
3 changes: 3 additions & 0 deletions .asf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ github:
description: EventMesh is a dynamic event-driven application runtime used to decouple the application and backend middleware layer, which supports a wide range of use cases that encompass complex multi-cloud, widely distributed topologies using diverse technology stacks.
homepage: https://eventmesh.apache.org/
labels:
- pubsub
- event-mesh
- event-gateway
- event-driven
Expand All @@ -33,6 +34,8 @@ github:
- message-bus
- cqrs
- multi-runtime
- microservice
- state-management
enabled_merge_buttons:
squash: true
merge: false
Expand Down
3 changes: 2 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ jobs:
java-version: ${{ matrix.java }}

- name: Build
run: ./gradlew clean build jacocoTestReport checkLicense
# skip check here, since we use Checkstyle task to check the added file
run: ./gradlew clean build jacocoTestReport checkLicense -x check

- name: Perform CodeQL analysis
uses: github/codeql-action/analyze@v1
Expand Down
2 changes: 2 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ Editor -> Code Style -> Java -> Scheme -> Import Scheme -> CheckStyle Configurat
```
If you can't see CheckStyle Configuration section under Import Scheme, you can install CheckStyle-IDEA plugin first, and you will see it.

You can also use `./gradlew check` to check the code style.
(NOTE: this command will check all file in project, when you submit a pr, the ci will only check the file has been changed in this pr).
## Contributing

We are always very happy to have contributions, whether for typo fix, bug fix or big new features. Please do not ever
Expand Down
1 change: 1 addition & 0 deletions CONTRIBUTING.zh-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Editor -> Code Style -> Java -> Scheme -> Import Scheme -> CheckStyle Configurat
```
如果你在Import Scheme下看不到CheckStyle Configuration选项,你可以先安装CheckStyle-IDEA插件,然后你就可以看到这个选项了。

你也可以通过执行`./gradlew check`来检查代码格式。(NOTE: 这个命令将会检查整个项目中的代码格式, 当你提交一个PR时,CI只会检查在此次PR中被被修改的文件的代码格式)
## 贡献

无论是对于拼写错误,BUG修复还是重要的新功能,我们总是很乐意接受您的贡献。请不要犹豫,在Github Issue上提出或者通过邮件列表进行讨论。
Expand Down
34 changes: 15 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,13 @@

![logo](docs/images/logo2.png)
## What is EventMesh?
EventMesh(incubating) is a dynamic cloud-native eventing infrastruture used to decouple the application and backend middleware layer, which supports a wide range of use cases that encompass complex multi-cloud, widely distributed topologies using diverse technology stacks.
EventMesh(incubating) is a dynamic event-driven application runtime used to decouple the application and backend middleware layer, which supports a wide range of use cases that encompass complex multi-cloud, widely distributed topologies using diverse technology stacks.

![architecture1](docs/images/eventmesh-multi-runtime.png)

**EventMesh Ecosystem:**

![architecture1](docs/images/eventmesh-define.png)

**EventMesh Architecture:**

![architecture1](docs/images/eventmesh-runtime.png)

**EventMesh Cloud Native:**

![architecture2](docs/images/eventmesh-panels.png)

![architecture1](docs/images/eventmesh-runtime2.png)

**Components:**

Expand All @@ -37,9 +28,11 @@ EventMesh(incubating) is a dynamic cloud-native eventing infrastruture used to d
* **eventmesh-connector-rocketmq** : an implementation of eventmesh-connector-api, pub event to or sub event from RocketMQ as EventStore.
* **eventmesh-connector-kafka(WIP)** : an implementation of eventmesh-connector-api, pub event to or sub event from Kafka as EventStore.
* **eventmesh-connector-redis(WIP)** : an implementation of eventmesh-connector-api, pub event to or sub event from Redis as EventStore.
* **eventmesh-connector-defibus(WIP)** : an implementation of eventmesh-connector-api, pub event to or sub event from [DeFiBus](https://github.com/webankfintech/defibus) as EventStore
* **eventmesh-admin** : clients,topics,subscriptions and other management.
* **eventmesh-registry-plugin** : plugins for registry.
* **eventmesh-security-plugin** : plugins for security.
* **eventmesh-registry-plugin** : plugins for registry adapter.
* **eventmesh-security-plugin** : plugins for security adpater.
* **eventmesh-protocol-plugin** : plugins for protocol adapter.

**Protocol:**

Expand All @@ -54,9 +47,10 @@ Event & Service
- [ ] Event transaction
- [ ] At-least-once/at-most-once delivery guarantees

Store
Connector
- [x] RocketMQ
- [x] InMemory
- [ ] Federated
- [ ] Kafka
- [ ] Redis
- [ ] Pulsar
Expand Down Expand Up @@ -96,7 +90,7 @@ Governance
- [x] Client management
- [ ] Topic management
- [ ] Metadata registry
- [ ] Schema registry
- [x] Schema registry
- [ ] Dynamic config

Choreography
Expand All @@ -107,11 +101,13 @@ Security
- [ ] Auth
- [ ] ACL

Runtime
- [ ] WebAssembly runtime

## Quick Start
1. [Event-store](https://rocketmq.apache.org/docs/quick-start/) (RocketMQ, ignore this step if use standalone).
1. [Connector quickstart](https://rocketmq.apache.org/docs/quick-start/) (RocketMQ, ignore this step if use standalone).
2. [Runtime quickstart](docs/en/instructions/eventmesh-runtime-quickstart.md) or [Runtime quickstart with docker](docs/en/instructions/eventmesh-runtime-quickstart-with-docker.md).
3. [Java examples ](docs/en/instructions/eventmesh-sdk-java-quickstart.md).
3. [Java SDK examples](docs/en/instructions/eventmesh-sdk-java-quickstart.md).

## Contributing
Contributions are always welcomed! Please see [CONTRIBUTING](CONTRIBUTING.md) for detailed guidelines.
Expand All @@ -131,9 +127,9 @@ EventMesh enriches the <a href="https://landscape.cncf.io/serverless?license=apa
[Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.html) Copyright (C) Apache Software Foundation.

## Community
| WeChat group | WeChat official account |
| WeChat group | WeChat public account |
| :---------------------------------------: | :----------------------------------------------------: |
| ![wechat_qr](docs/images/mesh-helper.png) | ![wechat_official_qr](docs/images/wechat-official.png) |
| ![wechat_qr](docs/images/mesh-helper.jpg) | ![wechat_official_qr](docs/images/wechat-official.png) |



Expand Down
73 changes: 43 additions & 30 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ allprojects {
apply plugin: "pmd"
apply plugin: "java-library"
apply plugin: 'signing'
apply plugin: 'checkstyle'
apply plugin: 'com.github.jk1.dependency-license-report'

[compileJava, compileTestJava, javadoc]*.options*.encoding = 'UTF-8'
Expand Down Expand Up @@ -78,6 +79,14 @@ allprojects {
writer.flush()
writer.close()
}

checkstyle {
toolVersion = '9.0'
ignoreFailures = false
showViolations = true
maxWarnings = 0
configFile = new File("${rootDir}/style/checkStyle.xml")
}
}

task tar(type: Tar) {
Expand All @@ -103,38 +112,42 @@ task installPlugin() {
if (!new File("${rootDir}/dist").exists()) {
return
}
// pluginType -> [pluginInstanceName -> moduleName]
Map<String, Map<String, String>> pluginTypeMap = [
"connector": ["rocketmq": "eventmesh-connector-rocketmq", "standalone": "eventmesh-connector-standalone",],
"security" : ["acl": "eventmesh-security-acl",],
"registry" : ["namesrv": "eventmesh-registry-namesrv",]
]
String[] libJars = java.util.Optional.ofNullable(new File("${rootDir}/dist/lib").list()).orElseGet(() -> new String[0])
getAllprojects().forEach(subProject -> {
pluginTypeMap.forEach((pluginType, pluginInstanceMap) -> {
pluginInstanceMap.forEach((pluginInstanceName, moduleName) -> {
if (moduleName == subProject.name) {
println String.format("install plugin, pluginType: %s, pluginInstanceName: %s, module: %s",
pluginType, pluginInstanceName, moduleName)

new File("${rootDir}/dist/plugin/${pluginType}/${pluginInstanceName}").mkdirs()
copy {
into "${rootDir}/dist/plugin/${pluginType}/${pluginInstanceName}"
from "${subProject.getProjectDir()}/dist/apps"
}
copy {
into "${rootDir}/dist/plugin/${pluginType}/${pluginInstanceName}"
from "${subProject.getProjectDir()}/dist/lib/"
exclude(libJars)
}
copy {
into "${rootDir}/dist/conf"
from "${subProject.getProjectDir()}/dist/conf"
exclude 'META-INF'
}
}
})
})
var file = new File("${subProject.projectDir}/gradle.properties")
if (!file.exists()) {
return
}
var properties = new Properties()
properties.load(new FileInputStream(file))
var pluginType = properties.getProperty("pluginType")
var pluginName = properties.getProperty("pluginName")
if (pluginType == null || pluginName == null) {
return
}
var pluginFile = new File("${rootDir}/dist/plugin/${pluginType}/${pluginName}")
if (pluginFile.exists()) {
return
}
pluginFile.mkdirs()
println String.format(
"install plugin, pluginType: %s, pluginInstanceName: %s, module: %s", pluginType, pluginName, subProject.getName()
)

copy {
into "${rootDir}/dist/plugin/${pluginType}/${pluginName}"
from "${subProject.getProjectDir()}/dist/apps"
}
copy {
into "${rootDir}/dist/plugin/${pluginType}/${pluginName}"
from "${subProject.getProjectDir()}/dist/lib/"
exclude(libJars)
}
copy {
into "${rootDir}/dist/conf"
from "${subProject.getProjectDir()}/dist/conf"
exclude 'META-INF'
}
})
}

Expand Down
Binary file added docs/images/Wechat-helper.jpeg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/eventmesh-runtime2.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/mesh-helper.jpg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified docs/images/wechat-official.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ public class ProtocolKey {
public static final String LANGUAGE = "Language";
public static final String VERSION = "Version";

public static final String PROTOCOL_TYPE = "protocol_type";

public static final String PROTOCOL_VERSION = "protocol_version";

public static final String PROTOCOL_DESC = "protocol_desc";

public static class ClientInstanceKey {
////////////////////////////////////Protocol layer requester description///////////
public static final String ENV = "Env";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@ public class ReplyMessageRequestHeader extends Header {
//protocol version adopted by requester, default:1.0
private ProtocolVersion version;

//protocol type, cloudevents or eventmeshMessage
private String protocolType;

//protocol version, cloudevents:1.0 or 0.3
private String protocolVersion;

//protocol desc
private String protocolDesc;

//the environment number of the requester
private String env;

Expand Down Expand Up @@ -139,10 +148,37 @@ public void setIp(String ip) {
this.ip = ip;
}

public String getProtocolType() {
return protocolType;
}

public void setProtocolType(String protocolType) {
this.protocolType = protocolType;
}

public String getProtocolVersion() {
return protocolVersion;
}

public void setProtocolVersion(String protocolVersion) {
this.protocolVersion = protocolVersion;
}

public String getProtocolDesc() {
return protocolDesc;
}

public void setProtocolDesc(String protocolDesc) {
this.protocolDesc = protocolDesc;
}

public static ReplyMessageRequestHeader buildHeader(Map<String, Object> headerParam) {
ReplyMessageRequestHeader header = new ReplyMessageRequestHeader();
header.setCode(MapUtils.getString(headerParam, ProtocolKey.REQUEST_CODE));
header.setVersion(ProtocolVersion.get(MapUtils.getString(headerParam, ProtocolKey.VERSION)));
header.setProtocolType(MapUtils.getString(headerParam, ProtocolKey.PROTOCOL_TYPE));
header.setProtocolVersion(MapUtils.getString(headerParam, ProtocolKey.PROTOCOL_VERSION));
header.setProtocolDesc(MapUtils.getString(headerParam, ProtocolKey.PROTOCOL_DESC));
String lan = StringUtils.isBlank(MapUtils.getString(headerParam, ProtocolKey.LANGUAGE))
? Constants.LANGUAGE_JAVA : MapUtils.getString(headerParam, ProtocolKey.LANGUAGE);
header.setLanguage(lan);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ public class SendMessageBatchRequestHeader extends Header {
//protocol version adopted by requester, default:1.0
private ProtocolVersion version;

//protocol type, cloudevents or eventmeshMessage
private String protocolType;

//protocol version, cloudevents:1.0 or 0.3
private String protocolVersion;

//protocol desc
private String protocolDesc;

//the environment number of the requester
private String env;

Expand Down Expand Up @@ -140,10 +149,37 @@ public void setIp(String ip) {
this.ip = ip;
}

public String getProtocolType() {
return protocolType;
}

public void setProtocolType(String protocolType) {
this.protocolType = protocolType;
}

public String getProtocolVersion() {
return protocolVersion;
}

public void setProtocolVersion(String protocolVersion) {
this.protocolVersion = protocolVersion;
}

public String getProtocolDesc() {
return protocolDesc;
}

public void setProtocolDesc(String protocolDesc) {
this.protocolDesc = protocolDesc;
}

public static SendMessageBatchRequestHeader buildHeader(final Map<String, Object> headerParam) {
SendMessageBatchRequestHeader header = new SendMessageBatchRequestHeader();
header.setCode(MapUtils.getString(headerParam, ProtocolKey.REQUEST_CODE));
header.setVersion(ProtocolVersion.get(MapUtils.getString(headerParam, ProtocolKey.VERSION)));
header.setProtocolType(MapUtils.getString(headerParam, ProtocolKey.PROTOCOL_TYPE));
header.setProtocolVersion(MapUtils.getString(headerParam, ProtocolKey.PROTOCOL_VERSION));
header.setProtocolDesc(MapUtils.getString(headerParam, ProtocolKey.PROTOCOL_DESC));
String lan = StringUtils.isBlank(MapUtils.getString(headerParam, ProtocolKey.LANGUAGE))
? Constants.LANGUAGE_JAVA : MapUtils.getString(headerParam, ProtocolKey.LANGUAGE);
header.setLanguage(lan);
Expand Down
Loading

0 comments on commit 7c68241

Please sign in to comment.