Skip to content

Commit

Permalink
[Issue apache#417] Grpc Transport Protocol support (apache#710)
Browse files Browse the repository at this point in the history
Grpc Transport Protocol support
  • Loading branch information
jinrongluo authored and xwm1992 committed Feb 16, 2022
1 parent e051d58 commit 888e18e
Show file tree
Hide file tree
Showing 93 changed files with 19,770 additions and 7 deletions.
7 changes: 7 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ allprojects {
configFile = new File("${rootDir}/style/checkStyle.xml")
}

checkstyleMain.exclude '**/org/apache/eventmesh/client/grpc/protos**'

dependencies {
testImplementation "junit:junit"
}
Expand Down Expand Up @@ -464,6 +466,11 @@ subprojects {
dependency "io.cloudevents:cloudevents-core:2.2.0"
dependency "io.cloudevents:cloudevents-json-jackson:2.2.0"

dependency "io.grpc:grpc-protobuf:1.15.0"
dependency "io.grpc:grpc-stub:1.15.0"
dependency "io.grpc:grpc-netty:1.15.0"
dependency "io.grpc:grpc-netty-shaded:1.15.0"

dependency "com.github.seancfoley:ipaddress:5.3.3"
}
}
Expand Down
161 changes: 161 additions & 0 deletions docs/cn/instructions/eventmesh-runtime-protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -258,3 +258,164 @@ public class LiteMessage {
| 场景 | Server向Client发送消息请求码 | Client回复Server消息响应码 | 说明 |
| ------------------ | ---------------------------- | -------------------------- | ---------------------- |
| 客户端接收异步事件 | HTTP_PUSH_CLIENT_ASYNC(105) | retCode | retCode值为0时代表成功 |


## gRPC 协议文档

#### 1. protobuf

`eventmesh-protocol-gprc` 模块有 Eventmesh gRPC 客户端的 protobuf 文件. the protobuf 文件路径是 `/src/main/proto/eventmesh-client.proto`.

用gradle build 生成 gRPC 代码在 `/build/generated/source/proto/main`. 生成代码用于 `eventmesh-sdk-java` 模块.

#### 2. gRPC 数据模型

- 消息

以下消息数据模型用于 `publish()`, `requestReply()``broadcast()` APIs.

```
message RequestHeader {
string env = 1;
string region = 2;
string idc = 3;
string ip = 4;
string pid = 5;
string sys = 6;
string username = 7;
string password = 8;
string language = 9;
string protocolType = 10;
string protocolVersion = 11;
string protocolDesc = 12;
}
message EventMeshMessage {
RequestHeader header = 1;
string producerGroup = 2;
string topic = 3;
string content = 4;
string ttl = 5;
string uniqueId = 6;
string seqNum = 7;
string tag = 8;
map<string, string> properties = 9;
}
message BatchMessage {
RequestHeader header = 1;
string producerGroup = 2;
string topic = 3;
message MessageItem {
string content = 1;
string ttl = 2;
string uniqueId = 3;
string seqNum = 4;
string tag = 5;
map<string, string> properties = 6;
}
repeated MessageItem messageItem = 4;
}
message Response {
string respCode = 1;
string respMsg = 2;
string respTime = 3;
}
```

- 订阅

以下订阅数据模型用于 `subscribe()``unsubscribe()` APIs.

```
message Subscription {
RequestHeader header = 1;
string consumerGroup = 2;
message SubscriptionItem {
enum SubscriptionMode {
CLUSTERING = 0;
BROADCASTING = 1;
}
enum SubscriptionType {
ASYNC = 0;
SYNC = 1;
}
string topic = 1;
SubscriptionMode mode = 2;
SubscriptionType type = 3;
}
repeated SubscriptionItem subscriptionItems = 3;
string url = 4;
}
```

- 心跳

以下心跳数据模型用于 `heartbeat()` API.

```
message Heartbeat {
enum ClientType {
PUB = 0;
SUB = 1;
}
RequestHeader header = 1;
ClientType clientType = 2;
string producerGroup = 3;
string consumerGroup = 4;
message HeartbeatItem {
string topic = 1;
string url = 2;
}
repeated HeartbeatItem heartbeatItems = 5;
}
```

#### 3. gRPC 服务接口

- 事件生产端服务 APIs

```
service PublisherService {
# 异步事件生产
rpc publish(EventMeshMessage) returns (Response);
# 同步事件生产
rpc requestReply(EventMeshMessage) returns (Response);
# 批量事件生产
rpc batchPublish(BatchMessage) returns (Response);
}
```

- 事件消费端服务 APIs

```
service ConsumerService {
# 所消费事件通过 HTTP Webhook推送事件
rpc subscribe(Subscription) returns (Response);
# 所消费事件通过 TCP stream推送事件
rpc subscribeStream(Subscription) returns (stream EventMeshMessage);
rpc unsubscribe(Subscription) returns (Response);
}
```

- 客户端心跳服务 API

```
service HeartbeatService {
rpc heartbeat(Heartbeat) returns (Response);
}
```
163 changes: 163 additions & 0 deletions docs/en/instructions/eventmesh-runtime-protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -259,3 +259,166 @@ same with RequestHeader of Heartbeat Msg
| Scene | Server Send | Client Reply | Remark |
| ------------------ | ---------------------------- | -------------------------- | ---------------------- |
| Push async msg to client | HTTP_PUSH_CLIENT_ASYNC(105) | retCode | retCode=0,send success |

## gRPC Protocol Document In Eventmesh-Runtime

#### 1. protobuf

The `eventmesh-protocol-gprc` module contains the protobuf file of the evenmesh client. the protobuf file
is located as `/src/main/proto/eventmesh-client.proto`.

Run the gradle build to generate the gRPC codes. The generated codes are located at `/build/generated/source/proto/main`.

These generated grpc codes will be used in `eventmesh-sdk-java` module.

#### 2. data models

- message

The following is the message data model, used by `publish()`, `requestReply()` and `broadcast()` APIs.

```
message RequestHeader {
string env = 1;
string region = 2;
string idc = 3;
string ip = 4;
string pid = 5;
string sys = 6;
string username = 7;
string password = 8;
string language = 9;
string protocolType = 10;
string protocolVersion = 11;
string protocolDesc = 12;
}
message EventMeshMessage {
RequestHeader header = 1;
string producerGroup = 2;
string topic = 3;
string content = 4;
string ttl = 5;
string uniqueId = 6;
string seqNum = 7;
string tag = 8;
map<string, string> properties = 9;
}
message BatchMessage {
RequestHeader header = 1;
string producerGroup = 2;
string topic = 3;
message MessageItem {
string content = 1;
string ttl = 2;
string uniqueId = 3;
string seqNum = 4;
string tag = 5;
map<string, string> properties = 6;
}
repeated MessageItem messageItem = 4;
}
message Response {
string respCode = 1;
string respMsg = 2;
string respTime = 3;
}
```

- subscription

The following data model is used by `subscribe()` and `unsubscribe()` APIs.

```
message Subscription {
RequestHeader header = 1;
string consumerGroup = 2;
message SubscriptionItem {
enum SubscriptionMode {
CLUSTERING = 0;
BROADCASTING = 1;
}
enum SubscriptionType {
ASYNC = 0;
SYNC = 1;
}
string topic = 1;
SubscriptionMode mode = 2;
SubscriptionType type = 3;
}
repeated SubscriptionItem subscriptionItems = 3;
string url = 4;
}
```

- heartbeat

The following data model is used by `heartbeat()` API.

```
message Heartbeat {
enum ClientType {
PUB = 0;
SUB = 1;
}
RequestHeader header = 1;
ClientType clientType = 2;
string producerGroup = 3;
string consumerGroup = 4;
message HeartbeatItem {
string topic = 1;
string url = 2;
}
repeated HeartbeatItem heartbeatItems = 5;
}
```

#### 3. service operations

- event publisher service APIs

```
service PublisherService {
# Async event publish
rpc publish(EventMeshMessage) returns (Response);
# Sync event publish
rpc requestReply(EventMeshMessage) returns (Response);
# Batch event publish
rpc batchPublish(BatchMessage) returns (Response);
}
```

- event consumer service APIs

```
service ConsumerService {
# The subscribed event will be delivered by invoking the webhook url in the Subscription
rpc subscribe(Subscription) returns (Response);
# The subscribed event will be delivered through stream of Message
rpc subscribeStream(Subscription) returns (stream EventMeshMessage);
rpc unsubscribe(Subscription) returns (Response);
}
```

- client heartbeat service API

```
service HeartbeatService {
rpc heartbeat(Heartbeat) returns (Response);
}
```
3 changes: 3 additions & 0 deletions eventmesh-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ dependencies {

implementation "io.netty:netty-all"

implementation "io.grpc:grpc-protobuf:1.15.0"
implementation "io.grpc:grpc-stub:1.15.0"

implementation "com.github.stefanbirkner:system-rules"

compileOnly 'org.projectlombok:lombok:1.18.22'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.common.protocol.grpc.common;

import org.apache.eventmesh.common.protocol.ProtocolTransportObject;
import org.apache.eventmesh.common.protocol.grpc.protos.BatchMessage;

public class BatchMessageWrapper implements ProtocolTransportObject {

private BatchMessage batchMessage;

public BatchMessageWrapper(BatchMessage batchMessage) {
this.batchMessage = batchMessage;
}

public BatchMessage getMessage() {
return batchMessage;
}
}
Loading

0 comments on commit 888e18e

Please sign in to comment.