Skip to content

Commit

Permalink
[ISSUE apache#563] SDK SUPPORT CLOUD EVENT (apache#575)
Browse files Browse the repository at this point in the history
* SDK SUPPORT CLOUD EVENT

* SDK SUPPORT CLOUD EVENT

* SDK SUPPORT CLOUD EVENT

* SDK SUPPORT CLOUD EVENT

* SDK SUPPORT CLOUD EVENT

* SDK SUPPORT CLOUD EVENT

* SDK SUPPORT CLOUD EVENT

* SDK SUPPORT CLOUD EVENT

* SDK SUPPORT CLOUD EVENT

Co-authored-by: wangshaojie <wangshaojie@cmss.chinamobile.com>
  • Loading branch information
2 people authored and xwm1992 committed Dec 27, 2021
1 parent d04d1c3 commit 5ad08cb
Show file tree
Hide file tree
Showing 17 changed files with 169 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,32 @@

package org.apache.eventmesh.common.protocol.tcp;

import java.util.HashMap;
import java.util.Map;

public class Header {

private Command cmd;
private int code;
private String msg;
private String dsec;
private String seq;
private Map<String,Object> properties;

public Header() {
}

public Header(Command cmd, int code, String msg, String seq) {
public Header(Command cmd, int code, String dsec, String seq) {
this.cmd = cmd;
this.code = code;
this.msg = msg;
this.dsec = dsec;
this.seq = seq;
}

public Header(int code, String dsec, String seq, Map<String, Object> properties) {
this.code = code;
this.dsec = dsec;
this.seq = seq;
this.properties = properties;
}

public Command getCommand() {
Expand All @@ -50,12 +61,12 @@ public void setCode(int code) {
this.code = code;
}

public String getMsg() {
return msg;
public String getDsec() {
return dsec;
}

public void setMsg(String msg) {
this.msg = msg;
public void setDsec(String dsec) {
this.dsec = dsec;
}

public String getSeq() {
Expand All @@ -66,13 +77,38 @@ public void setSeq(String seq) {
this.seq = seq;
}

public Map<String, Object> getProperties() {
return properties;
}

public void setProperties(Map<String, Object> properties) {
this.properties = properties;
}

public void putProperty(final String name, final Object value) {
if (null == this.properties) {
this.properties = new HashMap<>();
}

this.properties.put(name, value);
}

public Object getProperty(final String name) {
if (null == this.properties) {
this.properties = new HashMap<>();
}

return this.properties.get(name);
}

@Override
public String toString() {
return "Header{" +
"cmd=" + cmd +
", code=" + code +
", msg='" + msg + '\'' +
", seq='" + seq + '\'' +
'}';
"cmd=" + cmd +
", code=" + code +
", dsec='" + dsec + '\'' +
", seq='" + seq + '\'' +
", properties=" + properties +
'}';
}
}
2 changes: 1 addition & 1 deletion eventmesh-examples/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ dependencies {
implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'io.netty:netty-all'

implementation "io.cloudevents:cloudevents-core"
testImplementation project(":eventmesh-sdk-java")
testImplementation project(":eventmesh-common")
testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncSubscribe implements ReceiveMsgHook {
public class AsyncSubscribe implements ReceiveMsgHook<EventMeshMessage> {

public static Logger logger = LoggerFactory.getLogger(AsyncSubscribe.class);

Expand Down Expand Up @@ -68,7 +68,12 @@ public static void main(String[] agrs) throws Exception {

@Override
public void handle(Package msg, ChannelHandlerContext ctx) {
EventMeshMessage eventMeshMessage = (EventMeshMessage) msg.getBody();
EventMeshMessage eventMeshMessage = convert(msg);
logger.info("receive async msg====================={}", eventMeshMessage);
}

@Override
public EventMeshMessage convert(Package pkg) {
return (EventMeshMessage) pkg.getBody();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncSubscribeBroadcast implements ReceiveMsgHook {
public class AsyncSubscribeBroadcast implements ReceiveMsgHook<EventMeshMessage> {

public static Logger logger = LoggerFactory.getLogger(AsyncSubscribeBroadcast.class);

Expand Down Expand Up @@ -68,7 +68,12 @@ public static void main(String[] agrs) throws Exception {

@Override
public void handle(Package msg, ChannelHandlerContext ctx) {
EventMeshMessage eventMeshMessage = (EventMeshMessage) msg.getBody();
EventMeshMessage eventMeshMessage = convert(msg);
logger.info("receive broadcast msg==============={}", eventMeshMessage);
}

@Override
public EventMeshMessage convert(Package pkg) {
return (EventMeshMessage) pkg.getBody();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshClient;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.tcp.common.EventMeshTestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SyncResponse implements ReceiveMsgHook {
public class SyncResponse implements ReceiveMsgHook<EventMeshMessage> {

public static Logger logger = LoggerFactory.getLogger(SyncResponse.class);

Expand Down Expand Up @@ -66,4 +67,9 @@ public void handle(Package msg, ChannelHandlerContext ctx) {
Package pkg = EventMeshTestUtils.rrResponse(msg);
ctx.writeAndFlush(pkg);
}

@Override
public EventMeshMessage convert(Package pkg) {
return null;
}
}
2 changes: 2 additions & 0 deletions eventmesh-sdk-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ dependencies {
implementation "io.netty:netty-all"
implementation "org.apache.httpcomponents:httpclient"

implementation "io.cloudevents:cloudevents-core"

testImplementation project(":eventmesh-common")
testImplementation project(":eventmesh-common")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.eventmesh.client.tcp;

import io.cloudevents.CloudEvent;
import org.apache.eventmesh.client.tcp.common.AsyncRRCallback;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.common.protocol.SubscriptionType;
Expand All @@ -31,6 +32,10 @@ public interface EventMeshClient {

Package publish(Package msg, long timeout) throws Exception;

Package publish(CloudEvent cloudEvent, long timeout) throws Exception;

void broadcast(CloudEvent cloudEvent, long timeout) throws Exception;

void broadcast(Package msg, long timeout) throws Exception;

void init() throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.eventmesh.client.tcp;


import io.cloudevents.CloudEvent;
import org.apache.eventmesh.client.tcp.common.AsyncRRCallback;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.common.protocol.tcp.Package;
Expand All @@ -39,6 +40,10 @@ public interface SimplePubClient {

Package publish(Package msg, long timeout) throws Exception;

Package publish(CloudEvent cloudEvent, long timeout) throws Exception;

void broadcast(CloudEvent cloudEvent, long timeout) throws Exception;

void broadcast(Package msg, long timeout) throws Exception;

void registerBusiHandler(ReceiveMsgHook handler) throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,6 @@ public class EventMeshCommon {
public static String PREFIX_SESSION_TPS_STAT_EVENTSEND = "event_send_tps_";

public static String PREFIX_SESSION_TPS_STAT_EVENTREV = "event_rev_tps_";

public static String CLOUD_EVENTS_PROTOCOL_NAME = "cloudevents";
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

import io.cloudevents.CloudEvent;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.Subscription;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
Expand Down Expand Up @@ -76,6 +77,18 @@ public static Package asyncMessageAck(Package in) {
return msg;
}

public static Package asyncCloudEvent(CloudEvent cloudEvent) {
Package msg = new Package();
msg.setHeader(new Header(Command.ASYNC_MESSAGE_TO_SERVER, 0,
null, generateRandomString(seqLength)));
msg.getHeader().putProperty(PropertyConst.PROPERTY_MESSAGE_PROTOCOL,
EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME);
msg.getHeader().putProperty(PropertyConst.PROPERTY_CLOUD_EVENT_VERSION,
cloudEvent.getSpecVersion().toString());
msg.setBody(cloudEvent);
return msg;
}

public static Package broadcastMessageAck(Package in) {
Package msg = new Package();
msg.setHeader(new Header(Command.BROADCAST_MESSAGE_TO_CLIENT_ACK, 0, null, in.getHeader().getSeq()));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.apache.eventmesh.client.tcp.common;

/**
* properties key name
*/
public class PropertyConst {

public static String PROPERTY_MESSAGE_PROTOCOL = "message_protocol";

public static String PROPERTY_CLOUD_EVENT_VERSION = "cloud_event_version";
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@
import io.netty.channel.ChannelHandlerContext;
import org.apache.eventmesh.common.protocol.tcp.Package;

public interface ReceiveMsgHook {
/**
* ReceiveMsgHook.
*
* @param <T> receive message type.
*/
public interface ReceiveMsgHook<T> {
void handle(Package msg, ChannelHandlerContext ctx);

T convert(Package pkg);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.eventmesh.client.tcp.impl;


import io.cloudevents.CloudEvent;
import org.apache.eventmesh.client.tcp.EventMeshClient;
import org.apache.eventmesh.client.tcp.SimplePubClient;
import org.apache.eventmesh.client.tcp.SimpleSubClient;
Expand Down Expand Up @@ -73,10 +74,20 @@ public Package publish(Package msg, long timeout) throws Exception {
return this.pubClient.publish(msg, timeout);
}

@Override
public Package publish(CloudEvent cloudEvent, long timeout) throws Exception {
return this.pubClient.publish(cloudEvent, timeout);
}

public void broadcast(Package msg, long timeout) throws Exception {
this.pubClient.broadcast(msg, timeout);
}

@Override
public void broadcast(CloudEvent cloudEvent, long timeout) throws Exception {
this.pubClient.broadcast(cloudEvent, timeout);
}

public void init() throws Exception {
this.subClient.init();
this.pubClient.init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import io.cloudevents.CloudEvent;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
Expand All @@ -29,6 +30,7 @@
import org.apache.eventmesh.client.tcp.common.AsyncRRCallback;
import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
import org.apache.eventmesh.client.tcp.common.MessageUtils;
import org.apache.eventmesh.client.tcp.common.PropertyConst;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.common.RequestContext;
import org.apache.eventmesh.client.tcp.common.TcpClient;
Expand Down Expand Up @@ -145,6 +147,25 @@ public Package publish(Package msg, long timeout) throws Exception {
return io(msg, timeout);
}


@Override
public Package publish(CloudEvent cloudEvent, long timeout) throws Exception {
Package msg = MessageUtils.asyncCloudEvent(cloudEvent);
logger.info("SimplePubClientImpl cloud event|{}|publish|send|type={}|protocol={}|msg={}",
clientNo, msg.getHeader().getCommand(),
msg.getHeader().getProperty(PropertyConst.PROPERTY_MESSAGE_PROTOCOL), msg);
return io(MessageUtils.asyncCloudEvent(cloudEvent), timeout);
}

@Override
public void broadcast(CloudEvent cloudEvent, long timeout) throws Exception {
Package msg = MessageUtils.asyncCloudEvent(cloudEvent);
logger.info("SimplePubClientImpl cloud event|{}|publish|send|type={}|protocol={}|msg={}",
clientNo, msg.getHeader().getCommand(),
msg.getHeader().getProperty(PropertyConst.PROPERTY_MESSAGE_PROTOCOL), msg);
super.send(msg);
}

/**
* Send broadcast message
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncSubscribe implements ReceiveMsgHook {
public class AsyncSubscribe implements ReceiveMsgHook<EventMeshMessage> {

public static Logger logger = LoggerFactory.getLogger(AsyncSubscribe.class);

Expand Down Expand Up @@ -63,7 +63,12 @@ public static void main(String[] agrs) throws Exception {

@Override
public void handle(Package msg, ChannelHandlerContext ctx) {
EventMeshMessage eventMeshMessage = (EventMeshMessage) msg.getBody();
EventMeshMessage eventMeshMessage = convert(msg);
logger.info("receive async msg====================={}", eventMeshMessage);
}

@Override
public EventMeshMessage convert(Package pkg) {
return (EventMeshMessage) pkg.getBody();
}
}
Loading

0 comments on commit 5ad08cb

Please sign in to comment.