Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue #417] Grpc Transport Protocol support #710

Merged
merged 21 commits into from
Jan 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ allprojects {
maxWarnings = 0
configFile = new File("${rootDir}/style/checkStyle.xml")
}

checkstyleMain.exclude '**/org/apache/eventmesh/client/grpc/protos**'
}

task tar(type: Tar) {
Expand Down Expand Up @@ -454,6 +456,11 @@ subprojects {

dependency "io.cloudevents:cloudevents-core:2.2.0"
dependency "io.cloudevents:cloudevents-json-jackson:2.2.0"

dependency "io.grpc:grpc-protobuf:1.15.0"
dependency "io.grpc:grpc-stub:1.15.0"
dependency "io.grpc:grpc-netty:1.15.0"
dependency "io.grpc:grpc-netty-shaded:1.15.0"
}
}
}
161 changes: 161 additions & 0 deletions docs/cn/instructions/eventmesh-runtime-protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -258,3 +258,164 @@ public class LiteMessage {
| 场景 | Server向Client发送消息请求码 | Client回复Server消息响应码 | 说明 |
| ------------------ | ---------------------------- | -------------------------- | ---------------------- |
| 客户端接收异步事件 | HTTP_PUSH_CLIENT_ASYNC(105) | retCode | retCode值为0时代表成功 |


## gRPC 协议文档

#### 1. protobuf

在 `eventmesh-protocol-gprc` 模块有 Eventmesh gRPC 客户端的 protobuf 文件. the protobuf 文件路径是 `/src/main/proto/eventmesh-client.proto`.

用gradle build 生成 gRPC 代码在 `/build/generated/source/proto/main`. 生成代码用于 `eventmesh-sdk-java` 模块.

#### 2. gRPC 数据模型

- 消息

以下消息数据模型用于 `publish()`, `requestReply()` 和 `broadcast()` APIs.

```
message RequestHeader {
string env = 1;
string region = 2;
string idc = 3;
string ip = 4;
string pid = 5;
string sys = 6;
string username = 7;
string password = 8;
string language = 9;
string protocolType = 10;
string protocolVersion = 11;
string protocolDesc = 12;
}

message EventMeshMessage {
RequestHeader header = 1;
string producerGroup = 2;
string topic = 3;
string content = 4;
string ttl = 5;
string uniqueId = 6;
string seqNum = 7;
string tag = 8;
map<string, string> properties = 9;
}

message BatchMessage {
RequestHeader header = 1;
string producerGroup = 2;
string topic = 3;

message MessageItem {
string content = 1;
string ttl = 2;
string uniqueId = 3;
string seqNum = 4;
string tag = 5;
map<string, string> properties = 6;
}

repeated MessageItem messageItem = 4;
}

message Response {
string respCode = 1;
string respMsg = 2;
string respTime = 3;
}
```

- 订阅

以下订阅数据模型用于 `subscribe()` 和 `unsubscribe()` APIs.

```
message Subscription {
RequestHeader header = 1;
string consumerGroup = 2;

message SubscriptionItem {
enum SubscriptionMode {
CLUSTERING = 0;
BROADCASTING = 1;
}

enum SubscriptionType {
ASYNC = 0;
SYNC = 1;
}

string topic = 1;
SubscriptionMode mode = 2;
SubscriptionType type = 3;
}

repeated SubscriptionItem subscriptionItems = 3;
string url = 4;
}
```

- 心跳

以下心跳数据模型用于 `heartbeat()` API.

```
message Heartbeat {
enum ClientType {
PUB = 0;
SUB = 1;
}

RequestHeader header = 1;
ClientType clientType = 2;
string producerGroup = 3;
string consumerGroup = 4;

message HeartbeatItem {
string topic = 1;
string url = 2;
}

repeated HeartbeatItem heartbeatItems = 5;
}
```

#### 3. gRPC 服务接口

- 事件生产端服务 APIs

```
service PublisherService {
# 异步事件生产
rpc publish(EventMeshMessage) returns (Response);

# 同步事件生产
rpc requestReply(EventMeshMessage) returns (Response);

# 批量事件生产
rpc batchPublish(BatchMessage) returns (Response);
}
```

- 事件消费端服务 APIs

```
service ConsumerService {
# 所消费事件通过 HTTP Webhook推送事件
rpc subscribe(Subscription) returns (Response);

# 所消费事件通过 TCP stream推送事件
rpc subscribeStream(Subscription) returns (stream EventMeshMessage);

rpc unsubscribe(Subscription) returns (Response);
}
```

- 客户端心跳服务 API

```
service HeartbeatService {
rpc heartbeat(Heartbeat) returns (Response);
}
```
163 changes: 163 additions & 0 deletions docs/en/instructions/eventmesh-runtime-protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -259,3 +259,166 @@ same with RequestHeader of Heartbeat Msg
| Scene | Server Send | Client Reply | Remark |
| ------------------ | ---------------------------- | -------------------------- | ---------------------- |
| Push async msg to client | HTTP_PUSH_CLIENT_ASYNC(105) | retCode | retCode=0,send success |

## gRPC Protocol Document In Eventmesh-Runtime

#### 1. protobuf

The `eventmesh-protocol-gprc` module contains the protobuf file of the evenmesh client. the protobuf file
is located as `/src/main/proto/eventmesh-client.proto`.

Run the gradle build to generate the gRPC codes. The generated codes are located at `/build/generated/source/proto/main`.

These generated grpc codes will be used in `eventmesh-sdk-java` module.

#### 2. data models

- message

The following is the message data model, used by `publish()`, `requestReply()` and `broadcast()` APIs.

```
message RequestHeader {
string env = 1;
string region = 2;
string idc = 3;
string ip = 4;
string pid = 5;
string sys = 6;
string username = 7;
string password = 8;
string language = 9;
string protocolType = 10;
string protocolVersion = 11;
string protocolDesc = 12;
}

message EventMeshMessage {
RequestHeader header = 1;
string producerGroup = 2;
string topic = 3;
string content = 4;
string ttl = 5;
string uniqueId = 6;
string seqNum = 7;
string tag = 8;
map<string, string> properties = 9;
}

message BatchMessage {
RequestHeader header = 1;
string producerGroup = 2;
string topic = 3;

message MessageItem {
string content = 1;
string ttl = 2;
string uniqueId = 3;
string seqNum = 4;
string tag = 5;
map<string, string> properties = 6;
}

repeated MessageItem messageItem = 4;
}

message Response {
string respCode = 1;
string respMsg = 2;
string respTime = 3;
}
```

- subscription

The following data model is used by `subscribe()` and `unsubscribe()` APIs.

```
message Subscription {
RequestHeader header = 1;
string consumerGroup = 2;

message SubscriptionItem {
enum SubscriptionMode {
CLUSTERING = 0;
BROADCASTING = 1;
}

enum SubscriptionType {
ASYNC = 0;
SYNC = 1;
}

string topic = 1;
SubscriptionMode mode = 2;
SubscriptionType type = 3;
}

repeated SubscriptionItem subscriptionItems = 3;
string url = 4;
}
```

- heartbeat

The following data model is used by `heartbeat()` API.

```
message Heartbeat {
enum ClientType {
PUB = 0;
SUB = 1;
}

RequestHeader header = 1;
ClientType clientType = 2;
string producerGroup = 3;
string consumerGroup = 4;

message HeartbeatItem {
string topic = 1;
string url = 2;
}

repeated HeartbeatItem heartbeatItems = 5;
}
```

#### 3. service operations

- event publisher service APIs

```
service PublisherService {
# Async event publish
rpc publish(EventMeshMessage) returns (Response);

# Sync event publish
rpc requestReply(EventMeshMessage) returns (Response);

# Batch event publish
rpc batchPublish(BatchMessage) returns (Response);
}
```

- event consumer service APIs

```
service ConsumerService {
# The subscribed event will be delivered by invoking the webhook url in the Subscription
rpc subscribe(Subscription) returns (Response);

# The subscribed event will be delivered through stream of Message
rpc subscribeStream(Subscription) returns (stream EventMeshMessage);

rpc unsubscribe(Subscription) returns (Response);
}
```

- client heartbeat service API

```
service HeartbeatService {
rpc heartbeat(Heartbeat) returns (Response);
}
```
3 changes: 3 additions & 0 deletions eventmesh-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ dependencies {

implementation "io.netty:netty-all"

implementation "io.grpc:grpc-protobuf:1.15.0"
implementation "io.grpc:grpc-stub:1.15.0"

implementation "com.github.stefanbirkner:system-rules"

compileOnly 'org.projectlombok:lombok:1.18.22'
Expand Down
Loading