Skip to content

Commit

Permalink
[Feature apache#562] Implement CloudEvents protocol adaptor (apache#595)
Browse files Browse the repository at this point in the history
* [Feature apache#564] Support CloudEvents protocols for pub/sub in EventMesh-feature design

* support cloudevents api in eventmesh-connector-api module

* fix checkStyle

* fix checkStyle

* fix checkStyle

* 1.support LifeCycle.java
2.update Consumer and Producer

* fix remove the extra blank line

* support cloudEvents

* Add files via upload

* Update README.md

* support cloudEvents

* support cloudEvents

* [ISSUE apache#580] Add checkstyle gradle plugin (apache#581)

* Add checkstyle gradle plugin, change plugin package

* skip check in ci

* support cloudEvents

* support cloudevents

* update wechat-official qr code

* update mesh-helper qr code

* Add files via upload

* update README.md

* update README.md

* Update .asf.yaml

* support cloudEvents

* support cloudEvents

* [ISSUE apache#588] Fix typo in README.md (apache#589)

close apache#588

* support cloudEvents

* [Bug apache#590] Consumer subscription topic is invalid (apache#590) (apache#592)

* [Bug apache#590] Consumer subscription topic is invalid (apache#590)

* [Bug apache#590] Consumer subscription topic is invalid (apache#590)

close apache#590

* support cloudEvents adaptor

* [Feature apache#562] Implement CloudEvents adaptor

Co-authored-by: Eason Chen <qqeasonchen@gmail.com>
Co-authored-by: Wenjun Ruan <wenjun@apache.org>
Co-authored-by: Nicholas Zhan <zhan_nicholas@outlook.com>
Co-authored-by: hagsyn <44764414+hagsyn@users.noreply.github.com>
  • Loading branch information
5 people committed Dec 27, 2021
1 parent b5a2ccf commit 6c2a85a
Show file tree
Hide file tree
Showing 23 changed files with 494 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ public class ProtocolKey {
public static final String LANGUAGE = "Language";
public static final String VERSION = "Version";

public static final String PROTOCOL_TYPE = "protocol_type";

public static final String PROTOCOL_VERSION = "protocol_version";

public static final String PROTOCOL_DESC = "protocol_desc";

public static class ClientInstanceKey {
////////////////////////////////////Protocol layer requester description///////////
public static final String ENV = "Env";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@ public class ReplyMessageRequestHeader extends Header {
//protocol version adopted by requester, default:1.0
private ProtocolVersion version;

//protocol type, cloudevents or eventmeshMessage
private String protocolType;

//protocol version, cloudevents:1.0 or 0.3
private String protocolVersion;

//protocol desc
private String protocolDesc;

//the environment number of the requester
private String env;

Expand Down Expand Up @@ -139,10 +148,37 @@ public void setIp(String ip) {
this.ip = ip;
}

public String getProtocolType() {
return protocolType;
}

public void setProtocolType(String protocolType) {
this.protocolType = protocolType;
}

public String getProtocolVersion() {
return protocolVersion;
}

public void setProtocolVersion(String protocolVersion) {
this.protocolVersion = protocolVersion;
}

public String getProtocolDesc() {
return protocolDesc;
}

public void setProtocolDesc(String protocolDesc) {
this.protocolDesc = protocolDesc;
}

public static ReplyMessageRequestHeader buildHeader(Map<String, Object> headerParam) {
ReplyMessageRequestHeader header = new ReplyMessageRequestHeader();
header.setCode(MapUtils.getString(headerParam, ProtocolKey.REQUEST_CODE));
header.setVersion(ProtocolVersion.get(MapUtils.getString(headerParam, ProtocolKey.VERSION)));
header.setProtocolType(MapUtils.getString(headerParam, ProtocolKey.PROTOCOL_TYPE));
header.setProtocolVersion(MapUtils.getString(headerParam, ProtocolKey.PROTOCOL_VERSION));
header.setProtocolDesc(MapUtils.getString(headerParam, ProtocolKey.PROTOCOL_DESC));
String lan = StringUtils.isBlank(MapUtils.getString(headerParam, ProtocolKey.LANGUAGE))
? Constants.LANGUAGE_JAVA : MapUtils.getString(headerParam, ProtocolKey.LANGUAGE);
header.setLanguage(lan);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ public class SendMessageBatchRequestHeader extends Header {
//protocol version adopted by requester, default:1.0
private ProtocolVersion version;

//protocol type, cloudevents or eventmeshMessage
private String protocolType;

//protocol version, cloudevents:1.0 or 0.3
private String protocolVersion;

//protocol desc
private String protocolDesc;

//the environment number of the requester
private String env;

Expand Down Expand Up @@ -140,10 +149,37 @@ public void setIp(String ip) {
this.ip = ip;
}

public String getProtocolType() {
return protocolType;
}

public void setProtocolType(String protocolType) {
this.protocolType = protocolType;
}

public String getProtocolVersion() {
return protocolVersion;
}

public void setProtocolVersion(String protocolVersion) {
this.protocolVersion = protocolVersion;
}

public String getProtocolDesc() {
return protocolDesc;
}

public void setProtocolDesc(String protocolDesc) {
this.protocolDesc = protocolDesc;
}

public static SendMessageBatchRequestHeader buildHeader(final Map<String, Object> headerParam) {
SendMessageBatchRequestHeader header = new SendMessageBatchRequestHeader();
header.setCode(MapUtils.getString(headerParam, ProtocolKey.REQUEST_CODE));
header.setVersion(ProtocolVersion.get(MapUtils.getString(headerParam, ProtocolKey.VERSION)));
header.setProtocolType(MapUtils.getString(headerParam, ProtocolKey.PROTOCOL_TYPE));
header.setProtocolVersion(MapUtils.getString(headerParam, ProtocolKey.PROTOCOL_VERSION));
header.setProtocolDesc(MapUtils.getString(headerParam, ProtocolKey.PROTOCOL_DESC));
String lan = StringUtils.isBlank(MapUtils.getString(headerParam, ProtocolKey.LANGUAGE))
? Constants.LANGUAGE_JAVA : MapUtils.getString(headerParam, ProtocolKey.LANGUAGE);
header.setLanguage(lan);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@ public class SendMessageBatchV2RequestHeader extends Header {
//protocol version adopted by requester, default:1.0
private ProtocolVersion version;

//protocol type, cloudevents or eventmeshMessage
private String protocolType;

//protocol version, cloudevents:1.0 or 0.3
private String protocolVersion;

//protocol desc
private String protocolDesc;

//the environment number of the requester
private String env;

Expand Down Expand Up @@ -139,10 +148,38 @@ public void setIp(String ip) {
this.ip = ip;
}

public String getProtocolType() {
return protocolType;
}

public void setProtocolType(String protocolType) {
this.protocolType = protocolType;
}

public String getProtocolVersion() {
return protocolVersion;
}

public void setProtocolVersion(String protocolVersion) {
this.protocolVersion = protocolVersion;
}

public String getProtocolDesc() {
return protocolDesc;
}

public void setProtocolDesc(String protocolDesc) {
this.protocolDesc = protocolDesc;
}

public static SendMessageBatchV2RequestHeader buildHeader(final Map<String, Object> headerParam) {
SendMessageBatchV2RequestHeader header = new SendMessageBatchV2RequestHeader();
header.setCode(MapUtils.getString(headerParam, ProtocolKey.REQUEST_CODE));
header.setVersion(ProtocolVersion.get(MapUtils.getString(headerParam, ProtocolKey.VERSION)));
header.setProtocolType(MapUtils.getString(headerParam, ProtocolKey.PROTOCOL_TYPE));
header.setProtocolVersion(MapUtils.getString(headerParam, ProtocolKey.PROTOCOL_VERSION));
header.setProtocolDesc(MapUtils.getString(headerParam, ProtocolKey.PROTOCOL_DESC));

String lan = StringUtils.isBlank(MapUtils.getString(headerParam, ProtocolKey.LANGUAGE))
? Constants.LANGUAGE_JAVA : MapUtils.getString(headerParam, ProtocolKey.LANGUAGE);
header.setLanguage(lan);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@ public class SendMessageRequestHeader extends Header {
//protocol version adopted by requester, default:1.0
private ProtocolVersion version;

//protocol type, cloudevents or eventmeshMessage
private String protocolType;

//protocol version, cloudevents:1.0 or 0.3
private String protocolVersion;

//protocol desc
private String protocolDesc;

//the environment number of the requester
private String env;

Expand Down Expand Up @@ -139,10 +148,38 @@ public void setIp(String ip) {
this.ip = ip;
}

public String getProtocolType() {
return protocolType;
}

public void setProtocolType(String protocolType) {
this.protocolType = protocolType;
}

public String getProtocolVersion() {
return protocolVersion;
}

public void setProtocolVersion(String protocolVersion) {
this.protocolVersion = protocolVersion;
}

public String getProtocolDesc() {
return protocolDesc;
}

public void setProtocolDesc(String protocolDesc) {
this.protocolDesc = protocolDesc;
}

public static SendMessageRequestHeader buildHeader(Map<String, Object> headerParam) {
SendMessageRequestHeader header = new SendMessageRequestHeader();
header.setCode(MapUtils.getString(headerParam, ProtocolKey.REQUEST_CODE));
header.setVersion(ProtocolVersion.get(MapUtils.getString(headerParam, ProtocolKey.VERSION)));
header.setProtocolType(MapUtils.getString(headerParam, ProtocolKey.PROTOCOL_TYPE));
header.setProtocolVersion(MapUtils.getString(headerParam, ProtocolKey.PROTOCOL_VERSION));
header.setProtocolDesc(MapUtils.getString(headerParam, ProtocolKey.PROTOCOL_DESC));

String lan = StringUtils.isBlank(MapUtils.getString(headerParam, ProtocolKey.LANGUAGE))
? Constants.LANGUAGE_JAVA : MapUtils.getString(headerParam, ProtocolKey.LANGUAGE);
header.setLanguage(lan);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public interface ProtocolAdaptor<T> {
* @param cloudEvent clout event
* @return target protocol
*/
T fromCloudEvent(CloudEvent cloudEvent) throws ProtocolHandleException;
Object fromCloudEvent(CloudEvent cloudEvent) throws ProtocolHandleException;

/**
* Get protocol type.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,22 @@
package org.apache.eventmesh.protocol.cloudevents;

import io.cloudevents.CloudEvent;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.command.HttpCommand;
import org.apache.eventmesh.common.protocol.http.body.Body;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.common.protocol.tcp.Header;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.protocol.api.ProtocolAdaptor;

import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException;
import org.apache.eventmesh.protocol.cloudevents.resolver.http.SendMessageBatchProtocolResolver;
import org.apache.eventmesh.protocol.cloudevents.resolver.http.SendMessageBatchV2ProtocolResolver;
import org.apache.eventmesh.protocol.cloudevents.resolver.http.SendMessageRequestProtocolResolver;
import org.apache.eventmesh.protocol.cloudevents.resolver.tcp.TcpMessageProtocolResolver;

import java.nio.charset.StandardCharsets;
import java.util.List;

/**
Expand All @@ -34,14 +44,43 @@
public class CloudEventsProtocolAdaptor<T> implements ProtocolAdaptor<T> {

@Override
public CloudEvent toCloudEvent(T cloudEvent) {
public CloudEvent toCloudEvent(T cloudEvent) throws ProtocolHandleException {

if (cloudEvent instanceof Package){
//todo:convert package to cloudevents
}else if (cloudEvent instanceof HttpCommand){
//todo:convert httpCommand to cloudevents
if (cloudEvent instanceof Package) {
Header header = ((Package) cloudEvent).getHeader();
Object body = ((Package) cloudEvent).getBody();

return deserializeTcpProtocol(header, body);

} else if (cloudEvent instanceof HttpCommand) {
org.apache.eventmesh.common.protocol.http.header.Header header = ((HttpCommand) cloudEvent).getHeader();
Body body = ((HttpCommand) cloudEvent).getBody();
String requestCode = ((HttpCommand) cloudEvent).getRequestCode();

return deserializeHttpProtocol(requestCode, header, body);
} else {
throw new ProtocolHandleException(String.format("protocol class: %s", cloudEvent.getClass()));
}
return null;
}

private CloudEvent deserializeTcpProtocol(Header header, Object body) throws ProtocolHandleException {
return TcpMessageProtocolResolver.buildEvent(header, body);
}

private CloudEvent deserializeHttpProtocol(String requestCode, org.apache.eventmesh.common.protocol.http.header.Header header, Body body) throws ProtocolHandleException {

if (String.valueOf(RequestCode.MSG_BATCH_SEND.getRequestCode()).equals(requestCode)) {
return SendMessageBatchProtocolResolver.buildEvent(header, body);
} else if (String.valueOf(RequestCode.MSG_BATCH_SEND_V2.getRequestCode()).equals(requestCode)) {
return SendMessageBatchV2ProtocolResolver.buildEvent(header, body);
} else if (String.valueOf(RequestCode.MSG_SEND_SYNC.getRequestCode()).equals(requestCode)) {
return SendMessageRequestProtocolResolver.buildEvent(header, body);
} else if (String.valueOf(RequestCode.MSG_SEND_ASYNC.getRequestCode()).equals(requestCode)) {
return SendMessageRequestProtocolResolver.buildEvent(header, body);
} else {
throw new ProtocolHandleException(String.format("unsupported requestCode: %s", requestCode));
}

}

@Override
Expand All @@ -50,8 +89,18 @@ public List<CloudEvent> toBatchCloudEvent(T protocol) throws ProtocolHandleExcep
}

@Override
public T fromCloudEvent(CloudEvent cloudEvent) {
return null;
public Object fromCloudEvent(CloudEvent cloudEvent) throws ProtocolHandleException {
String protocolDesc = cloudEvent.getExtension(Constants.PROTOCOL_DESC).toString();
if (StringUtils.equals("http", protocolDesc)) {
return new String(cloudEvent.getData().toBytes(), StandardCharsets.UTF_8);
} else if (StringUtils.equals("tcp", protocolDesc)) {
Package pkg = new Package();
pkg.setBody(cloudEvent);
return pkg;
} else {
throw new ProtocolHandleException(String.format("Unsupported protocolDesc: %s", protocolDesc));
}

}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.apache.eventmesh.protocol.cloudevents.resolver.http;

import io.cloudevents.CloudEvent;
import org.apache.eventmesh.common.protocol.http.body.Body;
import org.apache.eventmesh.common.protocol.http.header.Header;

public class SendMessageBatchProtocolResolver {
public static CloudEvent buildEvent(Header header, Body body) {
return null;
}
}
Loading

0 comments on commit 6c2a85a

Please sign in to comment.