From fee81c05a467f898170271559c627f805cac9eab Mon Sep 17 00:00:00 2001 From: xwm1992 Date: Tue, 23 Nov 2021 18:27:50 +0800 Subject: [PATCH 1/4] update java sdk --- .../tcp/common/EventMeshTestUtils.java | 57 ++++--- .../http/producer/CloudEventProducer.java | 4 + .../producer/EventMeshMessageProducer.java | 9 ++ .../client/tcp/common/EventMeshCommon.java | 2 + .../client/tcp/common/MessageUtils.java | 27 +++- .../client/tcp/common/TcpClient.java | 3 +- .../cloudevent/CloudEventTCPPubClient.java | 142 +++++++++++++++-- .../cloudevent/CloudEventTCPSubClient.java | 150 ++++++++++++++++-- .../EventMeshMessageTCPPubClient.java | 10 +- .../EventMeshMessageTCPSubClient.java | 41 ++--- 10 files changed, 359 insertions(+), 86 deletions(-) diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java index 88f5016e9d..854eb5b58c 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java @@ -24,7 +24,6 @@ import java.util.concurrent.ThreadLocalRandom; -import org.apache.eventmesh.client.tcp.common.EventMeshCommon; import org.apache.eventmesh.common.protocol.tcp.Command; import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; import org.apache.eventmesh.common.protocol.tcp.Header; @@ -35,37 +34,37 @@ public class EventMeshTestUtils { private static final int seqLength = 10; public static UserAgent generateClient1() { - UserAgent user = new UserAgent(); - user.setEnv("test"); - user.setHost("127.0.0.1"); - user.setPassword(generateRandomString(8)); - user.setUsername("PU4283"); - user.setProducerGroup("EventmeshTest-ProducerGroup"); - user.setConsumerGroup("EventmeshTest-ConsumerGroup"); - user.setPath("/data/app/umg_proxy"); - user.setPort(8362); - user.setSubsystem("5023"); - user.setPid(32893); - user.setVersion("2.0.11"); - user.setIdc("FT"); - return user; + return UserAgent.builder() + .env("test") + .host("127.0.0.1") + .password(generateRandomString(8)) + .username("PU4283") + .producerGroup("EventmeshTest-ProducerGroup") + .consumerGroup("EventmeshTest-ConsumerGroup") + .path("/data/app/umg_proxy") + .port(8362) + .subsystem("5023") + .pid(32893) + .version("2.0.11") + .idc("FT") + .build(); } public static UserAgent generateClient2() { - UserAgent user = new UserAgent(); - user.setEnv("test"); - user.setHost("127.0.0.1"); - user.setPassword(generateRandomString(8)); - user.setUsername("PU4283"); - user.setConsumerGroup("EventmeshTest-ConsumerGroup"); - user.setProducerGroup("EventmeshTest-ProducerGroup"); - user.setPath("/data/app/umg_proxy"); - user.setPort(9362); - user.setSubsystem("5017"); - user.setPid(42893); - user.setVersion("2.0.11"); - user.setIdc("FT"); - return user; + return UserAgent.builder() + .env("test") + .host("127.0.0.1") + .password(generateRandomString(8)) + .username("PU4283") + .producerGroup("EventmeshTest-ProducerGroup") + .consumerGroup("EventmeshTest-ConsumerGroup") + .path("/data/app/umg_proxy") + .port(9362) + .subsystem("5017") + .pid(42893) + .version("2.0.11") + .idc("FT") + .build(); } public static Package syncRR() { diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/CloudEventProducer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/CloudEventProducer.java index 6d6ff2f63d..49905ad1ac 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/CloudEventProducer.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/CloudEventProducer.java @@ -29,6 +29,8 @@ class CloudEventProducer extends AbstractHttpClient implements EventMeshProtocol private static final String PROTOCOL_TYPE = "cloudevents"; + private static final String PROTOCOL_DESC = "http"; + public CloudEventProducer(EventMeshHttpClientConfig eventMeshHttpClientConfig) throws EventMeshException { super(eventMeshHttpClientConfig); } @@ -105,6 +107,8 @@ private RequestParam buildCommonPostParam(CloudEvent cloudEvent) { .addHeader(ProtocolKey.ClientInstanceKey.PASSWD, eventMeshHttpClientConfig.getPassword()) .addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA) .addHeader(ProtocolKey.PROTOCOL_TYPE, PROTOCOL_TYPE) + .addHeader(ProtocolKey.PROTOCOL_DESC, PROTOCOL_DESC) + .addHeader(ProtocolKey.PROTOCOL_VERSION, cloudEvent.getSpecVersion().toString()) // todo: move producerGroup tp header .addBody(SendMessageRequestBody.PRODUCERGROUP, eventMeshHttpClientConfig.getProducerGroup()) .addBody(SendMessageRequestBody.CONTENT, JsonUtils.serialize(cloudEvent)); diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/EventMeshMessageProducer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/EventMeshMessageProducer.java index 85f6153953..e9c79f92c6 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/EventMeshMessageProducer.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/EventMeshMessageProducer.java @@ -1,5 +1,6 @@ package org.apache.eventmesh.client.http.producer; +import io.cloudevents.SpecVersion; import org.apache.eventmesh.client.http.AbstractHttpClient; import org.apache.eventmesh.client.http.EventMeshRetObj; import org.apache.eventmesh.client.http.conf.EventMeshHttpClientConfig; @@ -26,6 +27,10 @@ @Slf4j class EventMeshMessageProducer extends AbstractHttpClient implements EventMeshProtocolProducer { + private static final String PROTOCOL_TYPE = "eventmeshmessage"; + + private static final String PROTOCOL_DESC = "http"; + public EventMeshMessageProducer(EventMeshHttpClientConfig eventMeshHttpClientConfig) throws EventMeshException { super(eventMeshHttpClientConfig); } @@ -106,6 +111,10 @@ private RequestParam buildCommonPostParam(EventMeshMessage message) { .addHeader(ProtocolKey.ClientInstanceKey.USERNAME, eventMeshHttpClientConfig.getUserName()) .addHeader(ProtocolKey.ClientInstanceKey.PASSWD, eventMeshHttpClientConfig.getPassword()) .addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion()) + .addHeader(ProtocolKey.PROTOCOL_TYPE, PROTOCOL_TYPE) + .addHeader(ProtocolKey.PROTOCOL_DESC, PROTOCOL_DESC) + //default ce version is 1.0 + .addHeader(ProtocolKey.PROTOCOL_VERSION, SpecVersion.V1.toString()) .addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA) .addBody(SendMessageRequestBody.PRODUCERGROUP, eventMeshHttpClientConfig.getProducerGroup()) // todo: set message to content is better diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/EventMeshCommon.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/EventMeshCommon.java index 8238f52ceb..ca56422ff1 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/EventMeshCommon.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/EventMeshCommon.java @@ -122,4 +122,6 @@ public class EventMeshCommon { public static String PREFIX_SESSION_TPS_STAT_EVENTREV = "event_rev_tps_"; public static String CLOUD_EVENTS_PROTOCOL_NAME = "cloudevents"; + + public static String EM_MESSAGE_PROTOCOL_NAME = "eventmeshmessage"; } diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java index a0c1ba85bc..e799fb726d 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java @@ -22,6 +22,7 @@ import java.util.concurrent.ThreadLocalRandom; import io.cloudevents.CloudEvent; +import io.cloudevents.SpecVersion; import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.protocol.SubscriptionType; import org.apache.eventmesh.common.protocol.tcp.Subscription; @@ -78,16 +79,26 @@ public static Package asyncMessageAck(Package in) { return msg; } - public static Package asyncCloudEvent(CloudEvent cloudEvent) { + public static Package buildPackage(Object message, Command command) { Package msg = new Package(); - msg.setHeader(new Header(Command.ASYNC_MESSAGE_TO_SERVER, 0, - null, generateRandomString(seqLength))); - msg.getHeader().putProperty(Constants.PROTOCOL_TYPE, - EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME); - msg.getHeader().putProperty(Constants.PROTOCOL_VERSION, - cloudEvent.getSpecVersion().toString()); + msg.setHeader(new Header(command, 0, + null, generateRandomString(seqLength))); + if (message instanceof CloudEvent) { + msg.getHeader().putProperty(Constants.PROTOCOL_TYPE, + EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME); + msg.getHeader().putProperty(Constants.PROTOCOL_VERSION, + ((CloudEvent) message).getSpecVersion().toString()); + } else if (message instanceof EventMeshMessage) { + msg.getHeader().putProperty(Constants.PROTOCOL_TYPE, + EventMeshCommon.EM_MESSAGE_PROTOCOL_NAME); + msg.getHeader().putProperty(Constants.PROTOCOL_VERSION, + SpecVersion.V1.toString()); + } else { + // unsupported protocol for server + return msg; + } msg.getHeader().putProperty(Constants.PROTOCOL_DESC, "tcp"); - msg.setBody(cloudEvent); + msg.setBody(message); return msg; } diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java index 2b9bfaf38b..46cf370e23 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java @@ -106,14 +106,13 @@ public void initChannel(SocketChannel ch) { } @Override - public void close() throws EventMeshException { + public void close() { try { channel.disconnect().sync(); workers.shutdownGracefully(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.warn("close tcp client failed.|remote address={}", channel.remoteAddress(), e); - throw new EventMeshException(e); } } diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPPubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPPubClient.java index 51341e16e0..7a1d03b1bc 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPPubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPPubClient.java @@ -1,54 +1,132 @@ package org.apache.eventmesh.client.tcp.impl.cloudevent; + + import org.apache.eventmesh.client.tcp.EventMeshTCPPubClient; -import org.apache.eventmesh.client.tcp.common.AsyncRRCallback; -import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; +import org.apache.eventmesh.client.tcp.common.*; +import org.apache.eventmesh.client.tcp.conf.EventMeshTcpClientConfig; +import org.apache.eventmesh.client.tcp.impl.eventmeshmessage.EventMeshMessageTCPPubClient; +import org.apache.eventmesh.common.EventMeshMessage; import org.apache.eventmesh.common.exception.EventMeshException; +import org.apache.eventmesh.common.protocol.tcp.Command; import org.apache.eventmesh.common.protocol.tcp.Package; import org.apache.eventmesh.common.protocol.tcp.UserAgent; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; import io.cloudevents.CloudEvent; import lombok.extern.slf4j.Slf4j; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + /** * A CloudEvent TCP publish client implementation. */ @Slf4j -public class CloudEventTCPPubClient implements EventMeshTCPPubClient { +public class CloudEventTCPPubClient extends TcpClient implements EventMeshTCPPubClient { + + private final UserAgent userAgent; + + private ReceiveMsgHook callback; + + private final ConcurrentHashMap callbackConcurrentHashMap = new ConcurrentHashMap<>(); + private ScheduledFuture task; + + public CloudEventTCPPubClient(EventMeshTcpClientConfig eventMeshTcpClientConfig) { + super(eventMeshTcpClientConfig); + this.userAgent = eventMeshTcpClientConfig.getUserAgent(); + } @Override public void init() throws EventMeshException { - + try { + open(new Handler()); + hello(); + } catch (Exception ex) { + throw new EventMeshException("Initialize EventMeshMessageTCPPubClient error", ex); + } } @Override public void heartbeat() throws EventMeshException { - + if (task != null) { + synchronized (EventMeshMessageTCPPubClient.class) { + task = scheduler.scheduleAtFixedRate(() -> { + try { + if (!isActive()) { + reconnect(); + } + Package msg = MessageUtils.heartBeat(); + io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); + } catch (Exception ignore) { + // ignore + } + }, EventMeshCommon.HEARTBEAT, EventMeshCommon.HEARTBEAT, TimeUnit.MILLISECONDS); + } + } } @Override public void reconnect() throws EventMeshException { - + try { + super.reconnect(); + hello(); + } catch (Exception ex) { + throw new EventMeshException("reconnect error", ex); + } } @Override - public Package rr(CloudEvent msg, long timeout) throws EventMeshException { - return null; + public Package rr(CloudEvent event, long timeout) throws EventMeshException { + try { + Package msg = MessageUtils.buildPackage(event, Command.REQUEST_TO_SERVER); + log.info("{}|rr|send|type={}|msg={}", clientNo, msg, msg); + return io(msg, timeout); + } catch (Exception ex) { + throw new EventMeshException("rr error"); + } } @Override - public void asyncRR(CloudEvent msg, AsyncRRCallback callback, long timeout) throws EventMeshException { - + public void asyncRR(CloudEvent event, AsyncRRCallback callback, long timeout) throws EventMeshException { + try { + Package msg = MessageUtils.buildPackage(event, Command.REQUEST_TO_SERVER); + super.send(msg); + this.callbackConcurrentHashMap.put((String) RequestContext._key(msg), callback); + } catch (Exception ex) { + // should trigger callback? + throw new EventMeshException("asyncRR error", ex); + } } @Override public Package publish(CloudEvent cloudEvent, long timeout) throws EventMeshException { - return null; + try { + // todo: transform EventMeshMessage to Package + Package msg = MessageUtils.buildPackage(cloudEvent, Command.ASYNC_MESSAGE_TO_SERVER); + log.info("SimplePubClientImpl cloud event|{}|publish|send|type={}|protocol={}|msg={}", + clientNo, msg.getHeader().getCommand(), + msg.getHeader().getProperty(PropertyConst.PROPERTY_MESSAGE_PROTOCOL), msg); + return io(msg, timeout); + } catch (Exception ex) { + throw new EventMeshException("publish error", ex); + } } @Override public void broadcast(CloudEvent cloudEvent, long timeout) throws EventMeshException { - + try { + // todo: transform EventMeshMessage to Package + Package msg = MessageUtils.buildPackage(cloudEvent, Command.BROADCAST_MESSAGE_TO_SERVER); + log.info("{}|publish|send|type={}|protocol={}|msg={}", clientNo, msg.getHeader().getCommand(), + msg.getHeader().getProperty(PropertyConst.PROPERTY_MESSAGE_PROTOCOL), msg); + super.send(msg); + } catch (Exception ex) { + throw new EventMeshException("Broadcast message error", ex); + } } @Override @@ -57,7 +135,45 @@ public void registerBusiHandler(ReceiveMsgHook handler) throws Event } @Override - public void close() throws EventMeshException { + public void close() { + + } + + // todo: move to abstract class + @ChannelHandler.Sharable + private class Handler extends SimpleChannelInboundHandler { + @Override + protected void channelRead0(ChannelHandlerContext ctx, Package msg) throws Exception { + log.info("SimplePubClientImpl|{}|receive|type={}|msg={}", clientNo, msg.getHeader(), msg); + + Command cmd = msg.getHeader().getCommand(); + if (cmd == Command.RESPONSE_TO_CLIENT) { + if (callback != null) { + callback.handle(msg, ctx); + } + Package pkg = MessageUtils.responseToClientAck(msg); + send(pkg); + } else if (cmd == Command.SERVER_GOODBYE_REQUEST) { + //TODO + } + + RequestContext context = contexts.get(RequestContext._key(msg)); + if (context != null) { + contexts.remove(context.getKey()); + context.finish(msg); + } + } + } + + // todo: remove hello + private void hello() throws Exception { + Package msg = MessageUtils.hello(userAgent); + this.io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); + } + // todo: remove goodbye + private void goodbye() throws Exception { + Package msg = MessageUtils.goodbye(); + this.io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); } } diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPSubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPSubClient.java index d726c4b90c..716feae3cb 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPSubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPSubClient.java @@ -1,59 +1,191 @@ package org.apache.eventmesh.client.tcp.impl.cloudevent; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.apache.commons.collections4.CollectionUtils; import org.apache.eventmesh.client.tcp.EventMeshTCPSubClient; -import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; +import org.apache.eventmesh.client.tcp.common.*; +import org.apache.eventmesh.client.tcp.conf.EventMeshTcpClientConfig; +import org.apache.eventmesh.client.tcp.impl.eventmeshmessage.EventMeshMessageTCPSubClient; +import org.apache.eventmesh.common.EventMeshMessage; import org.apache.eventmesh.common.exception.EventMeshException; +import org.apache.eventmesh.common.protocol.SubscriptionItem; import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.common.protocol.SubscriptionType; +import org.apache.eventmesh.common.protocol.tcp.Command; +import org.apache.eventmesh.common.protocol.tcp.Package; import org.apache.eventmesh.common.protocol.tcp.UserAgent; import io.cloudevents.CloudEvent; import lombok.extern.slf4j.Slf4j; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + /** * CloudEvent TCP subscribe client implementation. */ @Slf4j -public class CloudEventTCPSubClient implements EventMeshTCPSubClient { +public class CloudEventTCPSubClient extends TcpClient implements EventMeshTCPSubClient { + + private final UserAgent userAgent; + private final List subscriptionItems = Collections.synchronizedList(new ArrayList<>()); + private ReceiveMsgHook callback; + private ScheduledFuture task; + + public CloudEventTCPSubClient(EventMeshTcpClientConfig eventMeshTcpClientConfig) { + super(eventMeshTcpClientConfig); + this.userAgent = eventMeshTcpClientConfig.getUserAgent(); + } @Override public void init() throws EventMeshException { - + try { + open(new Handler()); + hello(); + log.info("SimpleSubClientImpl|{}|started!", clientNo); + } catch (Exception ex) { + throw new EventMeshException("Initialize EventMeshMessageTcpSubClient error", ex); + } } @Override public void heartbeat() throws EventMeshException { - + if (task == null) { + synchronized (EventMeshMessageTCPSubClient.class) { + task = scheduler.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + if (!isActive()) { + reconnect(); + } + Package msg = MessageUtils.heartBeat(); + io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); + } catch (Exception ignore) { + // + } + } + }, EventMeshCommon.HEARTBEAT, EventMeshCommon.HEARTBEAT, TimeUnit.MILLISECONDS); + } + } } @Override public void reconnect() throws EventMeshException { - + try { + super.reconnect(); + hello(); + if (!CollectionUtils.isEmpty(subscriptionItems)) { + for (SubscriptionItem item : subscriptionItems) { + Package request = MessageUtils.subscribe(item.getTopic(), item.getMode(), item.getType()); + this.io(request, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); + } + } + listen(); + } catch (Exception ex) { + // + } } @Override public void subscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) throws EventMeshException { - + try { + subscriptionItems.add(new SubscriptionItem(topic, subscriptionMode, subscriptionType)); + Package request = MessageUtils.subscribe(topic, subscriptionMode, subscriptionType); + io(request, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); + } catch (Exception ex) { + throw new EventMeshException("Subscribe error", ex); + } } @Override public void unsubscribe() throws EventMeshException { - + try { + Package request = MessageUtils.unsubscribe(); + io(request, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); + } catch (Exception ex) { + throw new EventMeshException("Unsubscribe error", ex); + } } @Override public void listen() throws EventMeshException { + try { + Package request = MessageUtils.listen(); + io(request, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); + } catch (Exception ex) { + throw new EventMeshException("Listen error", ex); + } + } + + private void goodbye() throws Exception { + Package msg = MessageUtils.goodbye(); + this.io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); + } + private void hello() throws Exception { + Package msg = MessageUtils.hello(userAgent); + this.io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); } @Override public void registerBusiHandler(ReceiveMsgHook handler) throws EventMeshException { - + this.callback = handler; } @Override - public void close() throws EventMeshException{ + public void close() { + try { + task.cancel(false); + goodbye(); + super.close(); + } catch (Exception ex) { + ex.printStackTrace(); + } + } + private class Handler extends SimpleChannelInboundHandler { + @SuppressWarnings("Duplicates") + @Override + protected void channelRead0(ChannelHandlerContext ctx, Package msg) throws Exception { + Command cmd = msg.getHeader().getCommand(); + log.info("|receive|type={}|msg={}", cmd, msg); + if (cmd == Command.REQUEST_TO_CLIENT) { + if (callback != null) { + callback.handle(msg, ctx); + } + Package pkg = MessageUtils.requestToClientAck(msg); + send(pkg); + } else if (cmd == Command.ASYNC_MESSAGE_TO_CLIENT) { + Package pkg = MessageUtils.asyncMessageAck(msg); + if (callback != null) { + callback.handle(msg, ctx); + } + send(pkg); + } else if (cmd == Command.BROADCAST_MESSAGE_TO_CLIENT) { + Package pkg = MessageUtils.broadcastMessageAck(msg); + if (callback != null) { + callback.handle(msg, ctx); + } + send(pkg); + } else if (cmd == Command.SERVER_GOODBYE_REQUEST) { + //TODO + } else { + log.error("msg ignored|{}|{}", cmd, msg); + } + RequestContext context = contexts.get(RequestContext._key(msg)); + if (context != null) { + contexts.remove(context.getKey()); + context.finish(msg); + } else { + log.error("msg ignored,context not found.|{}|{}", cmd, msg); + } + } } + } diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java index ba9e0afda4..1f4e774507 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java @@ -85,7 +85,7 @@ public void reconnect() throws EventMeshException { @Override public Package rr(EventMeshMessage eventMeshMessage, long timeout) throws EventMeshException { try { - Package msg = MessageUtils.asyncCloudEvent(eventMeshMessage); + Package msg = MessageUtils.buildPackage(eventMeshMessage, Command.REQUEST_TO_SERVER); log.info("{}|rr|send|type={}|msg={}", clientNo, msg, msg); return io(msg, timeout); } catch (Exception ex) { @@ -96,7 +96,7 @@ public Package rr(EventMeshMessage eventMeshMessage, long timeout) throws EventM @Override public void asyncRR(EventMeshMessage eventMeshMessage, AsyncRRCallback callback, long timeout) throws EventMeshException { try { - Package msg = MessageUtils.asyncCloudEvent(eventMeshMessage); + Package msg = MessageUtils.buildPackage(eventMeshMessage, Command.REQUEST_TO_SERVER); super.send(msg); this.callbackConcurrentHashMap.put((String) RequestContext._key(msg), callback); } catch (Exception ex) { @@ -109,7 +109,7 @@ public void asyncRR(EventMeshMessage eventMeshMessage, AsyncRRCallback callback, public Package publish(EventMeshMessage eventMeshMessage, long timeout) throws EventMeshException { try { // todo: transform EventMeshMessage to Package - Package msg = MessageUtils.asyncCloudEvent(eventMeshMessage); + Package msg = MessageUtils.buildPackage(eventMeshMessage, Command.ASYNC_MESSAGE_TO_SERVER); log.info("SimplePubClientImpl cloud event|{}|publish|send|type={}|protocol={}|msg={}", clientNo, msg.getHeader().getCommand(), msg.getHeader().getProperty(PropertyConst.PROPERTY_MESSAGE_PROTOCOL), msg); @@ -123,7 +123,7 @@ public Package publish(EventMeshMessage eventMeshMessage, long timeout) throws E public void broadcast(EventMeshMessage eventMeshMessage, long timeout) throws EventMeshException { try { // todo: transform EventMeshMessage to Package - Package msg = MessageUtils.asyncCloudEvent(eventMeshMessage); + Package msg = MessageUtils.buildPackage(eventMeshMessage, Command.BROADCAST_MESSAGE_TO_SERVER); log.info("{}|publish|send|type={}|protocol={}|msg={}", clientNo, msg.getHeader().getCommand(), msg.getHeader().getProperty(PropertyConst.PROPERTY_MESSAGE_PROTOCOL), msg); super.send(msg); @@ -138,7 +138,7 @@ public void registerBusiHandler(ReceiveMsgHook receiveMsgHook) } @Override - public void close() throws EventMeshException { + public void close() { try { task.cancel(false); goodbye(); diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java index 08aa170083..cdce845a96 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java @@ -113,6 +113,25 @@ public void unsubscribe() throws EventMeshException { } } + private void goodbye() throws Exception { + Package msg = MessageUtils.goodbye(); + this.io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); + } + + private void hello() throws Exception { + Package msg = MessageUtils.hello(userAgent); + this.io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); + } + + public void listen() throws EventMeshException { + try { + Package request = MessageUtils.listen(); + io(request, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); + } catch (Exception ex) { + throw new EventMeshException("Listen error", ex); + } + } + @Override public void registerBusiHandler(ReceiveMsgHook receiveMsgHook) throws EventMeshException { @@ -120,13 +139,13 @@ public void registerBusiHandler(ReceiveMsgHook receiveMsgHook) } @Override - public void close() throws EventMeshException { + public void close() { try { task.cancel(false); goodbye(); super.close(); } catch (Exception ex) { - throw new EventMeshException("Close EventMeshMessageTcpSubClient error", ex); + ex.printStackTrace(); } } @@ -169,22 +188,4 @@ protected void channelRead0(ChannelHandlerContext ctx, Package msg) throws Excep } } - private void goodbye() throws Exception { - Package msg = MessageUtils.goodbye(); - this.io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); - } - - private void hello() throws Exception { - Package msg = MessageUtils.hello(userAgent); - this.io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); - } - - public void listen() throws EventMeshException { - try { - Package request = MessageUtils.listen(); - io(request, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); - } catch (Exception ex) { - throw new EventMeshException("Listen error", ex); - } - } } From add1d1c319eed5f13f94852f2b6ccc5e34ff58ac Mon Sep 17 00:00:00 2001 From: xwm1992 Date: Wed, 24 Nov 2021 15:18:30 +0800 Subject: [PATCH 2/4] fix compile error --- eventmesh-common/build.gradle | 6 +- .../SendMessageBatchV2ProtocolResolver.java | 58 ++++++------ .../processor/SendSyncMessageProcessor.java | 7 +- .../http/push/AsyncHTTPPushRequest.java | 4 +- .../group/ClientSessionGroupMapping.java | 94 +++++++++++-------- .../eventmesh/runtime/util/EventMeshUtil.java | 57 ++++++----- .../eventmesh/runtime/util/OMSUtil.java | 73 -------------- 7 files changed, 126 insertions(+), 173 deletions(-) delete mode 100644 eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/OMSUtil.java diff --git a/eventmesh-common/build.gradle b/eventmesh-common/build.gradle index 84a03a5d39..699e9c4fdb 100644 --- a/eventmesh-common/build.gradle +++ b/eventmesh-common/build.gradle @@ -34,9 +34,9 @@ dependencies { implementation "com.lmax:disruptor" - implementation "com.fasterxml.jackson.core:jackson-databind" - implementation "com.fasterxml.jackson.core:jackson-core" - implementation "com.fasterxml.jackson.core:jackson-annotations" + api "com.fasterxml.jackson.core:jackson-databind" + api "com.fasterxml.jackson.core:jackson-core" + api "com.fasterxml.jackson.core:jackson-annotations" implementation "org.apache.httpcomponents:httpclient" diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageBatchV2ProtocolResolver.java b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageBatchV2ProtocolResolver.java index fedbbe046e..2c33aec9bf 100644 --- a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageBatchV2ProtocolResolver.java +++ b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageBatchV2ProtocolResolver.java @@ -5,7 +5,9 @@ import io.cloudevents.core.builder.CloudEventBuilder; import io.cloudevents.core.v03.CloudEventV03; import io.cloudevents.core.v1.CloudEventV1; + import org.apache.commons.lang3.StringUtils; + import org.apache.eventmesh.common.protocol.http.body.Body; import org.apache.eventmesh.common.protocol.http.body.message.SendMessageBatchV2RequestBody; import org.apache.eventmesh.common.protocol.http.common.ProtocolKey; @@ -42,37 +44,37 @@ public static CloudEvent buildEvent(Header header, Body body) throws ProtocolHan if (StringUtils.equals(SpecVersion.V1.toString(), protocolVersion)) { event = JsonUtils.deserialize(content, CloudEventV1.class); event = CloudEventBuilder.from(event) - .withExtension(ProtocolKey.REQUEST_CODE, code) - .withExtension(ProtocolKey.ClientInstanceKey.ENV, env) - .withExtension(ProtocolKey.ClientInstanceKey.IDC, idc) - .withExtension(ProtocolKey.ClientInstanceKey.IP, ip) - .withExtension(ProtocolKey.ClientInstanceKey.PID, pid) - .withExtension(ProtocolKey.ClientInstanceKey.SYS, sys) - .withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username) - .withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd) - .withExtension(ProtocolKey.VERSION, version.getVersion()) - .withExtension(ProtocolKey.LANGUAGE, language) - .withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType) - .withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc) - .withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion) - .build(); + .withExtension(ProtocolKey.REQUEST_CODE, code) + .withExtension(ProtocolKey.ClientInstanceKey.ENV, env) + .withExtension(ProtocolKey.ClientInstanceKey.IDC, idc) + .withExtension(ProtocolKey.ClientInstanceKey.IP, ip) + .withExtension(ProtocolKey.ClientInstanceKey.PID, pid) + .withExtension(ProtocolKey.ClientInstanceKey.SYS, sys) + .withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username) + .withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd) + .withExtension(ProtocolKey.VERSION, version.getVersion()) + .withExtension(ProtocolKey.LANGUAGE, language) + .withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType) + .withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc) + .withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion) + .build(); } else if (StringUtils.equals(SpecVersion.V03.toString(), protocolVersion)) { event = JsonUtils.deserialize(content, CloudEventV03.class); event = CloudEventBuilder.from(event) - .withExtension(ProtocolKey.REQUEST_CODE, code) - .withExtension(ProtocolKey.ClientInstanceKey.ENV, env) - .withExtension(ProtocolKey.ClientInstanceKey.IDC, idc) - .withExtension(ProtocolKey.ClientInstanceKey.IP, ip) - .withExtension(ProtocolKey.ClientInstanceKey.PID, pid) - .withExtension(ProtocolKey.ClientInstanceKey.SYS, sys) - .withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username) - .withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd) - .withExtension(ProtocolKey.VERSION, version.getVersion()) - .withExtension(ProtocolKey.LANGUAGE, language) - .withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType) - .withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc) - .withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion) - .build(); + .withExtension(ProtocolKey.REQUEST_CODE, code) + .withExtension(ProtocolKey.ClientInstanceKey.ENV, env) + .withExtension(ProtocolKey.ClientInstanceKey.IDC, idc) + .withExtension(ProtocolKey.ClientInstanceKey.IP, ip) + .withExtension(ProtocolKey.ClientInstanceKey.PID, pid) + .withExtension(ProtocolKey.ClientInstanceKey.SYS, sys) + .withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username) + .withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd) + .withExtension(ProtocolKey.VERSION, version.getVersion()) + .withExtension(ProtocolKey.LANGUAGE, language) + .withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType) + .withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc) + .withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion) + .build(); } return event; } catch (Exception e) { diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java index ef025a007a..5842eed21b 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java @@ -273,8 +273,11 @@ public void onSuccess(CloudEvent event) { HttpCommand succ = asyncContext.getRequest().createHttpCommandResponse( sendMessageResponseHeader, SendMessageResponseBody.buildBody(EventMeshRetCode.SUCCESS.getRetCode(), - JsonUtils.serialize(new SendMessageResponseBody.ReplyMessage(topic, - rtnMsg)))); + JsonUtils.serialize(SendMessageResponseBody.ReplyMessage.builder() + .topic(topic) + .body(rtnMsg) + .properties(EventMeshUtil.getEventProp(event)) + .build()))); asyncContext.onComplete(succ, handler); } catch (Exception ex) { HttpCommand err = asyncContext.getRequest().createHttpCommandResponse( diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java index 5fa56ef70f..0bcf02c6c6 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java @@ -36,7 +36,7 @@ import org.apache.eventmesh.protocol.api.ProtocolPluginFactory; import org.apache.eventmesh.runtime.constants.EventMeshConstants; import org.apache.eventmesh.runtime.core.protocol.http.consumer.HandleMsgContext; -import org.apache.eventmesh.runtime.util.OMSUtil; +import org.apache.eventmesh.runtime.util.EventMeshUtil; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DateFormatUtils; @@ -150,7 +150,7 @@ public void tryHTTPRequest() { body.add(new BasicNameValuePair(PushMessageRequestBody.TOPIC, handleMsgContext.getTopic())); body.add(new BasicNameValuePair(PushMessageRequestBody.EXTFIELDS, - JsonUtils.serialize(OMSUtil.getEventProp(handleMsgContext.getEvent())))); + JsonUtils.serialize(EventMeshUtil.getEventProp(handleMsgContext.getEvent())))); try { builder.setEntity(new UrlEncodedFormEntity(body)); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java index 674fb18e83..8f3e383eb4 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java @@ -46,6 +46,7 @@ import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.DownStreamMsgContext; import org.apache.eventmesh.runtime.util.EventMeshUtil; import org.apache.eventmesh.runtime.util.RemotingHelper; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,9 +57,11 @@ public class ClientSessionGroupMapping { private ConcurrentHashMap sessionTable = new ConcurrentHashMap<>(); - private ConcurrentHashMap clientGroupMap = new ConcurrentHashMap(); + private ConcurrentHashMap clientGroupMap = + new ConcurrentHashMap(); - private ConcurrentHashMap lockMap = new ConcurrentHashMap(); + private ConcurrentHashMap lockMap = + new ConcurrentHashMap(); private EventMeshTCPServer eventMeshTCPServer; @@ -123,7 +126,7 @@ public synchronized void closeSession(ChannelHandlerContext ctx) throws Exceptio @Override public void operationComplete(ChannelFuture future) throws Exception { logger.info("close the connection to remote address[{}] result: {}", remoteAddress, - future.isSuccess()); + future.isSuccess()); } }); sessionLogger.info("session|close|succeed|address={}|msg={}", addr, "no session was found"); @@ -169,7 +172,7 @@ private void closeSession(Session session) throws Exception { @Override public void operationComplete(ChannelFuture future) throws Exception { logger.info("close the connection to remote address[{}] result: {}", remoteAddress, - future.isSuccess()); + future.isSuccess()); } }); } @@ -179,7 +182,8 @@ public void operationComplete(ChannelFuture future) throws Exception { private ClientGroupWrapper constructClientGroupWrapper(String sysId, String producerGroup, String consumerGroup, EventMeshTCPServer eventMeshTCPServer, DownstreamDispatchStrategy downstreamDispatchStrategy) { - return new ClientGroupWrapper(sysId, producerGroup, consumerGroup, eventMeshTCPServer, downstreamDispatchStrategy); + return new ClientGroupWrapper(sysId, producerGroup, consumerGroup, eventMeshTCPServer, + downstreamDispatchStrategy); } private void initClientGroupWrapper(UserAgent user, Session session) throws Exception { @@ -192,7 +196,7 @@ private void initClientGroupWrapper(UserAgent user, Session session) throws Exce synchronized (lockMap.get(user.getSubsystem())) { if (!clientGroupMap.containsKey(user.getSubsystem())) { ClientGroupWrapper cgw = constructClientGroupWrapper(user.getSubsystem(), user.getProducerGroup(), - user.getConsumerGroup(), eventMeshTCPServer, new FreePriorityDispatchStrategy()); + user.getConsumerGroup(), eventMeshTCPServer, new FreePriorityDispatchStrategy()); clientGroupMap.put(user.getSubsystem(), cgw); logger.info("create new ClientGroupWrapper, subsystem:{}", user.getSubsystem()); } @@ -297,32 +301,40 @@ private void handleUnackMsgsInSession(Session session) { for (Map.Entry entry : unAckMsg.entrySet()) { DownStreamMsgContext downStreamMsgContext = entry.getValue(); if (SubscriptionMode.BROADCASTING.equals(downStreamMsgContext.subscriptionItem.getMode())) { - logger.warn("exist broadcast msg unack when closeSession,seq:{},bizSeq:{},client:{}", downStreamMsgContext.seq, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.event), session.getClient()); + logger.warn("exist broadcast msg unack when closeSession,seq:{},bizSeq:{},client:{}", + downStreamMsgContext.seq, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.event), + session.getClient()); continue; } - Session reChooseSession = session.getClientGroupWrapper().get().getDownstreamDispatchStrategy().select(session.getClientGroupWrapper().get().getConsumerGroup(), - downStreamMsgContext.event.getSubject(), Objects.requireNonNull(session.getClientGroupWrapper().get()).groupConsumerSessions); + Session reChooseSession = session.getClientGroupWrapper().get().getDownstreamDispatchStrategy() + .select(session.getClientGroupWrapper().get().getConsumerGroup(), + downStreamMsgContext.event.getSubject(), + Objects.requireNonNull(session.getClientGroupWrapper().get()).groupConsumerSessions); if (reChooseSession != null) { downStreamMsgContext.session = reChooseSession; reChooseSession.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext); reChooseSession.downstreamMsg(downStreamMsgContext); - logger.info("rePush msg form unAckMsgs,seq:{},rePushClient:{}", entry.getKey(), downStreamMsgContext.session.getClient()); + logger.info("rePush msg form unAckMsgs,seq:{},rePushClient:{}", entry.getKey(), + downStreamMsgContext.session.getClient()); } else { - logger.warn("select session fail in handleUnackMsgsInSession,seq:{},topic:{}", entry.getKey(), downStreamMsgContext.event.getSubject()); + logger.warn("select session fail in handleUnackMsgsInSession,seq:{},topic:{}", entry.getKey(), + downStreamMsgContext.event.getSubject()); } } } } private void cleanClientGroupWrapperCommon(Session session) throws Exception { - logger.info("GroupConsumerSessions size:{}", session.getClientGroupWrapper().get().getGroupConsumerSessions().size()); + logger.info("GroupConsumerSessions size:{}", + session.getClientGroupWrapper().get().getGroupConsumerSessions().size()); if (session.getClientGroupWrapper().get().getGroupConsumerSessions().size() == 0) { shutdownClientGroupConsumer(session); } - logger.info("GroupProducerSessions size:{}", session.getClientGroupWrapper().get().getGroupProducerSessions().size()); + logger.info("GroupProducerSessions size:{}", + session.getClientGroupWrapper().get().getGroupProducerSessions().size()); if ((session.getClientGroupWrapper().get().getGroupConsumerSessions().size() == 0) - && (session.getClientGroupWrapper().get().getGroupProducerSessions().size() == 0)) { + && (session.getClientGroupWrapper().get().getGroupProducerSessions().size() == 0)) { shutdownClientGroupProducer(session); clientGroupMap.remove(session.getClientGroupWrapper().get().getSysId()); @@ -350,22 +362,24 @@ private void shutdownClientGroupProducer(Session session) throws Exception { private void initSessionCleaner() { eventMeshTCPServer.getScheduler().scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - Iterator sessionIterator = sessionTable.values().iterator(); - while (sessionIterator.hasNext()) { - Session tmp = sessionIterator.next(); - if (System.currentTimeMillis() - tmp.getLastHeartbeatTime() > eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpSessionExpiredInMills) { - try { - logger.warn("clean expired session,client:{}", tmp.getClient()); - closeSession(tmp.getContext()); - } catch (Exception e) { - logger.error("say goodbye to session error! {}", tmp, e); - } - } - } - } - }, 1000, eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpSessionExpiredInMills, TimeUnit.MILLISECONDS); + @Override + public void run() { + Iterator sessionIterator = sessionTable.values().iterator(); + while (sessionIterator.hasNext()) { + Session tmp = sessionIterator.next(); + if (System.currentTimeMillis() - tmp.getLastHeartbeatTime() > + eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpSessionExpiredInMills) { + try { + logger.warn("clean expired session,client:{}", tmp.getClient()); + closeSession(tmp.getContext()); + } catch (Exception e) { + logger.error("say goodbye to session error! {}", tmp, e); + } + } + } + } + }, 1000, eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpSessionExpiredInMills, + TimeUnit.MILLISECONDS); } private void initDownStreamMsgContextCleaner() { @@ -386,7 +400,7 @@ public void run() { downStreamMsgContext.ackMsg(); tmp.getPusher().getUnAckMsg().remove(seqKey); logger.warn("remove expire downStreamMsgContext, session:{}, topic:{}, seq:{}", tmp, - downStreamMsgContext.event.getSubject(), seqKey); + downStreamMsgContext.event.getSubject(), seqKey); } } } @@ -406,8 +420,8 @@ public void start() throws Exception { public void shutdown() throws Exception { logger.info("begin to close sessions gracefully"); - for(ClientGroupWrapper clientGroupWrapper : clientGroupMap.values()){ - for(Session subSession : clientGroupWrapper.getGroupConsumerSessions()){ + for (ClientGroupWrapper clientGroupWrapper : clientGroupMap.values()) { + for (Session subSession : clientGroupWrapper.getGroupConsumerSessions()) { try { EventMeshTcp2Client.serverGoodby2Client(eventMeshTCPServer, subSession, this); } catch (Exception e) { @@ -415,7 +429,7 @@ public void shutdown() throws Exception { } } - for(Session pubSession : clientGroupWrapper.getGroupProducerSessions()){ + for (Session pubSession : clientGroupWrapper.getGroupProducerSessions()) { try { EventMeshTcp2Client.serverGoodby2Client(eventMeshTCPServer, pubSession, this); } catch (Exception e) { @@ -437,7 +451,7 @@ public void shutdown() throws Exception { sessionTable.values().parallelStream().forEach(itr -> { try { - EventMeshTcp2Client.serverGoodby2Client(this.eventMeshTCPServer,itr, this); + EventMeshTcp2Client.serverGoodby2Client(this.eventMeshTCPServer, itr, this); } catch (Exception e) { logger.error("say goodbye to session error! {}", itr, e); } @@ -470,15 +484,15 @@ public Map> prepareEventMeshClientDistributionData( return result; } - public Map> prepareProxyClientDistributionData(){ + public Map> prepareProxyClientDistributionData() { Map> result = null; - if(!clientGroupMap.isEmpty()){ + if (!clientGroupMap.isEmpty()) { result = new HashMap<>(); - for(Map.Entry entry : clientGroupMap.entrySet()){ + for (Map.Entry entry : clientGroupMap.entrySet()) { Map map = new HashMap(); - map.put(EventMeshConstants.PURPOSE_SUB,entry.getValue().getGroupConsumerSessions().size()); - map.put(EventMeshConstants.PURPOSE_PUB,entry.getValue().getGroupProducerSessions().size()); + map.put(EventMeshConstants.PURPOSE_SUB, entry.getValue().getGroupConsumerSessions().size()); + map.put(EventMeshConstants.PURPOSE_PUB, entry.getValue().getGroupProducerSessions().size()); result.put(entry.getKey(), map); } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java index 58e05e0d65..035e9d4c97 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java @@ -17,9 +17,6 @@ package org.apache.eventmesh.runtime.util; - -import static org.apache.eventmesh.runtime.util.OMSUtil.isOMSHeader; - import java.net.Inet6Address; import java.net.InetAddress; import java.net.NetworkInterface; @@ -28,6 +25,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Enumeration; +import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.Set; @@ -149,6 +147,15 @@ public static String getMessageBizSeq(CloudEvent event) { return keys; } + public static Map getEventProp(CloudEvent event) { + Set extensionSet = event.getExtensionNames(); + Map prop = new HashMap<>(); + for (String extensionKey : extensionSet) { + prop.put(extensionKey, event.getExtension(extensionKey).toString()); + } + return prop; + } + // public static org.apache.rocketmq.common.message.Message decodeMessage(AccessMessage accessMessage) { // org.apache.rocketmq.common.message.Message msg = new org.apache.rocketmq.common.message.Message(); // msg.setTopic(accessMessage.getTopic()); @@ -160,28 +167,28 @@ public static String getMessageBizSeq(CloudEvent event) { // return msg; // } - public static Message decodeMessage(EventMeshMessage eventMeshMessage) { - Message omsMsg = new Message(); - omsMsg.setBody(eventMeshMessage.getBody().getBytes()); - omsMsg.setTopic(eventMeshMessage.getTopic()); - Properties systemProperties = new Properties(); - Properties userProperties = new Properties(); - - final Set> entries = eventMeshMessage.getProperties().entrySet(); - - for (final Map.Entry entry : entries) { - if (isOMSHeader(entry.getKey())) { - systemProperties.put(entry.getKey(), entry.getValue()); - } else { - userProperties.put(entry.getKey(), entry.getValue()); - } - } - - systemProperties.put(Constants.PROPERTY_MESSAGE_DESTINATION, eventMeshMessage.getTopic()); - omsMsg.setSystemProperties(systemProperties); - omsMsg.setUserProperties(userProperties); - return omsMsg; - } +// public static Message decodeMessage(EventMeshMessage eventMeshMessage) { +// Message omsMsg = new Message(); +// omsMsg.setBody(eventMeshMessage.getBody().getBytes()); +// omsMsg.setTopic(eventMeshMessage.getTopic()); +// Properties systemProperties = new Properties(); +// Properties userProperties = new Properties(); +// +// final Set> entries = eventMeshMessage.getProperties().entrySet(); +// +// for (final Map.Entry entry : entries) { +// if (isOMSHeader(entry.getKey())) { +// systemProperties.put(entry.getKey(), entry.getValue()); +// } else { +// userProperties.put(entry.getKey(), entry.getValue()); +// } +// } +// +// systemProperties.put(Constants.PROPERTY_MESSAGE_DESTINATION, eventMeshMessage.getTopic()); +// omsMsg.setSystemProperties(systemProperties); +// omsMsg.setUserProperties(userProperties); +// return omsMsg; +// } // public static AccessMessage encodeMessage(org.apache.rocketmq.common.message.Message msg) throws Exception { // AccessMessage accessMessage = new AccessMessage(); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/OMSUtil.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/OMSUtil.java deleted file mode 100644 index b8f2d1e8cb..0000000000 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/OMSUtil.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to Apache Software Foundation (ASF) under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Apache Software Foundation (ASF) licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.eventmesh.runtime.util; - -import java.lang.reflect.Field; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; -import java.util.Set; - -import io.cloudevents.CloudEvent; -import io.openmessaging.api.Message; -import io.openmessaging.api.OMSBuiltinKeys; - -public class OMSUtil { - - public static boolean isOMSHeader(String value) { - for (Field field : OMSBuiltinKeys.class.getDeclaredFields()) { - try { - if (field.get(OMSBuiltinKeys.class).equals(value)) { - return true; - } - } catch (IllegalAccessException e) { - return false; - } - } - return false; - } - -// public static Properties convertKeyValue2Prop(KeyValue keyValue){ -// Properties properties = new Properties(); -// for (String key : keyValue.keySet()){ -// properties.put(key, keyValue.getString(key)); -// } -// return properties; -// } - - @SuppressWarnings("unchecked") - public static Map combineProp(Properties p1, Properties p2) { - Properties properties = new Properties(); - properties.putAll(p1); - properties.putAll(p2); - - return new HashMap<>((Map) properties); - } - - public static Map getEventProp(CloudEvent event) { - Set extensionSet = event.getExtensionNames(); - Map prop = new HashMap<>(); - for (String extensionKey : extensionSet) { - prop.put(extensionKey, event.getExtension(extensionKey).toString()); - } - return prop; - } - -} From 8c098e603ad82e78978a0fbd029b035ae68689a7 Mon Sep 17 00:00:00 2001 From: xwm1992 Date: Wed, 24 Nov 2021 17:56:31 +0800 Subject: [PATCH 3/4] fix sdk error --- .../common/protocol/tcp/UserAgent.java | 68 +++++++++++++++++++ eventmesh-examples/build.gradle | 4 ++ 2 files changed, 72 insertions(+) diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/UserAgent.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/UserAgent.java index 701cd6260e..6ad7a32072 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/UserAgent.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/UserAgent.java @@ -20,6 +20,8 @@ import lombok.Builder; import lombok.Data; +import java.util.Objects; + @Data @Builder public class UserAgent { @@ -40,4 +42,70 @@ public class UserAgent { @Builder.Default private int unack = 0; + public UserAgent() { + } + + public UserAgent(String env, String subsystem, String path, int pid, String host, int port, String version, + String username, String password, String idc, String producerGroup, String consumerGroup, + String purpose, int unack) { + this.env = env; + this.subsystem = subsystem; + this.path = path; + this.pid = pid; + this.host = host; + this.port = port; + this.version = version; + this.username = username; + this.password = password; + this.idc = idc; + this.producerGroup = producerGroup; + this.consumerGroup = consumerGroup; + this.purpose = purpose; + this.unack = unack; + } + + @Override + public String toString() { + return String.format( + "UserAgent{env='%s'subsystem='%s', path='%s', pid=%d, host='%s', port=%d, version='%s', idc='%s', purpose='%s', unack='%d'}", + env, subsystem, path, pid, host, port, version, idc, purpose, unack); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + UserAgent userAgent = (UserAgent) o; + + if (pid != userAgent.pid) return false; + if (port != userAgent.port) return false; + if (unack != userAgent.unack) return false; + if (!Objects.equals(subsystem, userAgent.subsystem)) return false; + if (!Objects.equals(path, userAgent.path)) return false; + if (!Objects.equals(host, userAgent.host)) return false; + if (!Objects.equals(purpose, userAgent.purpose)) return false; + if (!Objects.equals(version, userAgent.version)) return false; + if (!Objects.equals(username, userAgent.username)) return false; + if (!Objects.equals(password, userAgent.password)) return false; + if (!Objects.equals(env, userAgent.env)) return false; + return Objects.equals(idc, userAgent.idc); + } + + @Override + public int hashCode() { + int result = subsystem != null ? subsystem.hashCode() : 0; + result = 31 * result + (path != null ? path.hashCode() : 0); + result = 31 * result + pid; + result = 31 * result + (host != null ? host.hashCode() : 0); + result = 31 * result + (purpose != null ? purpose.hashCode() : 0); + result = 31 * result + port; + result = 31 * result + (version != null ? version.hashCode() : 0); + result = 31 * result + (username != null ? username.hashCode() : 0); + result = 31 * result + (password != null ? password.hashCode() : 0); + result = 31 * result + (idc != null ? idc.hashCode() : 0); + result = 31 * result + (env != null ? env.hashCode() : 0); + result = 31 * result + unack; + return result; + } } diff --git a/eventmesh-examples/build.gradle b/eventmesh-examples/build.gradle index b7b1f654a3..3cb42bce3e 100644 --- a/eventmesh-examples/build.gradle +++ b/eventmesh-examples/build.gradle @@ -15,6 +15,10 @@ * limitations under the License. */ +configurations { + implementation.exclude group: 'org.springframework.boot', module: 'spring-boot-starter-logging' +} + dependencies { implementation project(":eventmesh-sdk-java") implementation project(":eventmesh-common") From 9f6e15593a1811cb37d48519bae2200016c208a4 Mon Sep 17 00:00:00 2001 From: xwm1992 Date: Thu, 25 Nov 2021 19:44:33 +0800 Subject: [PATCH 4/4] 1.remove the openmessage from connector-api 2.fix the standalone connector --- docs/cn/features/spi.md | 2 +- docs/en/features/spi.md | 2 +- .../eventmesh-connector-api/build.gradle | 2 +- .../org/apache/eventmesh/api/RRCallback.java | 56 +- .../api/consumer/MeshMQPushConsumer.java | 84 ++- .../api/producer/MeshMQProducer.java | 90 ++-- .../eventmesh/api/producer/Producer.java | 3 +- .../rocketmq/MessagingAccessPointImpl.java | 180 +++---- .../rocketmq/config/ClientConfig.java | 3 +- .../rocketmq/consumer/PushConsumerImpl.java | 15 +- .../consumer/RocketMQConsumerImpl.java | 16 - .../rocketmq/producer/ProducerImpl.java | 10 +- .../producer/RocketMQProducerImpl.java | 13 +- .../rocketmq/promise/DefaultPromise.java | 227 -------- .../connector/rocketmq/utils/OMSUtil.java | 490 +++++++++--------- ...rg.apache.eventmesh.api.consumer.Consumer} | 0 ...rg.apache.eventmesh.api.producer.Producer} | 0 ...ache.io.openmessaging.MessagingAccessPoint | 20 - .../standalone/broker/StandaloneBroker.java | 1 - .../broker/model/MessageEntity.java | 1 - .../producer/StandaloneProducer.java | 28 +- .../producer/StandaloneProducerAdaptor.java | 11 +- eventmesh-examples/build.gradle | 1 + .../tcp/common/EventMeshTestUtils.java | 9 +- .../org/apache/eventmesh/runtime/acl/Acl.java | 2 +- .../core/plugin/MQProducerWrapper.java | 1 - .../cloudevent/OMSMessageFactory.java | 99 ---- .../impl/OMSBinaryMessageReader.java | 90 ---- .../protocol/cloudevent/impl/OMSHeaders.java | 44 -- .../cloudevent/impl/OMSMessageWriter.java | 100 ---- .../http/producer/EventMeshProducer.java | 1 - .../client/session/send/SessionSender.java | 1 - .../protocol/tcp/client/task/HelloTask.java | 2 +- .../eventmesh/runtime/util/EventMeshUtil.java | 73 ++- .../EventMeshMessageTCPClient.java | 2 +- .../EventMeshMessageTCPPubClient.java | 2 +- .../EventMeshMessageTCPSubClient.java | 2 +- 37 files changed, 527 insertions(+), 1156 deletions(-) delete mode 100644 eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/promise/DefaultPromise.java rename eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/{org.apache.eventmesh.api.consumer.MeshMQPushConsumer => org.apache.eventmesh.api.consumer.Consumer} (100%) rename eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/{org.apache.eventmesh.api.producer.MeshMQProducer => org.apache.eventmesh.api.producer.Producer} (100%) delete mode 100644 eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/resources/META-INF/services/org.apache.io.openmessaging.MessagingAccessPoint delete mode 100644 eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/cloudevent/OMSMessageFactory.java delete mode 100644 eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/cloudevent/impl/OMSBinaryMessageReader.java delete mode 100644 eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/cloudevent/impl/OMSHeaders.java delete mode 100644 eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/cloudevent/impl/OMSMessageWriter.java diff --git a/docs/cn/features/spi.md b/docs/cn/features/spi.md index b990d5956e..7499e655b1 100644 --- a/docs/cn/features/spi.md +++ b/docs/cn/features/spi.md @@ -101,7 +101,7 @@ public class RocketMQProducerImpl implements MeshMQProducer { ``` 同时,还需要在eventmesh-connector-rocketmq模块中resource/META-INF/eventmesh目录下创建文件名为SPI接口全限定名的文件 -org.apache.eventmesh.api.producer.MeshMQProducer +org.apache.eventmesh.api.producer.Producer 文件内容为扩展实例名和对应的实例全类名 diff --git a/docs/en/features/spi.md b/docs/en/features/spi.md index 1c26d17ed5..af1d62725e 100644 --- a/docs/en/features/spi.md +++ b/docs/en/features/spi.md @@ -105,7 +105,7 @@ public class RocketMQProducerImpl implements MeshMQProducer { At the same time, we need to create a file with the full qualified name of the SPI interface under the resource/META-INF/eventmesh directory in the eventmesh-connector-rocketmq module. -org.apache.eventmesh.api.producer.MeshMQProducer +org.apache.eventmesh.api.producer.Producer The content of the file is the extension instance name and the corresponding instance full class name diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/build.gradle b/eventmesh-connector-plugin/eventmesh-connector-api/build.gradle index 81d1568ec0..3c2c9ede9a 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/build.gradle +++ b/eventmesh-connector-plugin/eventmesh-connector-api/build.gradle @@ -19,7 +19,7 @@ dependencies { implementation project(":eventmesh-spi") implementation project(":eventmesh-common") api 'io.cloudevents:cloudevents-core' - api 'io.openmessaging:openmessaging-api' +// api 'io.openmessaging:openmessaging-api' api 'io.dropwizard.metrics:metrics-core' api "io.dropwizard.metrics:metrics-healthchecks" api "io.dropwizard.metrics:metrics-annotation" diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/RRCallback.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/RRCallback.java index 788f9f3906..eba4acdbaa 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/RRCallback.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/RRCallback.java @@ -1,28 +1,28 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eventmesh.api; - -import io.openmessaging.api.Message; - -public interface RRCallback { - - public void onSuccess(Message msg); - - public void onException(Throwable e); - -} +///* +// * Licensed to the Apache Software Foundation (ASF) under one or more +// * contributor license agreements. See the NOTICE file distributed with +// * this work for additional information regarding copyright ownership. +// * The ASF licenses this file to You under the Apache License, Version 2.0 +// * (the "License"); you may not use this file except in compliance with +// * the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// */ +// +//package org.apache.eventmesh.api; +// +//import io.openmessaging.api.Message; +// +//public interface RRCallback { +// +// public void onSuccess(Message msg); +// +// public void onException(Throwable e); +// +//} diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java index dcf558b685..8747cd9945 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java @@ -1,44 +1,40 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eventmesh.api.consumer; - -import java.util.List; -import java.util.Properties; - -import io.openmessaging.api.AsyncMessageListener; -import io.openmessaging.api.Consumer; -import io.openmessaging.api.Message; - -import org.apache.eventmesh.api.AbstractContext; -import org.apache.eventmesh.spi.EventMeshExtensionType; -import org.apache.eventmesh.spi.EventMeshSPI; - -@EventMeshSPI(isSingleton = false, eventMeshExtensionType = EventMeshExtensionType.CONNECTOR) -public interface MeshMQPushConsumer extends Consumer { - - void init(Properties keyValue) throws Exception; - - void updateOffset(List msgs, AbstractContext context); - -// void registerMessageListener(MessageListenerConcurrently messageListenerConcurrently); - - void subscribe(String topic, final AsyncMessageListener listener) throws Exception; - - @Override - void unsubscribe(String topic); -} +///* +// * Licensed to the Apache Software Foundation (ASF) under one or more +// * contributor license agreements. See the NOTICE file distributed with +// * this work for additional information regarding copyright ownership. +// * The ASF licenses this file to You under the Apache License, Version 2.0 +// * (the "License"); you may not use this file except in compliance with +// * the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// */ +// +//package org.apache.eventmesh.api.consumer; +// +//import java.util.List; +//import java.util.Properties; +// +//import org.apache.eventmesh.api.AbstractContext; +//import org.apache.eventmesh.spi.EventMeshExtensionType; +//import org.apache.eventmesh.spi.EventMeshSPI; +// +//@EventMeshSPI(isSingleton = false, eventMeshExtensionType = EventMeshExtensionType.CONNECTOR) +//public interface MeshMQPushConsumer extends Consumer { +// +// void init(Properties keyValue) throws Exception; +// +// void updateOffset(List msgs, AbstractContext context); +// +//// void registerMessageListener(MessageListenerConcurrently messageListenerConcurrently); +// +// void subscribe(String topic, final AsyncMessageListener listener) throws Exception; +// +// @Override +// void unsubscribe(String topic); +//} diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java index d87be7d842..e44bd50a8e 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java @@ -1,45 +1,45 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eventmesh.api.producer; - -import java.util.Properties; - -import io.openmessaging.api.Message; -import io.openmessaging.api.Producer; -import io.openmessaging.api.SendCallback; - -import org.apache.eventmesh.api.RRCallback; -import org.apache.eventmesh.spi.EventMeshExtensionType; -import org.apache.eventmesh.spi.EventMeshSPI; - -@EventMeshSPI(isSingleton = false, eventMeshExtensionType = EventMeshExtensionType.CONNECTOR) -public interface MeshMQProducer extends Producer { - - void init(Properties properties) throws Exception; - - void send(Message message, SendCallback sendCallback) throws Exception; - - void request(Message message, RRCallback rrCallback, long timeout) throws Exception; - - boolean reply(final Message message, final SendCallback sendCallback) throws Exception; - - void checkTopicExist(String topic) throws Exception; - - void setExtFields(); - -} +///* +// * Licensed to the Apache Software Foundation (ASF) under one or more +// * contributor license agreements. See the NOTICE file distributed with +// * this work for additional information regarding copyright ownership. +// * The ASF licenses this file to You under the Apache License, Version 2.0 +// * (the "License"); you may not use this file except in compliance with +// * the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// */ +// +//package org.apache.eventmesh.api.producer; +// +//import java.util.Properties; +// +//import io.openmessaging.api.Message; +//import io.openmessaging.api.Producer; +//import io.openmessaging.api.SendCallback; +// +//import org.apache.eventmesh.api.RRCallback; +//import org.apache.eventmesh.spi.EventMeshExtensionType; +//import org.apache.eventmesh.spi.EventMeshSPI; +// +//@EventMeshSPI(isSingleton = false, eventMeshExtensionType = EventMeshExtensionType.CONNECTOR) +//public interface MeshMQProducer extends Producer { +// +// void init(Properties properties) throws Exception; +// +// void send(Message message, SendCallback sendCallback) throws Exception; +// +// void request(Message message, RRCallback rrCallback, long timeout) throws Exception; +// +// boolean reply(final Message message, final SendCallback sendCallback) throws Exception; +// +// void checkTopicExist(String topic) throws Exception; +// +// void setExtFields(); +// +//} diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/Producer.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/Producer.java index 7a9e0320e5..5f14582eac 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/Producer.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/Producer.java @@ -18,7 +18,6 @@ package org.apache.eventmesh.api.producer; import org.apache.eventmesh.api.LifeCycle; -import org.apache.eventmesh.api.RRCallback; import org.apache.eventmesh.api.RequestReplyCallback; import org.apache.eventmesh.api.SendCallback; import org.apache.eventmesh.api.SendResult; @@ -45,7 +44,7 @@ public interface Producer extends LifeCycle { void sendAsync(final CloudEvent cloudEvent, final SendCallback sendCallback); - void request(CloudEvent cloudEvent, RRCallback rrCallback, long timeout) throws Exception; +// void request(CloudEvent cloudEvent, RRCallback rrCallback, long timeout) throws Exception; void request(CloudEvent cloudEvent, RequestReplyCallback rrCallback, long timeout) throws Exception; diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/MessagingAccessPointImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/MessagingAccessPointImpl.java index 8770b7d484..06999487d3 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/MessagingAccessPointImpl.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/MessagingAccessPointImpl.java @@ -1,90 +1,90 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eventmesh.connector.rocketmq; - -import java.util.Properties; - -import io.openmessaging.api.Consumer; -import io.openmessaging.api.MessagingAccessPoint; -import io.openmessaging.api.Producer; -import io.openmessaging.api.PullConsumer; -import io.openmessaging.api.batch.BatchConsumer; -import io.openmessaging.api.order.OrderConsumer; -import io.openmessaging.api.order.OrderProducer; -import io.openmessaging.api.transaction.LocalTransactionChecker; -import io.openmessaging.api.transaction.TransactionProducer; - -public class MessagingAccessPointImpl implements MessagingAccessPoint { - - private Properties accessPointProperties; - - public MessagingAccessPointImpl(final Properties accessPointProperties) { - this.accessPointProperties = accessPointProperties; - } - - @Override - public String version() { - return null; - } - - @Override - public Properties attributes() { - return accessPointProperties; - } - - @Override - public Producer createProducer(Properties properties) { - return null; - } - - @Override - public OrderProducer createOrderProducer(Properties properties) { - return null; - } - - @Override - public TransactionProducer createTransactionProducer(Properties properties, LocalTransactionChecker checker) { - return null; - } - - @Override - public TransactionProducer createTransactionProducer(Properties properties) { - return null; - } - - @Override - public Consumer createConsumer(Properties properties) { - return null; - } - - @Override - public PullConsumer createPullConsumer(Properties properties) { - return null; - } - - @Override - public BatchConsumer createBatchConsumer(Properties properties) { - return null; - } - - @Override - public OrderConsumer createOrderedConsumer(Properties properties) { - return null; - } - -} +///* +// * Licensed to the Apache Software Foundation (ASF) under one or more +// * contributor license agreements. See the NOTICE file distributed with +// * this work for additional information regarding copyright ownership. +// * The ASF licenses this file to You under the Apache License, Version 2.0 +// * (the "License"); you may not use this file except in compliance with +// * the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// */ +// +//package org.apache.eventmesh.connector.rocketmq; +// +//import java.util.Properties; +// +//import io.openmessaging.api.Consumer; +//import io.openmessaging.api.MessagingAccessPoint; +//import io.openmessaging.api.Producer; +//import io.openmessaging.api.PullConsumer; +//import io.openmessaging.api.batch.BatchConsumer; +//import io.openmessaging.api.order.OrderConsumer; +//import io.openmessaging.api.order.OrderProducer; +//import io.openmessaging.api.transaction.LocalTransactionChecker; +//import io.openmessaging.api.transaction.TransactionProducer; +// +//public class MessagingAccessPointImpl implements MessagingAccessPoint { +// +// private Properties accessPointProperties; +// +// public MessagingAccessPointImpl(final Properties accessPointProperties) { +// this.accessPointProperties = accessPointProperties; +// } +// +// @Override +// public String version() { +// return null; +// } +// +// @Override +// public Properties attributes() { +// return accessPointProperties; +// } +// +// @Override +// public Producer createProducer(Properties properties) { +// return null; +// } +// +// @Override +// public OrderProducer createOrderProducer(Properties properties) { +// return null; +// } +// +// @Override +// public TransactionProducer createTransactionProducer(Properties properties, LocalTransactionChecker checker) { +// return null; +// } +// +// @Override +// public TransactionProducer createTransactionProducer(Properties properties) { +// return null; +// } +// +// @Override +// public Consumer createConsumer(Properties properties) { +// return null; +// } +// +// @Override +// public PullConsumer createPullConsumer(Properties properties) { +// return null; +// } +// +// @Override +// public BatchConsumer createBatchConsumer(Properties properties) { +// return null; +// } +// +// @Override +// public OrderConsumer createOrderedConsumer(Properties properties) { +// return null; +// } +// +//} diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfig.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfig.java index aad0b49a53..f03a38a4a9 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfig.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfig.java @@ -17,11 +17,10 @@ package org.apache.eventmesh.connector.rocketmq.config; -import io.openmessaging.api.OMSBuiltinKeys; import org.apache.eventmesh.connector.rocketmq.domain.NonStandardKeys; -public class ClientConfig implements OMSBuiltinKeys, NonStandardKeys { +public class ClientConfig implements NonStandardKeys { private String driverImpl; private String accessPoints; private String namespace; diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java index 7258131c23..dbc07030ea 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java @@ -50,7 +50,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import io.cloudevents.CloudEvent; -import io.openmessaging.api.exception.OMSRuntimeException; public class PushConsumerImpl { private final DefaultMQPushConsumer rocketmqPushConsumer; @@ -105,7 +104,7 @@ public void start() { try { this.rocketmqPushConsumer.start(); } catch (Exception e) { - throw new OMSRuntimeException(e.getMessage()); + throw new ConnectorRuntimeException(e.getMessage()); } } } @@ -137,8 +136,7 @@ public void subscribe(String topic, String subExpression, EventListener listener try { this.rocketmqPushConsumer.subscribe(topic, subExpression); } catch (MQClientException e) { - throw new OMSRuntimeException(-1, - String.format("RocketMQ push consumer can't attach to %s.", topic)); + throw new ConnectorRuntimeException(String.format("RocketMQ push consumer can't attach to %s.", topic)); } } @@ -148,8 +146,7 @@ public void unsubscribe(String topic) { try { this.rocketmqPushConsumer.unsubscribe(topic); } catch (Exception e) { - throw new OMSRuntimeException(-1, - String.format("RocketMQ push consumer fails to unsubscribe topic: %s", topic)); + throw new ConnectorRuntimeException(String.format("RocketMQ push consumer fails to unsubscribe topic: %s", topic)); } } @@ -186,8 +183,7 @@ public EventMeshConsumeConcurrentlyStatus handleMessage(MessageExt msg, EventListener listener = PushConsumerImpl.this.subscribeTable.get(msg.getTopic()); if (listener == null) { - throw new OMSRuntimeException(-1, - String.format("The topic/queue %s isn't attached to this consumer", + throw new ConnectorRuntimeException(String.format("The topic/queue %s isn't attached to this consumer", msg.getTopic())); } @@ -245,8 +241,7 @@ public EventMeshConsumeConcurrentlyStatus handleMessage(MessageExt msg, EventListener listener = PushConsumerImpl.this.subscribeTable.get(msg.getTopic()); if (listener == null) { - throw new OMSRuntimeException(-1, - String.format("The topic/queue %s isn't attached to this consumer", + throw new ConnectorRuntimeException(String.format("The topic/queue %s isn't attached to this consumer", msg.getTopic())); } diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java index 9626e2c0d4..1b3fe482ce 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java @@ -20,22 +20,13 @@ import org.apache.eventmesh.api.AbstractContext; import org.apache.eventmesh.api.EventListener; import org.apache.eventmesh.api.consumer.Consumer; -import org.apache.eventmesh.api.consumer.MeshMQPushConsumer; -import org.apache.eventmesh.connector.rocketmq.MessagingAccessPointImpl; import org.apache.eventmesh.connector.rocketmq.common.Constants; import org.apache.eventmesh.connector.rocketmq.common.EventMeshConstants; import org.apache.eventmesh.connector.rocketmq.config.ClientConfiguration; import org.apache.eventmesh.connector.rocketmq.config.ConfigurationWrapper; -import org.apache.eventmesh.connector.rocketmq.patch.EventMeshConsumeConcurrentlyContext; -import org.apache.eventmesh.connector.rocketmq.utils.OMSUtil; - -import org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService; -import org.apache.rocketmq.client.impl.consumer.ConsumeMessageService; -import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import java.io.File; -import java.util.ArrayList; import java.util.List; import java.util.Properties; @@ -43,13 +34,6 @@ import org.slf4j.LoggerFactory; import io.cloudevents.CloudEvent; -import io.openmessaging.api.AsyncGenericMessageListener; -import io.openmessaging.api.AsyncMessageListener; -import io.openmessaging.api.GenericMessageListener; -import io.openmessaging.api.Message; -import io.openmessaging.api.MessageListener; -import io.openmessaging.api.MessageSelector; -import io.openmessaging.api.MessagingAccessPoint; public class RocketMQConsumerImpl implements Consumer { diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java index ec04b58bed..7831639ad6 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java @@ -17,7 +17,7 @@ package org.apache.eventmesh.connector.rocketmq.producer; -import org.apache.eventmesh.api.RRCallback; +import org.apache.eventmesh.api.RequestReplyCallback; import org.apache.eventmesh.api.SendCallback; import org.apache.eventmesh.api.SendResult; import org.apache.eventmesh.api.exception.ConnectorRuntimeException; @@ -117,7 +117,7 @@ public void sendAsync(CloudEvent cloudEvent, SendCallback sendCallback) { } } - public void request(CloudEvent cloudEvent, RRCallback rrCallback, long timeout) + public void request(CloudEvent cloudEvent, RequestReplyCallback rrCallback, long timeout) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { this.checkProducerServiceState(this.rocketmqProducer.getDefaultMQProducerImpl()); @@ -142,12 +142,12 @@ public boolean reply(final CloudEvent cloudEvent, final SendCallback sendCallbac } - private RequestCallback rrCallbackConvert(final Message message, final RRCallback rrCallback) { + private RequestCallback rrCallbackConvert(final Message message, final RequestReplyCallback rrCallback) { return new RequestCallback() { @Override public void onSuccess(org.apache.rocketmq.common.message.Message message) { - io.openmessaging.api.Message openMessage = OMSUtil.msgConvert((MessageExt) message); - rrCallback.onSuccess(openMessage); + CloudEvent event = RocketMQMessageFactory.createReader(message).toEvent(); + rrCallback.onSuccess(event); } @Override diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/RocketMQProducerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/RocketMQProducerImpl.java index 54d06a0bf9..8ab3d969e4 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/RocketMQProducerImpl.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/RocketMQProducerImpl.java @@ -17,12 +17,10 @@ package org.apache.eventmesh.connector.rocketmq.producer; -import org.apache.eventmesh.api.RRCallback; import org.apache.eventmesh.api.RequestReplyCallback; import org.apache.eventmesh.api.SendCallback; import org.apache.eventmesh.api.SendResult; import org.apache.eventmesh.api.producer.Producer; -import org.apache.eventmesh.connector.rocketmq.MessagingAccessPointImpl; import org.apache.eventmesh.connector.rocketmq.common.EventMeshConstants; import org.apache.eventmesh.connector.rocketmq.config.ClientConfiguration; import org.apache.eventmesh.connector.rocketmq.config.ConfigurationWrapper; @@ -38,7 +36,6 @@ import org.slf4j.LoggerFactory; import io.cloudevents.CloudEvent; -import io.openmessaging.api.MessagingAccessPoint; public class RocketMQProducerImpl implements Producer { @@ -94,15 +91,15 @@ public void publish(CloudEvent message, SendCallback sendCallback) throws Except } @Override - public void request(CloudEvent message, RRCallback rrCallback, long timeout) + public void request(CloudEvent message, RequestReplyCallback rrCallback, long timeout) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { producer.request(message, rrCallback, timeout); } - @Override - public void request(CloudEvent cloudEvent, RequestReplyCallback rrCallback, long timeout) throws Exception { - - } +// @Override +// public void request(CloudEvent cloudEvent, RequestReplyCallback rrCallback, long timeout) throws Exception { +// +// } @Override public boolean reply(final CloudEvent message, final SendCallback sendCallback) throws Exception { diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/promise/DefaultPromise.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/promise/DefaultPromise.java deleted file mode 100644 index cc30eb5ab0..0000000000 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/promise/DefaultPromise.java +++ /dev/null @@ -1,227 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eventmesh.connector.rocketmq.promise; - -import java.util.ArrayList; -import java.util.List; - -import io.openmessaging.api.Future; -import io.openmessaging.api.FutureListener; -import io.openmessaging.api.exception.OMSRuntimeException; - -import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.logging.InternalLoggerFactory; - -public class DefaultPromise implements Future { - private static final InternalLogger LOG = InternalLoggerFactory.getLogger(DefaultPromise.class); - private final Object lock = new Object(); - private volatile FutureState state = FutureState.DOING; - private V result = null; - private long timeout; - private long createTime; - private Throwable exception = null; - private List> promiseListenerList; - - public DefaultPromise() { - createTime = System.currentTimeMillis(); - promiseListenerList = new ArrayList<>(); - timeout = 5000; - } - - @Override - public boolean cancel(final boolean mayInterruptIfRunning) { - return false; - } - - @Override - public boolean isCancelled() { - return state.isCancelledState(); - } - - @Override - public boolean isDone() { - return state.isDoneState(); - } - - @Override - public V get() { - return result; - } - - @Override - public V get(final long timeout) { - synchronized (lock) { - if (!isDoing()) { - return getValueOrThrowable(); - } - - if (timeout <= 0) { - try { - lock.wait(); - } catch (Exception e) { - cancel(e); - } - return getValueOrThrowable(); - } else { - long waitTime = timeout - (System.currentTimeMillis() - createTime); - if (waitTime > 0) { - for (; ; ) { - try { - lock.wait(waitTime); - } catch (InterruptedException e) { - LOG.error("promise get value interrupted,excepiton:{}", e.getMessage()); - } - - if (!isDoing()) { - break; - } else { - waitTime = timeout - (System.currentTimeMillis() - createTime); - if (waitTime <= 0) { - break; - } - } - } - } - - if (isDoing()) { - timeoutSoCancel(); - } - } - return getValueOrThrowable(); - } - } - - public boolean set(final V value) { - if (value == null) { - return false; - } - this.result = value; - return done(); - } - - public boolean setFailure(final Throwable cause) { - if (cause == null) { - return false; - } - this.exception = cause; - return done(); - } - - @Override - public void addListener(final FutureListener listener) { - if (listener == null) { - throw new NullPointerException("FutureListener is null"); - } - - boolean notifyNow = false; - synchronized (lock) { - if (!isDoing()) { - notifyNow = true; - } else { - if (promiseListenerList == null) { - promiseListenerList = new ArrayList<>(); - } - promiseListenerList.add(listener); - } - } - - if (notifyNow) { - notifyListener(listener); - } - } - - @Override - public Throwable getThrowable() { - return exception; - } - - private void notifyListeners() { - if (promiseListenerList != null) { - for (FutureListener listener : promiseListenerList) { - notifyListener(listener); - } - } - } - - private boolean isSuccess() { - return isDone() && (exception == null); - } - - private void timeoutSoCancel() { - synchronized (lock) { - if (!isDoing()) { - return; - } - state = FutureState.CANCELLED; - exception = new RuntimeException("Get request result is timeout or interrupted"); - lock.notifyAll(); - } - notifyListeners(); - } - - private V getValueOrThrowable() { - if (exception != null) { - Throwable e = exception.getCause() != null ? exception.getCause() : exception; - throw new OMSRuntimeException("-1", e); - } - notifyListeners(); - return result; - } - - private boolean isDoing() { - return state.isDoingState(); - } - - private boolean done() { - synchronized (lock) { - if (!isDoing()) { - return false; - } - - state = FutureState.DONE; - lock.notifyAll(); - } - - notifyListeners(); - return true; - } - - private void notifyListener(final FutureListener listener) { - try { - listener.operationComplete(this); - } catch (Throwable t) { - LOG.error("notifyListener {} Error:{}", listener.getClass().getSimpleName(), t); - } - } - - private boolean cancel(Exception e) { - synchronized (lock) { - if (!isDoing()) { - return false; - } - - state = FutureState.CANCELLED; - exception = e; - lock.notifyAll(); - } - - notifyListeners(); - return true; - } -} - diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/OMSUtil.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/OMSUtil.java index 00bcb85f2a..d6a1ae965a 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/OMSUtil.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/OMSUtil.java @@ -24,10 +24,6 @@ import java.util.Properties; import java.util.Set; -import io.openmessaging.api.Message; -import io.openmessaging.api.OMSBuiltinKeys; -import io.openmessaging.api.SendResult; -import io.openmessaging.api.exception.OMSRuntimeException; import org.apache.eventmesh.common.Constants; import org.apache.rocketmq.common.UtilAll; @@ -45,252 +41,252 @@ public static String buildInstanceName() { return Integer.toString(UtilAll.getPid()) + "%OpenMessaging" + "%" + System.nanoTime(); } - public static org.apache.rocketmq.common.message.Message msgConvert(Message omsMessage) { - org.apache.rocketmq.common.message.Message rmqMessage = new org.apache.rocketmq.common.message.Message(); - if (omsMessage == null) { - throw new OMSRuntimeException("'message' is null"); - } else { - if (omsMessage.getTopic() != null) { - rmqMessage.setTopic(omsMessage.getTopic()); - } - if (omsMessage.getKey() != null) { - rmqMessage.setKeys(omsMessage.getKey()); - } - if (omsMessage.getTag() != null) { - rmqMessage.setTags(omsMessage.getTag()); - } - if (omsMessage.getStartDeliverTime() > 0L) { - rmqMessage.putUserProperty("TIMER_DELIVER_MS", String.valueOf(omsMessage.getStartDeliverTime())); - } - - if (omsMessage.getBody() != null) { - rmqMessage.setBody(omsMessage.getBody()); - } - - if (omsMessage.getShardingKey() != null && !omsMessage.getShardingKey().isEmpty()) { - rmqMessage.putUserProperty("__SHARDINGKEY", omsMessage.getShardingKey()); - } - } - Properties systemProperties = omsMessage.getSystemProperties(); - Properties userProperties = omsMessage.getUserProperties(); - - //All destinations in RocketMQ use Topic -// rmqMessage.setTopic(systemProperties.getProperty(BuiltinKeys.DESTINATION)); - -// if (sysHeaders.containsKey(BuiltinKeys.START_TIME)) { -// long deliverTime = sysHeaders.getLong(BuiltinKeys.START_TIME, 0); -// if (deliverTime > 0) { -// rmqMessage.putUserProperty(RocketMQConstants.START_DELIVER_TIME, String.valueOf(deliverTime)); +// public static org.apache.rocketmq.common.message.Message msgConvert(Message omsMessage) { +// org.apache.rocketmq.common.message.Message rmqMessage = new org.apache.rocketmq.common.message.Message(); +// if (omsMessage == null) { +// throw new OMSRuntimeException("'message' is null"); +// } else { +// if (omsMessage.getTopic() != null) { +// rmqMessage.setTopic(omsMessage.getTopic()); +// } +// if (omsMessage.getKey() != null) { +// rmqMessage.setKeys(omsMessage.getKey()); +// } +// if (omsMessage.getTag() != null) { +// rmqMessage.setTags(omsMessage.getTag()); +// } +// if (omsMessage.getStartDeliverTime() > 0L) { +// rmqMessage.putUserProperty("TIMER_DELIVER_MS", String.valueOf(omsMessage.getStartDeliverTime())); +// } +// +// if (omsMessage.getBody() != null) { +// rmqMessage.setBody(omsMessage.getBody()); +// } +// +// if (omsMessage.getShardingKey() != null && !omsMessage.getShardingKey().isEmpty()) { +// rmqMessage.putUserProperty("__SHARDINGKEY", omsMessage.getShardingKey()); // } // } - - for (String key : userProperties.stringPropertyNames()) { - MessageAccessor.putProperty(rmqMessage, key, userProperties.getProperty(key)); - } - - //System headers has a high priority - for (String key : systemProperties.stringPropertyNames()) { - MessageAccessor.putProperty(rmqMessage, key, systemProperties.getProperty(key)); - } - - return rmqMessage; - } - - public static Message msgConvert(MessageExt rmqMsg) { - Message message = new Message(); - if (rmqMsg.getTopic() != null) { - message.setTopic(rmqMsg.getTopic()); - } - - if (rmqMsg.getKeys() != null) { - message.setKey(rmqMsg.getKeys()); - } - - if (rmqMsg.getTags() != null) { - message.setTag(rmqMsg.getTags()); - } - - if (rmqMsg.getBody() != null) { - message.setBody(rmqMsg.getBody()); - } - - if (rmqMsg.getUserProperty("TIMER_DELIVER_MS") != null) { - long ms = Long.parseLong(rmqMsg.getUserProperty("TIMER_DELIVER_MS")); - rmqMsg.getProperties().remove("TIMER_DELIVER_MS"); - message.setStartDeliverTime(ms); - } - - Properties systemProperties = new Properties(); - Properties userProperties = new Properties(); - - - final Set> entries = rmqMsg.getProperties().entrySet(); - - for (final Map.Entry entry : entries) { - if (isOMSHeader(entry.getKey())) { - //sysHeader - systemProperties.put(entry.getKey(), entry.getValue()); - } else { - //userHeader - userProperties.put(entry.getKey(), entry.getValue()); - } - } - - if (rmqMsg.getMsgId() != null){ - systemProperties.put(Constants.PROPERTY_MESSAGE_MESSAGE_ID, rmqMsg.getMsgId()); - } - - if (rmqMsg.getTopic() != null){ - systemProperties.put(Constants.PROPERTY_MESSAGE_DESTINATION, rmqMsg.getTopic()); - } - -// omsMsg.putSysHeaders(BuiltinKeys.SEARCH_KEYS, rmqMsg.getKeys()); - systemProperties.put(Constants.PROPERTY_MESSAGE_BORN_HOST, String.valueOf(rmqMsg.getBornHost())); - systemProperties.put(Constants.PROPERTY_MESSAGE_BORN_TIMESTAMP, rmqMsg.getBornTimestamp()); - systemProperties.put(Constants.PROPERTY_MESSAGE_STORE_HOST, String.valueOf(rmqMsg.getStoreHost())); - systemProperties.put("STORE_TIMESTAMP", rmqMsg.getStoreTimestamp()); - - //use in manual ack - userProperties.put(Constants.PROPERTY_MESSAGE_QUEUE_ID, rmqMsg.getQueueId()); - userProperties.put(Constants.PROPERTY_MESSAGE_QUEUE_OFFSET, rmqMsg.getQueueOffset()); - - message.setSystemProperties(systemProperties); - message.setUserProperties(userProperties); - - return message; - } - - public static org.apache.rocketmq.common.message.MessageExt msgConvertExt(Message omsMessage) { - - org.apache.rocketmq.common.message.MessageExt rmqMessageExt = new org.apache.rocketmq.common.message.MessageExt(); - try { - if (omsMessage.getKey() != null) { - rmqMessageExt.setKeys(omsMessage.getKey()); - } - if (omsMessage.getTag() != null) { - rmqMessageExt.setTags(omsMessage.getTag()); - } - if (omsMessage.getStartDeliverTime() > 0L) { - rmqMessageExt.putUserProperty("TIMER_DELIVER_MS", String.valueOf(omsMessage.getStartDeliverTime())); - } - - if (omsMessage.getBody() != null) { - rmqMessageExt.setBody(omsMessage.getBody()); - } - - if (omsMessage.getShardingKey() != null && !omsMessage.getShardingKey().isEmpty()) { - rmqMessageExt.putUserProperty("__SHARDINGKEY", omsMessage.getShardingKey()); - } - - Properties systemProperties = omsMessage.getSystemProperties(); - Properties userProperties = omsMessage.getUserProperties(); - - //All destinations in RocketMQ use Topic - rmqMessageExt.setTopic(omsMessage.getTopic()); - - int queueId = (int) userProperties.get(Constants.PROPERTY_MESSAGE_QUEUE_ID); - long queueOffset = (long) userProperties.get(Constants.PROPERTY_MESSAGE_QUEUE_OFFSET); - //use in manual ack - rmqMessageExt.setQueueId(queueId); - rmqMessageExt.setQueueOffset(queueOffset); - - for (String key : userProperties.stringPropertyNames()) { - MessageAccessor.putProperty(rmqMessageExt, key, userProperties.getProperty(key)); - } - - //System headers has a high priority - for (String key : systemProperties.stringPropertyNames()) { - MessageAccessor.putProperty(rmqMessageExt, key, systemProperties.getProperty(key)); - } - - } catch (Exception e) { - e.printStackTrace(); - } - return rmqMessageExt; - - } - - public static boolean isOMSHeader(String value) { - for (Field field : OMSBuiltinKeys.class.getDeclaredFields()) { - try { - if (field.get(OMSBuiltinKeys.class).equals(value)) { - return true; - } - } catch (IllegalAccessException e) { - return false; - } - } - return false; - } - - /** - * Convert a RocketMQ SEND_OK SendResult instance to a OMS SendResult. - * - * @param rmqResult RocketMQ result - * @return send result - */ - public static SendResult sendResultConvert(org.apache.rocketmq.client.producer.SendResult rmqResult) { - SendResult sendResult = new SendResult(); - sendResult.setTopic(rmqResult.getMessageQueue().getTopic()); - sendResult.setMessageId(rmqResult.getMsgId()); - return sendResult; - } - -// public static KeyValue buildKeyValue(KeyValue... keyValues) { -// KeyValue keyValue = OMS.newKeyValue(); -// for (KeyValue properties : keyValues) { -// for (String key : properties.keySet()) { -// keyValue.put(key, properties.getString(key)); +// Properties systemProperties = omsMessage.getSystemProperties(); +// Properties userProperties = omsMessage.getUserProperties(); +// +// //All destinations in RocketMQ use Topic +//// rmqMessage.setTopic(systemProperties.getProperty(BuiltinKeys.DESTINATION)); +// +//// if (sysHeaders.containsKey(BuiltinKeys.START_TIME)) { +//// long deliverTime = sysHeaders.getLong(BuiltinKeys.START_TIME, 0); +//// if (deliverTime > 0) { +//// rmqMessage.putUserProperty(RocketMQConstants.START_DELIVER_TIME, String.valueOf(deliverTime)); +//// } +//// } +// +// for (String key : userProperties.stringPropertyNames()) { +// MessageAccessor.putProperty(rmqMessage, key, userProperties.getProperty(key)); +// } +// +// //System headers has a high priority +// for (String key : systemProperties.stringPropertyNames()) { +// MessageAccessor.putProperty(rmqMessage, key, systemProperties.getProperty(key)); +// } +// +// return rmqMessage; +// } +// +// public static Message msgConvert(MessageExt rmqMsg) { +// Message message = new Message(); +// if (rmqMsg.getTopic() != null) { +// message.setTopic(rmqMsg.getTopic()); +// } +// +// if (rmqMsg.getKeys() != null) { +// message.setKey(rmqMsg.getKeys()); +// } +// +// if (rmqMsg.getTags() != null) { +// message.setTag(rmqMsg.getTags()); +// } +// +// if (rmqMsg.getBody() != null) { +// message.setBody(rmqMsg.getBody()); +// } +// +// if (rmqMsg.getUserProperty("TIMER_DELIVER_MS") != null) { +// long ms = Long.parseLong(rmqMsg.getUserProperty("TIMER_DELIVER_MS")); +// rmqMsg.getProperties().remove("TIMER_DELIVER_MS"); +// message.setStartDeliverTime(ms); +// } +// +// Properties systemProperties = new Properties(); +// Properties userProperties = new Properties(); +// +// +// final Set> entries = rmqMsg.getProperties().entrySet(); +// +// for (final Map.Entry entry : entries) { +// if (isOMSHeader(entry.getKey())) { +// //sysHeader +// systemProperties.put(entry.getKey(), entry.getValue()); +// } else { +// //userHeader +// userProperties.put(entry.getKey(), entry.getValue()); // } // } -// return keyValue; +// +// if (rmqMsg.getMsgId() != null){ +// systemProperties.put(Constants.PROPERTY_MESSAGE_MESSAGE_ID, rmqMsg.getMsgId()); +// } +// +// if (rmqMsg.getTopic() != null){ +// systemProperties.put(Constants.PROPERTY_MESSAGE_DESTINATION, rmqMsg.getTopic()); +// } +// +//// omsMsg.putSysHeaders(BuiltinKeys.SEARCH_KEYS, rmqMsg.getKeys()); +// systemProperties.put(Constants.PROPERTY_MESSAGE_BORN_HOST, String.valueOf(rmqMsg.getBornHost())); +// systemProperties.put(Constants.PROPERTY_MESSAGE_BORN_TIMESTAMP, rmqMsg.getBornTimestamp()); +// systemProperties.put(Constants.PROPERTY_MESSAGE_STORE_HOST, String.valueOf(rmqMsg.getStoreHost())); +// systemProperties.put("STORE_TIMESTAMP", rmqMsg.getStoreTimestamp()); +// +// //use in manual ack +// userProperties.put(Constants.PROPERTY_MESSAGE_QUEUE_ID, rmqMsg.getQueueId()); +// userProperties.put(Constants.PROPERTY_MESSAGE_QUEUE_OFFSET, rmqMsg.getQueueOffset()); +// +// message.setSystemProperties(systemProperties); +// message.setUserProperties(userProperties); +// +// return message; +// } +// +// public static org.apache.rocketmq.common.message.MessageExt msgConvertExt(Message omsMessage) { +// +// org.apache.rocketmq.common.message.MessageExt rmqMessageExt = new org.apache.rocketmq.common.message.MessageExt(); +// try { +// if (omsMessage.getKey() != null) { +// rmqMessageExt.setKeys(omsMessage.getKey()); +// } +// if (omsMessage.getTag() != null) { +// rmqMessageExt.setTags(omsMessage.getTag()); +// } +// if (omsMessage.getStartDeliverTime() > 0L) { +// rmqMessageExt.putUserProperty("TIMER_DELIVER_MS", String.valueOf(omsMessage.getStartDeliverTime())); +// } +// +// if (omsMessage.getBody() != null) { +// rmqMessageExt.setBody(omsMessage.getBody()); +// } +// +// if (omsMessage.getShardingKey() != null && !omsMessage.getShardingKey().isEmpty()) { +// rmqMessageExt.putUserProperty("__SHARDINGKEY", omsMessage.getShardingKey()); +// } +// +// Properties systemProperties = omsMessage.getSystemProperties(); +// Properties userProperties = omsMessage.getUserProperties(); +// +// //All destinations in RocketMQ use Topic +// rmqMessageExt.setTopic(omsMessage.getTopic()); +// +// int queueId = (int) userProperties.get(Constants.PROPERTY_MESSAGE_QUEUE_ID); +// long queueOffset = (long) userProperties.get(Constants.PROPERTY_MESSAGE_QUEUE_OFFSET); +// //use in manual ack +// rmqMessageExt.setQueueId(queueId); +// rmqMessageExt.setQueueOffset(queueOffset); +// +// for (String key : userProperties.stringPropertyNames()) { +// MessageAccessor.putProperty(rmqMessageExt, key, userProperties.getProperty(key)); +// } +// +// //System headers has a high priority +// for (String key : systemProperties.stringPropertyNames()) { +// MessageAccessor.putProperty(rmqMessageExt, key, systemProperties.getProperty(key)); +// } +// +// } catch (Exception e) { +// e.printStackTrace(); +// } +// return rmqMessageExt; +// +// } +// +// public static boolean isOMSHeader(String value) { +// for (Field field : OMSBuiltinKeys.class.getDeclaredFields()) { +// try { +// if (field.get(OMSBuiltinKeys.class).equals(value)) { +// return true; +// } +// } catch (IllegalAccessException e) { +// return false; +// } +// } +// return false; +// } +// +// /** +// * Convert a RocketMQ SEND_OK SendResult instance to a OMS SendResult. +// * +// * @param rmqResult RocketMQ result +// * @return send result +// */ +// public static SendResult sendResultConvert(org.apache.rocketmq.client.producer.SendResult rmqResult) { +// SendResult sendResult = new SendResult(); +// sendResult.setTopic(rmqResult.getMessageQueue().getTopic()); +// sendResult.setMessageId(rmqResult.getMsgId()); +// return sendResult; +// } +// +//// public static KeyValue buildKeyValue(KeyValue... keyValues) { +//// KeyValue keyValue = OMS.newKeyValue(); +//// for (KeyValue properties : keyValues) { +//// for (String key : properties.keySet()) { +//// keyValue.put(key, properties.getString(key)); +//// } +//// } +//// return keyValue; +//// } +// +// /** +// * Returns an iterator that cycles indefinitely over the elements of {@code Iterable}. +// * +// * @param Target type +// * @return Iterator +// */ +// public static Iterator cycle(final Iterable iterable) { +// return new Iterator() { +// Iterator iterator = new Iterator() { +// @Override +// public synchronized boolean hasNext() { +// return false; +// } +// +// @Override +// public synchronized T next() { +// throw new NoSuchElementException(); +// } +// +// @Override +// public synchronized void remove() { +// //Ignore +// } +// }; +// +// @Override +// public synchronized boolean hasNext() { +// return iterator.hasNext() || iterable.iterator().hasNext(); +// } +// +// @Override +// public synchronized T next() { +// if (!iterator.hasNext()) { +// iterator = iterable.iterator(); +// if (!iterator.hasNext()) { +// throw new NoSuchElementException(); +// } +// } +// return iterator.next(); +// } +// +// @Override +// public synchronized void remove() { +// iterator.remove(); +// } +// }; // } - - /** - * Returns an iterator that cycles indefinitely over the elements of {@code Iterable}. - * - * @param Target type - * @return Iterator - */ - public static Iterator cycle(final Iterable iterable) { - return new Iterator() { - Iterator iterator = new Iterator() { - @Override - public synchronized boolean hasNext() { - return false; - } - - @Override - public synchronized T next() { - throw new NoSuchElementException(); - } - - @Override - public synchronized void remove() { - //Ignore - } - }; - - @Override - public synchronized boolean hasNext() { - return iterator.hasNext() || iterable.iterator().hasNext(); - } - - @Override - public synchronized T next() { - if (!iterator.hasNext()) { - iterator = iterable.iterator(); - if (!iterator.hasNext()) { - throw new NoSuchElementException(); - } - } - return iterator.next(); - } - - @Override - public synchronized void remove() { - iterator.remove(); - } - }; - } } diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.MeshMQPushConsumer b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.Consumer similarity index 100% rename from eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.MeshMQPushConsumer rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.Consumer diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.MeshMQProducer b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.Producer similarity index 100% rename from eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.MeshMQProducer rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.Producer diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/resources/META-INF/services/org.apache.io.openmessaging.MessagingAccessPoint b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/resources/META-INF/services/org.apache.io.openmessaging.MessagingAccessPoint deleted file mode 100644 index e326c919c7..0000000000 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/resources/META-INF/services/org.apache.io.openmessaging.MessagingAccessPoint +++ /dev/null @@ -1,20 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -org.apache.eventmesh.connector.rocketmq.MessagingAccessPointImpl \ No newline at end of file diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/StandaloneBroker.java b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/StandaloneBroker.java index 72a9b58e65..99135aa7e8 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/StandaloneBroker.java +++ b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/StandaloneBroker.java @@ -18,7 +18,6 @@ package org.apache.eventmesh.connector.standalone.broker; import io.cloudevents.CloudEvent; -import io.openmessaging.api.Message; import org.apache.commons.lang3.tuple.Pair; import org.apache.eventmesh.connector.standalone.broker.model.MessageEntity; import org.apache.eventmesh.connector.standalone.broker.model.TopicMetadata; diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/model/MessageEntity.java b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/model/MessageEntity.java index 968053aba9..dd7468f03b 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/model/MessageEntity.java +++ b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/model/MessageEntity.java @@ -18,7 +18,6 @@ package org.apache.eventmesh.connector.standalone.broker.model; import io.cloudevents.CloudEvent; -import io.openmessaging.api.Message; import java.io.Serializable; diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/producer/StandaloneProducer.java b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/producer/StandaloneProducer.java index 57c8a2a749..b2a6122d91 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/producer/StandaloneProducer.java +++ b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/producer/StandaloneProducer.java @@ -17,7 +17,6 @@ package org.apache.eventmesh.connector.standalone.producer; -import org.apache.eventmesh.api.RRCallback; import org.apache.eventmesh.api.RequestReplyCallback; import org.apache.eventmesh.api.SendCallback; import org.apache.eventmesh.api.SendResult; @@ -37,7 +36,7 @@ import io.cloudevents.CloudEvent; -public class StandaloneProducer implements Producer { +public class StandaloneProducer { private Logger logger = LoggerFactory.getLogger(StandaloneProducer.class); @@ -50,32 +49,26 @@ public StandaloneProducer(Properties properties) { this.isStarted = new AtomicBoolean(false); } - @Override public boolean isStarted() { return isStarted.get(); } - @Override public boolean isClosed() { return !isStarted.get(); } - @Override public void start() { isStarted.compareAndSet(false, true); } - @Override public void shutdown() { isStarted.compareAndSet(true, false); } - @Override - public void init(Properties properties) throws Exception { - + public StandaloneProducer init(Properties properties) throws Exception { + return new StandaloneProducer(properties); } - @Override public SendResult publish(CloudEvent cloudEvent) { Preconditions.checkNotNull(cloudEvent); try { @@ -91,7 +84,6 @@ public SendResult publish(CloudEvent cloudEvent) { } } - @Override public void publish(CloudEvent cloudEvent, SendCallback sendCallback) throws Exception { Preconditions.checkNotNull(cloudEvent); Preconditions.checkNotNull(sendCallback); @@ -108,12 +100,10 @@ public void publish(CloudEvent cloudEvent, SendCallback sendCallback) throws Exc } } - @Override public void sendOneway(CloudEvent cloudEvent) { publish(cloudEvent); } - @Override public void sendAsync(CloudEvent cloudEvent, SendCallback sendCallback) { Preconditions.checkNotNull(cloudEvent); Preconditions.checkNotNull(sendCallback); @@ -130,22 +120,19 @@ public void sendAsync(CloudEvent cloudEvent, SendCallback sendCallback) { } } - @Override - public void request(CloudEvent cloudEvent, RRCallback rrCallback, long timeout) throws Exception { - throw new ConnectorRuntimeException("Request is not supported"); - } +// @Override +// public void request(CloudEvent cloudEvent, RequestReplyCallback rrCallback, long timeout) throws Exception { +// throw new ConnectorRuntimeException("Request is not supported"); +// } - @Override public void request(CloudEvent cloudEvent, RequestReplyCallback rrCallback, long timeout) throws Exception { throw new ConnectorRuntimeException("Request is not supported"); } - @Override public boolean reply(CloudEvent cloudEvent, SendCallback sendCallback) throws Exception { throw new ConnectorRuntimeException("Reply is not supported"); } - @Override public void checkTopicExist(String topic) throws Exception { boolean exist = standaloneBroker.checkTopicExist(topic); if (!exist) { @@ -153,7 +140,6 @@ public void checkTopicExist(String topic) throws Exception { } } - @Override public void setExtFields() { } diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/producer/StandaloneProducerAdaptor.java b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/producer/StandaloneProducerAdaptor.java index d60968cafc..8cff7e4f82 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/producer/StandaloneProducerAdaptor.java +++ b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/producer/StandaloneProducerAdaptor.java @@ -17,7 +17,6 @@ package org.apache.eventmesh.connector.standalone.producer; -import org.apache.eventmesh.api.RRCallback; import org.apache.eventmesh.api.RequestReplyCallback; import org.apache.eventmesh.api.SendCallback; import org.apache.eventmesh.api.SendResult; @@ -61,7 +60,7 @@ public void shutdown() { @Override public void init(Properties properties) throws Exception { - standaloneProducer.init(properties); + standaloneProducer = new StandaloneProducer(properties); } @Override @@ -84,10 +83,10 @@ public void sendAsync(CloudEvent cloudEvent, SendCallback sendCallback) { standaloneProducer.sendAsync(cloudEvent, sendCallback); } - @Override - public void request(CloudEvent cloudEvent, RRCallback rrCallback, long timeout) throws Exception { - standaloneProducer.request(cloudEvent, rrCallback, timeout); - } +// @Override +// public void request(CloudEvent cloudEvent, RRCallback rrCallback, long timeout) throws Exception { +// standaloneProducer.request(cloudEvent, rrCallback, timeout); +// } @Override public void request(CloudEvent cloudEvent, RequestReplyCallback rrCallback, long timeout) throws Exception { diff --git a/eventmesh-examples/build.gradle b/eventmesh-examples/build.gradle index 3cb42bce3e..6e57ad6722 100644 --- a/eventmesh-examples/build.gradle +++ b/eventmesh-examples/build.gradle @@ -26,6 +26,7 @@ dependencies { implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'io.netty:netty-all' implementation "io.cloudevents:cloudevents-core" + implementation "io.openmessaging:openmessaging-api" compileOnly 'org.projectlombok:lombok:1.18.22' annotationProcessor 'org.projectlombok:lombok:1.18.22' diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java index bd7f7cbcaa..d6f30e8502 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java @@ -22,6 +22,7 @@ import static org.apache.eventmesh.tcp.common.EventMeshTestCaseTopicSet.TOPIC_PRX_WQ2ClientBroadCast; import static org.apache.eventmesh.tcp.common.EventMeshTestCaseTopicSet.TOPIC_PRX_WQ2ClientUniCast; +import org.apache.eventmesh.client.tcp.common.MessageUtils; import org.apache.eventmesh.common.protocol.tcp.Command; import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; import org.apache.eventmesh.common.protocol.tcp.Header; @@ -33,8 +34,9 @@ public class EventMeshTestUtils { private static final int seqLength = 10; + // generate pub-client public static UserAgent generateClient1() { - return UserAgent.builder() + UserAgent agent = UserAgent.builder() .env("test") .host("127.0.0.1") .password(generateRandomString(8)) @@ -48,10 +50,12 @@ public static UserAgent generateClient1() { .version("2.0.11") .idc("FT") .build(); + return MessageUtils.generatePubClient(agent); } + // generate sub-client public static UserAgent generateClient2() { - return UserAgent.builder() + UserAgent agent = UserAgent.builder() .env("test") .host("127.0.0.1") .password(generateRandomString(8)) @@ -65,6 +69,7 @@ public static UserAgent generateClient2() { .version("2.0.11") .idc("FT") .build(); + return MessageUtils.generateSubClient(agent); } public static Package syncRR() { diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/acl/Acl.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/acl/Acl.java index cb773f4eb4..bf762ebd92 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/acl/Acl.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/acl/Acl.java @@ -20,7 +20,7 @@ import org.apache.eventmesh.api.acl.AclPropertyKeys; import org.apache.eventmesh.api.acl.AclService; import org.apache.eventmesh.api.exception.AclException; -import org.apache.eventmesh.api.producer.MeshMQProducer; +//import org.apache.eventmesh.api.producer.MeshMQProducer; import org.apache.eventmesh.common.protocol.tcp.UserAgent; import org.apache.eventmesh.spi.EventMeshExtensionFactory; import org.slf4j.Logger; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java index 58b3151578..f781071126 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java @@ -19,7 +19,6 @@ import java.util.Properties; -import org.apache.eventmesh.api.RRCallback; import org.apache.eventmesh.api.RequestReplyCallback; import org.apache.eventmesh.api.SendCallback; import org.apache.eventmesh.api.factory.ConnectorPluginFactory; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/cloudevent/OMSMessageFactory.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/cloudevent/OMSMessageFactory.java deleted file mode 100644 index fcb2026fb2..0000000000 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/cloudevent/OMSMessageFactory.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eventmesh.runtime.core.protocol.cloudevent; - -import io.cloudevents.core.message.MessageReader; -import io.cloudevents.core.message.MessageWriter; -import io.cloudevents.core.message.impl.GenericStructuredMessageReader; -import io.cloudevents.core.message.impl.MessageUtils; -import io.cloudevents.lang.Nullable; -import io.cloudevents.rw.CloudEventRWException; -import io.cloudevents.rw.CloudEventWriter; -import io.openmessaging.api.Message; -import org.apache.eventmesh.runtime.core.protocol.cloudevent.impl.OMSBinaryMessageReader; -import org.apache.eventmesh.runtime.core.protocol.cloudevent.impl.OMSHeaders; -import org.apache.eventmesh.runtime.core.protocol.cloudevent.impl.OMSMessageWriter; - -import javax.annotation.ParametersAreNonnullByDefault; -import java.util.Properties; - -/** - * This class provides a collection of methods to create {@link io.cloudevents.core.message.MessageReader} - * and {@link io.cloudevents.core.message.MessageWriter} - * manually serialize/deserialize {@link io.cloudevents.CloudEvent} messages. - */ -@ParametersAreNonnullByDefault -public final class OMSMessageFactory { - - private OMSMessageFactory() { - // prevent instantiation - } - - /** - * create reader by message - * @param message - * @return - * @throws CloudEventRWException - */ - public static MessageReader createReader(final Message message) throws CloudEventRWException { - return createReader(message.getUserProperties(), message.getBody()); - } - - - public static MessageReader createReader(final Properties props, @Nullable final byte[] body) throws CloudEventRWException { - - return MessageUtils.parseStructuredOrBinaryMessage( - () -> props.getOrDefault(OMSHeaders.CONTENT_TYPE,"").toString(), - format -> new GenericStructuredMessageReader(format, body), - () -> props.getOrDefault(OMSHeaders.SPEC_VERSION,"").toString(), - sv -> new OMSBinaryMessageReader(sv, props, body) - ); - } - - - /** - * create writer by topic - * @param topic - * @return - */ - public static MessageWriter, Message> createWriter(String topic) { - return new OMSMessageWriter<>(topic); - } - - /** - * create writer by topic,keys - * @param topic - * @param keys - * @return - */ - public static MessageWriter, Message> createWriter(String topic, String keys) { - return new OMSMessageWriter<>(topic, keys); - } - - /** - * create writer by topic,keys,tags - * @param topic - * @param keys - * @param tags - * @return - */ - public static MessageWriter, Message> createWriter(String topic, String keys, String tags) { - return new OMSMessageWriter<>(topic, keys, tags); - } - -} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/cloudevent/impl/OMSBinaryMessageReader.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/cloudevent/impl/OMSBinaryMessageReader.java deleted file mode 100644 index ae035c7be8..0000000000 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/cloudevent/impl/OMSBinaryMessageReader.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eventmesh.runtime.core.protocol.cloudevent.impl; - -import io.cloudevents.SpecVersion; -import io.cloudevents.core.data.BytesCloudEventData; -import io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl; - -import java.util.Objects; -import java.util.Properties; -import java.util.function.BiConsumer; - -/** - * binary message reader - */ -public class OMSBinaryMessageReader extends BaseGenericBinaryMessageReaderImpl { - - private final Properties headers; - - public OMSBinaryMessageReader(SpecVersion version, Properties headers, byte[] payload) { - super(version, payload != null && payload.length > 0 ? BytesCloudEventData.wrap(payload) : null); - - Objects.requireNonNull(headers); - this.headers = headers; - } - - /** - * whether header key is content type - * @param key - * @return - */ - @Override - protected boolean isContentTypeHeader(String key) { - return key.equals(OMSHeaders.CONTENT_TYPE); - } - - /** - * whether message header is cloudEvent header - * @param key - * @return - */ - @Override - protected boolean isCloudEventsHeader(String key) { - return key.length() > 3 && key.substring(0, OMSHeaders.CE_PREFIX.length()).startsWith(OMSHeaders.CE_PREFIX); - } - - /** - * parse message header to cloudEvent attribute - * @param key - * @return - */ - @Override - protected String toCloudEventsKey(String key) { - return key.substring(OMSHeaders.CE_PREFIX.length()).toLowerCase(); - } - - /** - * - * @param fn - */ - @Override - protected void forEachHeader(BiConsumer fn) { - this.headers.forEach((k, v) -> { - if (k != null && v != null) { - fn.accept(k.toString(), v.toString()); - } - - }); - } - - @Override - protected String toCloudEventsValue(String value) { - return value; - } -} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/cloudevent/impl/OMSHeaders.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/cloudevent/impl/OMSHeaders.java deleted file mode 100644 index 7747175165..0000000000 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/cloudevent/impl/OMSHeaders.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eventmesh.runtime.core.protocol.cloudevent.impl; - -import io.cloudevents.core.message.impl.MessageUtils; -import io.cloudevents.core.v1.CloudEventV1; - -import java.util.Map; - -/** - * Define the value of CE attribute in the header of ons - */ -public class OMSHeaders { - - /** - * CE prefix - */ - public static final String CE_PREFIX = "ce_"; - - /** - * Prefix each value - */ - protected static final Map ATTRIBUTES_TO_HEADERS = MessageUtils.generateAttributesToHeadersMapping(v -> CE_PREFIX + v); - - public static final String CONTENT_TYPE = ATTRIBUTES_TO_HEADERS.get(CloudEventV1.DATACONTENTTYPE); - - public static final String SPEC_VERSION = ATTRIBUTES_TO_HEADERS.get(CloudEventV1.SPECVERSION); - -} - diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/cloudevent/impl/OMSMessageWriter.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/cloudevent/impl/OMSMessageWriter.java deleted file mode 100644 index 0d74331a06..0000000000 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/cloudevent/impl/OMSMessageWriter.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eventmesh.runtime.core.protocol.cloudevent.impl; - -import io.cloudevents.CloudEventData; -import io.cloudevents.SpecVersion; -import io.cloudevents.core.format.EventFormat; -import io.cloudevents.core.message.MessageWriter; -import io.cloudevents.rw.CloudEventContextWriter; -import io.cloudevents.rw.CloudEventRWException; -import io.cloudevents.rw.CloudEventWriter; -import io.openmessaging.api.Message; -import org.apache.commons.lang3.StringUtils; - -/** - * write ce to ons - * @param - */ -public final class OMSMessageWriter implements MessageWriter, Message>, CloudEventWriter { - - private Message message; - - - public OMSMessageWriter(String topic) { - message = new Message(); - message.setTopic(topic); - } - - public OMSMessageWriter(String topic, String key) { - message = new Message(); - message.setTopic(topic); - if (key != null && key.length() > 0) { - message.setKey(key); - } - } - - public OMSMessageWriter(String topic, String key, String tag) { - message = new Message(); - message.setTopic(topic); - if (StringUtils.isNotEmpty(tag)) { - message.setTag(tag); - } - - if (StringUtils.isNotEmpty(key)) { - message.setKey(key); - } - } - - - @Override - public CloudEventContextWriter withContextAttribute(String name, String value) throws CloudEventRWException { - - String propName = OMSHeaders.ATTRIBUTES_TO_HEADERS.get(name); - if (propName == null) { - propName = OMSHeaders.CE_PREFIX + name; - } - message.putUserProperties(propName, value); - return this; - } - - @Override - public OMSMessageWriter create(final SpecVersion version) { - message.putUserProperties(OMSHeaders.SPEC_VERSION, version.toString()); - return this; - } - - @Override - public Message setEvent(final EventFormat format, final byte[] value) throws CloudEventRWException { - message.putUserProperties(OMSHeaders.CONTENT_TYPE, format.serializedContentType()); - message.setBody(value); - return message; - } - - @Override - public Message end(final CloudEventData data) throws CloudEventRWException { - message.setBody(data.toBytes()); - return message; - } - - @Override - public Message end() { - message.setBody(null); - return message; - } -} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java index 29cf4c81c5..34c8d7d5d2 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java @@ -20,7 +20,6 @@ import java.util.Properties; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.eventmesh.api.RRCallback; import org.apache.eventmesh.api.RequestReplyCallback; import org.apache.eventmesh.api.SendCallback; import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/SessionSender.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/SessionSender.java index 4fbea05a01..e5db1ab7e3 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/SessionSender.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/SessionSender.java @@ -20,7 +20,6 @@ import io.cloudevents.core.builder.CloudEventBuilder; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DateFormatUtils; -import org.apache.eventmesh.api.RRCallback; import org.apache.eventmesh.api.RequestReplyCallback; import org.apache.eventmesh.api.SendCallback; import org.apache.eventmesh.common.protocol.tcp.Command; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/HelloTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/HelloTask.java index df4a6062b4..10e8f1ad41 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/HelloTask.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/HelloTask.java @@ -55,7 +55,7 @@ public void run() { UserAgent user = (UserAgent) pkg.getBody(); try { //do acl check in connect - if(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerSecurityEnable){ + if (eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerSecurityEnable) { String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); Acl.doAclCheckInTcpConnect(remoteAddr, user, HELLO_REQUEST.value()); } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java index 035e9d4c97..22400f7c3d 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java @@ -38,7 +38,6 @@ import com.fasterxml.jackson.databind.SerializationFeature; import io.cloudevents.CloudEvent; -import io.openmessaging.api.Message; import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.common.Constants; @@ -200,42 +199,42 @@ public static Map getEventProp(CloudEvent event) { // return accessMessage; // } - public static EventMeshMessage encodeMessage(Message omsMessage) throws Exception { - - EventMeshMessage eventMeshMessage = new EventMeshMessage(); - eventMeshMessage.setBody(new String(omsMessage.getBody(), StandardCharsets.UTF_8)); - - Properties sysHeaders = omsMessage.getSystemProperties(); - Properties userHeaders = omsMessage.getUserProperties(); - - //All destinations in RocketMQ use Topic - eventMeshMessage.setTopic(sysHeaders.getProperty(Constants.PROPERTY_MESSAGE_DESTINATION)); - - if (sysHeaders.containsKey("START_TIME")) { - long deliverTime; - if (StringUtils.isBlank(sysHeaders.getProperty("START_TIME"))) { - deliverTime = 0; - } else { - deliverTime = Long.parseLong(sysHeaders.getProperty("START_TIME")); - } - - if (deliverTime > 0) { -// rmqMessage.putUserProperty(RocketMQConstants.START_DELIVER_TIME, String.valueOf(deliverTime)); - eventMeshMessage.getProperties().put("START_TIME", String.valueOf(deliverTime)); - } - } - - for (String key : userHeaders.stringPropertyNames()) { - eventMeshMessage.getProperties().put(key, userHeaders.getProperty(key)); - } - - //System headers has a high priority - for (String key : sysHeaders.stringPropertyNames()) { - eventMeshMessage.getProperties().put(key, sysHeaders.getProperty(key)); - } - - return eventMeshMessage; - } +// public static EventMeshMessage encodeMessage(Message omsMessage) throws Exception { +// +// EventMeshMessage eventMeshMessage = new EventMeshMessage(); +// eventMeshMessage.setBody(new String(omsMessage.getBody(), StandardCharsets.UTF_8)); +// +// Properties sysHeaders = omsMessage.getSystemProperties(); +// Properties userHeaders = omsMessage.getUserProperties(); +// +// //All destinations in RocketMQ use Topic +// eventMeshMessage.setTopic(sysHeaders.getProperty(Constants.PROPERTY_MESSAGE_DESTINATION)); +// +// if (sysHeaders.containsKey("START_TIME")) { +// long deliverTime; +// if (StringUtils.isBlank(sysHeaders.getProperty("START_TIME"))) { +// deliverTime = 0; +// } else { +// deliverTime = Long.parseLong(sysHeaders.getProperty("START_TIME")); +// } +// +// if (deliverTime > 0) { +//// rmqMessage.putUserProperty(RocketMQConstants.START_DELIVER_TIME, String.valueOf(deliverTime)); +// eventMeshMessage.getProperties().put("START_TIME", String.valueOf(deliverTime)); +// } +// } +// +// for (String key : userHeaders.stringPropertyNames()) { +// eventMeshMessage.getProperties().put(key, userHeaders.getProperty(key)); +// } +// +// //System headers has a high priority +// for (String key : sysHeaders.stringPropertyNames()) { +// eventMeshMessage.getProperties().put(key, sysHeaders.getProperty(key)); +// } +// +// return eventMeshMessage; +// } public static String getLocalAddr() { //priority of networkInterface when generating client ip diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPClient.java index ceaa4ce655..8ed1ea85b8 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPClient.java @@ -41,8 +41,8 @@ public EventMeshMessageTCPClient(EventMeshTCPClientConfig eventMeshTcpClientConf @Override public void init() throws EventMeshException { - eventMeshMessageTCPPubClient.init(); eventMeshMessageTCPSubClient.init(); + eventMeshMessageTCPPubClient.init(); } @Override diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java index 794cea66e5..a3dd6a41bd 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java @@ -56,7 +56,7 @@ class EventMeshMessageTCPPubClient extends TcpClient implements EventMeshTCPPubC public EventMeshMessageTCPPubClient(EventMeshTCPClientConfig eventMeshTcpClientConfig) { super(eventMeshTcpClientConfig); - this.userAgent = eventMeshTcpClientConfig.getUserAgent(); + this.userAgent = MessageUtils.generatePubClient(eventMeshTcpClientConfig.getUserAgent()); } @Override diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java index 7939066e92..add12a5d85 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java @@ -55,7 +55,7 @@ class EventMeshMessageTCPSubClient extends TcpClient implements EventMeshTCPSubC public EventMeshMessageTCPSubClient(EventMeshTCPClientConfig eventMeshTcpClientConfig) { super(eventMeshTcpClientConfig); - this.userAgent = eventMeshTcpClientConfig.getUserAgent(); + this.userAgent = MessageUtils.generateSubClient(eventMeshTcpClientConfig.getUserAgent()); } @Override