diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublish.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublish.java index 52ccc15a75..a01bb250b4 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublish.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublish.java @@ -17,15 +17,17 @@ package org.apache.eventmesh.tcp.demo.pub.eventmeshmessage; -import java.util.Properties; - +import org.apache.eventmesh.client.tcp.EventMeshTCPClient; import org.apache.eventmesh.client.tcp.common.EventMeshCommon; -import org.apache.eventmesh.client.tcp.conf.EventMeshTcpClientConfig; -import org.apache.eventmesh.client.tcp.impl.eventmeshmessage.EventMeshMessageTCPPubClient; +import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig; +import org.apache.eventmesh.client.tcp.impl.EventMeshTCPClientFactory; import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; import org.apache.eventmesh.common.protocol.tcp.UserAgent; import org.apache.eventmesh.tcp.common.EventMeshTestUtils; import org.apache.eventmesh.util.Utils; + +import java.util.Properties; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,7 +35,7 @@ public class AsyncPublish { public static Logger logger = LoggerFactory.getLogger(AsyncPublish.class); - private static EventMeshMessageTCPPubClient client; + private static EventMeshTCPClient client; public static AsyncPublish handler = new AsyncPublish(); @@ -43,12 +45,13 @@ public static void main(String[] agrs) throws Exception { final int eventMeshTcpPort = Integer.parseInt(properties.getProperty("eventmesh.tcp.port")); try { UserAgent userAgent = EventMeshTestUtils.generateClient1(); - EventMeshTcpClientConfig eventMeshTcpClientConfig = EventMeshTcpClientConfig.builder() + EventMeshTCPClientConfig eventMeshTcpClientConfig = EventMeshTCPClientConfig.builder() .host(eventMeshIp) .port(eventMeshTcpPort) .userAgent(userAgent) .build(); - client = new EventMeshMessageTCPPubClient(eventMeshTcpClientConfig); + client = + EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig, EventMeshMessage.class); client.init(); client.heartbeat(); @@ -60,10 +63,8 @@ public static void main(String[] agrs) throws Exception { Thread.sleep(1000); } - + client.listen(); Thread.sleep(2000); - // release resource and close client - // client.close(); } catch (Exception e) { logger.warn("AsyncPublish failed", e); } diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublishBroadcast.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublishBroadcast.java index 12071dea92..07c4de5a3b 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublishBroadcast.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublishBroadcast.java @@ -17,9 +17,10 @@ package org.apache.eventmesh.tcp.demo.pub.eventmeshmessage; +import org.apache.eventmesh.client.tcp.EventMeshTCPClient; import org.apache.eventmesh.client.tcp.common.EventMeshCommon; -import org.apache.eventmesh.client.tcp.conf.EventMeshTcpClientConfig; -import org.apache.eventmesh.client.tcp.impl.eventmeshmessage.EventMeshMessageTCPPubClient; +import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig; +import org.apache.eventmesh.client.tcp.impl.EventMeshTCPClientFactory; import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; import org.apache.eventmesh.common.protocol.tcp.UserAgent; import org.apache.eventmesh.tcp.common.EventMeshTestUtils; @@ -39,12 +40,13 @@ public static void main(String[] agrs) throws Exception { final String eventMeshIp = properties.getProperty("eventmesh.ip"); final int eventMeshTcpPort = Integer.parseInt(properties.getProperty("eventmesh.tcp.port")); UserAgent userAgent = EventMeshTestUtils.generateClient1(); - EventMeshTcpClientConfig eventMeshTcpClientConfig = EventMeshTcpClientConfig.builder() + EventMeshTCPClientConfig eventMeshTcpClientConfig = EventMeshTCPClientConfig.builder() .host(eventMeshIp) .port(eventMeshTcpPort) .userAgent(userAgent) .build(); - try (final EventMeshMessageTCPPubClient client = new EventMeshMessageTCPPubClient(eventMeshTcpClientConfig)) { + try (final EventMeshTCPClient client = + EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig, EventMeshMessage.class)) { client.init(); client.heartbeat(); diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/SyncRequest.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/SyncRequest.java index 844b9cf6cc..f13ee46757 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/SyncRequest.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/SyncRequest.java @@ -17,9 +17,10 @@ package org.apache.eventmesh.tcp.demo.pub.eventmeshmessage; +import org.apache.eventmesh.client.tcp.EventMeshTCPClient; import org.apache.eventmesh.client.tcp.common.EventMeshCommon; -import org.apache.eventmesh.client.tcp.conf.EventMeshTcpClientConfig; -import org.apache.eventmesh.client.tcp.impl.eventmeshmessage.EventMeshMessageTCPPubClient; +import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig; +import org.apache.eventmesh.client.tcp.impl.EventMeshTCPClientFactory; import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; import org.apache.eventmesh.common.protocol.tcp.Package; import org.apache.eventmesh.common.protocol.tcp.UserAgent; @@ -30,16 +31,15 @@ @Slf4j public class SyncRequest { - private static EventMeshMessageTCPPubClient client; - public static void main(String[] agrs) throws Exception { UserAgent userAgent = EventMeshTestUtils.generateClient1(); - EventMeshTcpClientConfig eventMeshTcpClientConfig = EventMeshTcpClientConfig.builder() + EventMeshTCPClientConfig eventMeshTcpClientConfig = EventMeshTCPClientConfig.builder() .host("127.0.0.1") .port(10000) .userAgent(userAgent) .build(); - try (EventMeshMessageTCPPubClient client = new EventMeshMessageTCPPubClient(eventMeshTcpClientConfig)) { + try (EventMeshTCPClient client = EventMeshTCPClientFactory.createEventMeshTCPClient( + eventMeshTcpClientConfig, EventMeshMessage.class)) { client.init(); client.heartbeat(); diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribe.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribe.java index ec56aa5c2f..f27dbd1b75 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribe.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribe.java @@ -17,9 +17,10 @@ package org.apache.eventmesh.tcp.demo.sub.eventmeshmessage; +import org.apache.eventmesh.client.tcp.EventMeshTCPClient; import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; -import org.apache.eventmesh.client.tcp.conf.EventMeshTcpClientConfig; -import org.apache.eventmesh.client.tcp.impl.eventmeshmessage.EventMeshMessageTCPSubClient; +import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig; +import org.apache.eventmesh.client.tcp.impl.EventMeshTCPClientFactory; import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.common.protocol.SubscriptionType; import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; @@ -36,8 +37,6 @@ @Slf4j public class AsyncSubscribe implements ReceiveMsgHook { - private static EventMeshMessageTCPSubClient client; - public static AsyncSubscribe handler = new AsyncSubscribe(); public static void main(String[] agrs) throws Exception { @@ -45,17 +44,18 @@ public static void main(String[] agrs) throws Exception { final String eventMeshIp = properties.getProperty("eventmesh.ip"); final int eventMeshTcpPort = Integer.parseInt(properties.getProperty("eventmesh.tcp.port")); UserAgent userAgent = EventMeshTestUtils.generateClient2(); - EventMeshTcpClientConfig eventMeshTcpClientConfig = EventMeshTcpClientConfig.builder() + EventMeshTCPClientConfig eventMeshTcpClientConfig = EventMeshTCPClientConfig.builder() .host(eventMeshIp) .port(eventMeshTcpPort) .userAgent(userAgent) .build(); - try (EventMeshMessageTCPSubClient client = new EventMeshMessageTCPSubClient(eventMeshTcpClientConfig)) { + try (EventMeshTCPClient client = EventMeshTCPClientFactory.createEventMeshTCPClient( + eventMeshTcpClientConfig, EventMeshMessage.class)) { client.init(); client.heartbeat(); client.subscribe("TEST-TOPIC-TCP-ASYNC", SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC); - client.registerBusiHandler(handler); + client.registerSubBusiHandler(handler); client.listen(); diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribeBroadcast.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribeBroadcast.java index 8d642e19ee..31355e0980 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribeBroadcast.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribeBroadcast.java @@ -17,9 +17,10 @@ package org.apache.eventmesh.tcp.demo.sub.eventmeshmessage; +import org.apache.eventmesh.client.tcp.EventMeshTCPClient; import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; -import org.apache.eventmesh.client.tcp.conf.EventMeshTcpClientConfig; -import org.apache.eventmesh.client.tcp.impl.eventmeshmessage.EventMeshMessageTCPSubClient; +import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig; +import org.apache.eventmesh.client.tcp.impl.EventMeshTCPClientFactory; import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.common.protocol.SubscriptionType; import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; @@ -43,17 +44,18 @@ public static void main(String[] agrs) throws Exception { final String eventMeshIp = properties.getProperty("eventmesh.ip"); final int eventMeshTcpPort = Integer.parseInt(properties.getProperty("eventmesh.tcp.port")); UserAgent userAgent = EventMeshTestUtils.generateClient2(); - EventMeshTcpClientConfig eventMeshTcpClientConfig = EventMeshTcpClientConfig.builder() + EventMeshTCPClientConfig eventMeshTcpClientConfig = EventMeshTCPClientConfig.builder() .host(eventMeshIp) .port(eventMeshTcpPort) .userAgent(userAgent) .build(); - try (EventMeshMessageTCPSubClient client = new EventMeshMessageTCPSubClient(eventMeshTcpClientConfig)) { + try (EventMeshTCPClient client = EventMeshTCPClientFactory.createEventMeshTCPClient( + eventMeshTcpClientConfig, EventMeshMessage.class)) { client.init(); client.heartbeat(); client.subscribe("TEST-TOPIC-TCP-BROADCAST", SubscriptionMode.BROADCASTING, SubscriptionType.ASYNC); - client.registerBusiHandler(handler); + client.registerSubBusiHandler(handler); client.listen(); diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/SyncResponse.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/SyncResponse.java index b796c4b4aa..5ed9dae349 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/SyncResponse.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/SyncResponse.java @@ -17,9 +17,10 @@ package org.apache.eventmesh.tcp.demo.sub.eventmeshmessage; +import org.apache.eventmesh.client.tcp.EventMeshTCPClient; import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; -import org.apache.eventmesh.client.tcp.conf.EventMeshTcpClientConfig; -import org.apache.eventmesh.client.tcp.impl.eventmeshmessage.EventMeshMessageTCPSubClient; +import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig; +import org.apache.eventmesh.client.tcp.impl.EventMeshTCPClientFactory; import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.common.protocol.SubscriptionType; import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; @@ -37,18 +38,19 @@ public class SyncResponse implements ReceiveMsgHook { public static void main(String[] agrs) throws Exception { UserAgent userAgent = EventMeshTestUtils.generateClient2(); - EventMeshTcpClientConfig eventMeshTcpClientConfig = EventMeshTcpClientConfig.builder() + EventMeshTCPClientConfig eventMeshTcpClientConfig = EventMeshTCPClientConfig.builder() .host("127.0.0.1") .port(10000) .userAgent(userAgent) .build(); - try (EventMeshMessageTCPSubClient client = new EventMeshMessageTCPSubClient(eventMeshTcpClientConfig)) { + try (EventMeshTCPClient client = EventMeshTCPClientFactory + .createEventMeshTCPClient(eventMeshTcpClientConfig, EventMeshMessage.class)) { client.init(); client.heartbeat(); client.subscribe("TEST-TOPIC-TCP-SYNC", SubscriptionMode.CLUSTERING, SubscriptionType.SYNC); // Synchronize RR messages - client.registerBusiHandler(handler); + client.registerSubBusiHandler(handler); client.listen(); diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPClient.java new file mode 100644 index 0000000000..bec64ca08b --- /dev/null +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPClient.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.client.tcp; + +import org.apache.eventmesh.client.tcp.common.AsyncRRCallback; +import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; +import org.apache.eventmesh.common.exception.EventMeshException; +import org.apache.eventmesh.common.protocol.SubscriptionMode; +import org.apache.eventmesh.common.protocol.SubscriptionType; +import org.apache.eventmesh.common.protocol.tcp.Package; + +/** + * EventMesh TCP client, used to sub/pub message by tcp. + * You can use {@link org.apache.eventmesh.client.tcp.impl.EventMeshTCPClientFactory} to create a target client. + * + * @param + */ +public interface EventMeshTCPClient extends AutoCloseable { + + void init() throws EventMeshException; + + Package rr(ProtocolMessage msg, long timeout) throws EventMeshException; + + void asyncRR(ProtocolMessage msg, AsyncRRCallback callback, long timeout) throws EventMeshException; + + Package publish(ProtocolMessage msg, long timeout) throws EventMeshException; + + void broadcast(ProtocolMessage msg, long timeout) throws EventMeshException; + + void heartbeat() throws EventMeshException; + + void listen() throws EventMeshException; + + void subscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) + throws EventMeshException; + + void unsubscribe() throws EventMeshException; + + void registerPubBusiHandler(ReceiveMsgHook handler) throws EventMeshException; + + void registerSubBusiHandler(ReceiveMsgHook handler) throws EventMeshException; + + void close() throws EventMeshException; + + EventMeshTCPPubClient getPubClient(); + + EventMeshTCPSubClient getSubClient(); +} diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPPubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPPubClient.java index 821119065e..bbe9a837cd 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPPubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPPubClient.java @@ -39,13 +39,13 @@ public interface EventMeshTCPPubClient extends AutoCloseable { void reconnect() throws EventMeshException; // todo: Hide package method, use ProtocolMessage - Package rr(ProtocolMessage msg, long timeout) throws EventMeshException; + Package rr(ProtocolMessage event, long timeout) throws EventMeshException; - void asyncRR(ProtocolMessage msg, AsyncRRCallback callback, long timeout) throws EventMeshException; + void asyncRR(ProtocolMessage event, AsyncRRCallback callback, long timeout) throws EventMeshException; - Package publish(ProtocolMessage cloudEvent, long timeout) throws EventMeshException; + Package publish(ProtocolMessage event, long timeout) throws EventMeshException; - void broadcast(ProtocolMessage cloudEvent, long timeout) throws EventMeshException; + void broadcast(ProtocolMessage event, long timeout) throws EventMeshException; void registerBusiHandler(ReceiveMsgHook handler) throws EventMeshException; diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPSubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPSubClient.java index f8b6c7fff9..21186486a5 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPSubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPSubClient.java @@ -30,7 +30,7 @@ *
  • {@link org.apache.eventmesh.client.tcp.impl.openmessage.OpenMessageTCPSubClient}
  • * */ -public interface EventMeshTCPSubClient { +public interface EventMeshTCPSubClient extends AutoCloseable { void init() throws EventMeshException; diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java index 46cf370e23..61098ad50f 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java @@ -17,9 +17,7 @@ package org.apache.eventmesh.client.tcp.common; -import org.apache.eventmesh.client.tcp.conf.EventMeshTcpClientConfig; -import org.apache.eventmesh.common.exception.EventMeshException; -import org.apache.eventmesh.common.protocol.tcp.Command; +import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig; import org.apache.eventmesh.common.protocol.tcp.Package; import org.apache.eventmesh.common.protocol.tcp.codec.Codec; @@ -42,7 +40,6 @@ import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; @@ -73,7 +70,7 @@ public abstract class TcpClient implements Closeable { Runtime.getRuntime().availableProcessors(), new ThreadFactoryBuilder().setNameFormat("TCPClientScheduler").setDaemon(true).build()); - public TcpClient(EventMeshTcpClientConfig eventMeshTcpClientConfig) { + public TcpClient(EventMeshTCPClientConfig eventMeshTcpClientConfig) { Preconditions.checkNotNull(eventMeshTcpClientConfig, "EventMeshTcpClientConfig cannot be null"); Preconditions.checkNotNull(eventMeshTcpClientConfig.getHost(), "Host cannot be null"); Preconditions.checkState(eventMeshTcpClientConfig.getPort() > 0, "port is not validated"); diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/conf/EventMeshTCPClientConfig.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/conf/EventMeshTCPClientConfig.java new file mode 100644 index 0000000000..56b94f5e52 --- /dev/null +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/conf/EventMeshTCPClientConfig.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.client.tcp.conf; + +import org.apache.eventmesh.common.protocol.tcp.UserAgent; + +import lombok.Builder; +import lombok.Data; + +@Data +@Builder +public class EventMeshTCPClientConfig { + private String host; + private int port; + private UserAgent userAgent; +} diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/conf/EventMeshTcpClientConfig.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/conf/EventMeshTcpClientConfig.java deleted file mode 100644 index 6b916a1934..0000000000 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/conf/EventMeshTcpClientConfig.java +++ /dev/null @@ -1,14 +0,0 @@ -package org.apache.eventmesh.client.tcp.conf; - -import org.apache.eventmesh.common.protocol.tcp.UserAgent; - -import lombok.Builder; -import lombok.Data; - -@Data -@Builder -public class EventMeshTcpClientConfig { - private String host; - private int port; - private UserAgent userAgent; -} diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/EventMeshTCPClientFactory.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/EventMeshTCPClientFactory.java new file mode 100644 index 0000000000..cd11a010f0 --- /dev/null +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/EventMeshTCPClientFactory.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.client.tcp.impl; + +import org.apache.eventmesh.client.tcp.EventMeshTCPClient; +import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig; +import org.apache.eventmesh.client.tcp.impl.cloudevent.CloudEventTCPClient; +import org.apache.eventmesh.client.tcp.impl.eventmeshmessage.EventMeshMessageTCPClient; +import org.apache.eventmesh.client.tcp.impl.openmessage.OpenMessageTCPClient; +import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; + +import com.google.common.base.Preconditions; + +import io.cloudevents.CloudEvent; +import io.openmessaging.api.Message; +import lombok.experimental.UtilityClass; + +@UtilityClass +public class EventMeshTCPClientFactory { + + /** + * Create target {@link EventMeshTCPClient}. + * + * @param eventMeshTcpClientConfig client config + * @param protocolMessageClass target message protocol class + * @param target message protocol type + * @return Target client + */ + @SuppressWarnings("unchecked") + public static EventMeshTCPClient createEventMeshTCPClient( + EventMeshTCPClientConfig eventMeshTcpClientConfig, Class protocolMessageClass) { + Preconditions.checkNotNull(protocolMessageClass, "ProtocolMessage type cannot be null"); + Preconditions.checkNotNull(eventMeshTcpClientConfig, "EventMeshTcpClientConfig cannot be null"); + + if (protocolMessageClass.isAssignableFrom(EventMeshMessage.class)) { + return (EventMeshTCPClient) new EventMeshMessageTCPClient(eventMeshTcpClientConfig); + } + if (protocolMessageClass.isAssignableFrom(CloudEvent.class)) { + return (EventMeshTCPClient) new CloudEventTCPClient(eventMeshTcpClientConfig); + } + if (protocolMessageClass.isAssignableFrom(Message.class)) { + return (EventMeshTCPClient) new OpenMessageTCPClient(eventMeshTcpClientConfig); + } + throw new IllegalArgumentException( + String.format("ProtocolMessageClass: %s is not supported", protocolMessageClass)); + } +} diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPClient.java new file mode 100644 index 0000000000..39faa72ddd --- /dev/null +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPClient.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.client.tcp.impl.cloudevent; + +import org.apache.eventmesh.client.tcp.EventMeshTCPClient; +import org.apache.eventmesh.client.tcp.EventMeshTCPPubClient; +import org.apache.eventmesh.client.tcp.EventMeshTCPSubClient; +import org.apache.eventmesh.client.tcp.common.AsyncRRCallback; +import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; +import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig; +import org.apache.eventmesh.common.exception.EventMeshException; +import org.apache.eventmesh.common.protocol.SubscriptionMode; +import org.apache.eventmesh.common.protocol.SubscriptionType; +import org.apache.eventmesh.common.protocol.tcp.Package; + +import io.cloudevents.CloudEvent; + +public class CloudEventTCPClient implements EventMeshTCPClient { + + private final CloudEventTCPPubClient cloudEventTCPPubClient; + + private final CloudEventTCPSubClient cloudEventTCPSubClient; + + public CloudEventTCPClient(EventMeshTCPClientConfig eventMeshTcpClientConfig) { + cloudEventTCPPubClient = new CloudEventTCPPubClient(eventMeshTcpClientConfig); + cloudEventTCPSubClient = new CloudEventTCPSubClient(eventMeshTcpClientConfig); + } + + @Override + public void init() throws EventMeshException { + cloudEventTCPPubClient.init(); + cloudEventTCPSubClient.init(); + } + + @Override + public Package rr(CloudEvent cloudEvent, long timeout) throws EventMeshException { + return cloudEventTCPPubClient.rr(cloudEvent, timeout); + } + + @Override + public void asyncRR(CloudEvent cloudEvent, AsyncRRCallback callback, long timeout) throws EventMeshException { + cloudEventTCPPubClient.asyncRR(cloudEvent, callback, timeout); + } + + @Override + public Package publish(CloudEvent cloudEvent, long timeout) throws EventMeshException { + return cloudEventTCPPubClient.publish(cloudEvent, timeout); + } + + @Override + public void broadcast(CloudEvent cloudEvent, long timeout) throws EventMeshException { + cloudEventTCPPubClient.broadcast(cloudEvent, timeout); + } + + @Override + public void heartbeat() throws EventMeshException { + cloudEventTCPPubClient.heartbeat(); + cloudEventTCPSubClient.heartbeat(); + } + + @Override + public void listen() throws EventMeshException { + cloudEventTCPSubClient.listen(); + } + + @Override + public void subscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) + throws EventMeshException { + cloudEventTCPSubClient.subscribe(topic, subscriptionMode, subscriptionType); + } + + @Override + public void unsubscribe() throws EventMeshException { + cloudEventTCPSubClient.unsubscribe(); + } + + @Override + public void registerPubBusiHandler(ReceiveMsgHook handler) throws EventMeshException { + cloudEventTCPPubClient.registerBusiHandler(handler); + } + + @Override + public void registerSubBusiHandler(ReceiveMsgHook handler) throws EventMeshException { + cloudEventTCPSubClient.registerBusiHandler(handler); + } + + @Override + public void close() throws EventMeshException { + try (final EventMeshTCPPubClient pubClient = cloudEventTCPPubClient; + final EventMeshTCPSubClient subClient = cloudEventTCPSubClient) { + // close client + } + } + + @Override + public EventMeshTCPPubClient getPubClient() { + return cloudEventTCPPubClient; + } + + @Override + public EventMeshTCPSubClient getSubClient() { + return cloudEventTCPSubClient; + } +} diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPPubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPPubClient.java index 7a1d03b1bc..d4fbec7b36 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPPubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPPubClient.java @@ -1,41 +1,60 @@ -package org.apache.eventmesh.client.tcp.impl.cloudevent; - +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eventmesh.client.tcp.impl.cloudevent; import org.apache.eventmesh.client.tcp.EventMeshTCPPubClient; -import org.apache.eventmesh.client.tcp.common.*; -import org.apache.eventmesh.client.tcp.conf.EventMeshTcpClientConfig; -import org.apache.eventmesh.client.tcp.impl.eventmeshmessage.EventMeshMessageTCPPubClient; -import org.apache.eventmesh.common.EventMeshMessage; +import org.apache.eventmesh.client.tcp.common.AsyncRRCallback; +import org.apache.eventmesh.client.tcp.common.EventMeshCommon; +import org.apache.eventmesh.client.tcp.common.MessageUtils; +import org.apache.eventmesh.client.tcp.common.PropertyConst; +import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; +import org.apache.eventmesh.client.tcp.common.RequestContext; +import org.apache.eventmesh.client.tcp.common.TcpClient; +import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig; import org.apache.eventmesh.common.exception.EventMeshException; import org.apache.eventmesh.common.protocol.tcp.Command; import org.apache.eventmesh.common.protocol.tcp.Package; import org.apache.eventmesh.common.protocol.tcp.UserAgent; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import io.cloudevents.CloudEvent; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; -import io.cloudevents.CloudEvent; import lombok.extern.slf4j.Slf4j; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - /** * A CloudEvent TCP publish client implementation. */ @Slf4j -public class CloudEventTCPPubClient extends TcpClient implements EventMeshTCPPubClient { +class CloudEventTCPPubClient extends TcpClient implements EventMeshTCPPubClient { private final UserAgent userAgent; - private ReceiveMsgHook callback; + private ReceiveMsgHook callback; private final ConcurrentHashMap callbackConcurrentHashMap = new ConcurrentHashMap<>(); - private ScheduledFuture task; + private ScheduledFuture task; - public CloudEventTCPPubClient(EventMeshTcpClientConfig eventMeshTcpClientConfig) { + public CloudEventTCPPubClient(EventMeshTCPClientConfig eventMeshTcpClientConfig) { super(eventMeshTcpClientConfig); this.userAgent = eventMeshTcpClientConfig.getUserAgent(); } @@ -53,7 +72,7 @@ public void init() throws EventMeshException { @Override public void heartbeat() throws EventMeshException { if (task != null) { - synchronized (EventMeshMessageTCPPubClient.class) { + synchronized (CloudEventTCPPubClient.class) { task = scheduler.scheduleAtFixedRate(() -> { try { if (!isActive()) { @@ -131,7 +150,7 @@ public void broadcast(CloudEvent cloudEvent, long timeout) throws EventMeshExcep @Override public void registerBusiHandler(ReceiveMsgHook handler) throws EventMeshException { - + callback = handler; } @Override diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPSubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPSubClient.java index 716feae3cb..d19f66326b 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPSubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPSubClient.java @@ -1,13 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.eventmesh.client.tcp.impl.cloudevent; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelInboundHandler; -import org.apache.commons.collections4.CollectionUtils; import org.apache.eventmesh.client.tcp.EventMeshTCPSubClient; -import org.apache.eventmesh.client.tcp.common.*; -import org.apache.eventmesh.client.tcp.conf.EventMeshTcpClientConfig; -import org.apache.eventmesh.client.tcp.impl.eventmeshmessage.EventMeshMessageTCPSubClient; -import org.apache.eventmesh.common.EventMeshMessage; +import org.apache.eventmesh.client.tcp.common.EventMeshCommon; +import org.apache.eventmesh.client.tcp.common.MessageUtils; +import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; +import org.apache.eventmesh.client.tcp.common.RequestContext; +import org.apache.eventmesh.client.tcp.common.TcpClient; +import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig; import org.apache.eventmesh.common.exception.EventMeshException; import org.apache.eventmesh.common.protocol.SubscriptionItem; import org.apache.eventmesh.common.protocol.SubscriptionMode; @@ -16,8 +32,7 @@ import org.apache.eventmesh.common.protocol.tcp.Package; import org.apache.eventmesh.common.protocol.tcp.UserAgent; -import io.cloudevents.CloudEvent; -import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; import java.util.ArrayList; import java.util.Collections; @@ -25,18 +40,23 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import io.cloudevents.CloudEvent; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import lombok.extern.slf4j.Slf4j; + /** * CloudEvent TCP subscribe client implementation. */ @Slf4j -public class CloudEventTCPSubClient extends TcpClient implements EventMeshTCPSubClient { +class CloudEventTCPSubClient extends TcpClient implements EventMeshTCPSubClient { - private final UserAgent userAgent; - private final List subscriptionItems = Collections.synchronizedList(new ArrayList<>()); + private final UserAgent userAgent; + private final List subscriptionItems = Collections.synchronizedList(new ArrayList<>()); private ReceiveMsgHook callback; - private ScheduledFuture task; + private ScheduledFuture task; - public CloudEventTCPSubClient(EventMeshTcpClientConfig eventMeshTcpClientConfig) { + public CloudEventTCPSubClient(EventMeshTCPClientConfig eventMeshTcpClientConfig) { super(eventMeshTcpClientConfig); this.userAgent = eventMeshTcpClientConfig.getUserAgent(); } @@ -55,7 +75,7 @@ public void init() throws EventMeshException { @Override public void heartbeat() throws EventMeshException { if (task == null) { - synchronized (EventMeshMessageTCPSubClient.class) { + synchronized (CloudEventTCPSubClient.class) { task = scheduler.scheduleAtFixedRate(new Runnable() { @Override public void run() { diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPClient.java new file mode 100644 index 0000000000..ceaa4ce655 --- /dev/null +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPClient.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.client.tcp.impl.eventmeshmessage; + +import org.apache.eventmesh.client.tcp.EventMeshTCPClient; +import org.apache.eventmesh.client.tcp.EventMeshTCPPubClient; +import org.apache.eventmesh.client.tcp.EventMeshTCPSubClient; +import org.apache.eventmesh.client.tcp.common.AsyncRRCallback; +import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; +import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig; +import org.apache.eventmesh.common.exception.EventMeshException; +import org.apache.eventmesh.common.protocol.SubscriptionMode; +import org.apache.eventmesh.common.protocol.SubscriptionType; +import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; +import org.apache.eventmesh.common.protocol.tcp.Package; + +public class EventMeshMessageTCPClient implements EventMeshTCPClient { + + private final EventMeshTCPPubClient eventMeshMessageTCPPubClient; + private final EventMeshTCPSubClient eventMeshMessageTCPSubClient; + + public EventMeshMessageTCPClient(EventMeshTCPClientConfig eventMeshTcpClientConfig) { + eventMeshMessageTCPPubClient = new EventMeshMessageTCPPubClient(eventMeshTcpClientConfig); + eventMeshMessageTCPSubClient = new EventMeshMessageTCPSubClient(eventMeshTcpClientConfig); + } + + @Override + public void init() throws EventMeshException { + eventMeshMessageTCPPubClient.init(); + eventMeshMessageTCPSubClient.init(); + } + + @Override + public Package rr(EventMeshMessage eventMeshMessage, long timeout) throws EventMeshException { + return eventMeshMessageTCPPubClient.rr(eventMeshMessage, timeout); + } + + @Override + public void asyncRR(EventMeshMessage eventMeshMessage, AsyncRRCallback callback, long timeout) + throws EventMeshException { + eventMeshMessageTCPPubClient.asyncRR(eventMeshMessage, callback, timeout); + } + + @Override + public Package publish(EventMeshMessage eventMeshMessage, long timeout) throws EventMeshException { + return eventMeshMessageTCPPubClient.publish(eventMeshMessage, timeout); + } + + @Override + public void broadcast(EventMeshMessage eventMeshMessage, long timeout) throws EventMeshException { + eventMeshMessageTCPPubClient.broadcast(eventMeshMessage, timeout); + } + + @Override + public void heartbeat() throws EventMeshException { + eventMeshMessageTCPPubClient.heartbeat(); + } + + @Override + public void listen() throws EventMeshException { + eventMeshMessageTCPSubClient.listen(); + } + + @Override + public void subscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) + throws EventMeshException { + eventMeshMessageTCPSubClient.subscribe(topic, subscriptionMode, subscriptionType); + } + + @Override + public void unsubscribe() throws EventMeshException { + eventMeshMessageTCPSubClient.unsubscribe(); + } + + @Override + public void registerPubBusiHandler(ReceiveMsgHook handler) throws EventMeshException { + eventMeshMessageTCPPubClient.registerBusiHandler(handler); + } + + @Override + public void registerSubBusiHandler(ReceiveMsgHook handler) throws EventMeshException { + eventMeshMessageTCPSubClient.registerBusiHandler(handler); + } + + @Override + public void close() throws EventMeshException { + try (final EventMeshTCPPubClient eventMeshTCPPubClient = eventMeshMessageTCPPubClient; + final EventMeshTCPSubClient eventMeshTCPSubClient = eventMeshMessageTCPSubClient) { + // close client + } + } + + @Override + public EventMeshTCPPubClient getPubClient() { + return eventMeshMessageTCPPubClient; + } + + @Override + public EventMeshTCPSubClient getSubClient() { + return eventMeshMessageTCPSubClient; + } +} diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java index baa50137c6..794cea66e5 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.eventmesh.client.tcp.impl.eventmeshmessage; import org.apache.eventmesh.client.tcp.EventMeshTCPPubClient; @@ -8,7 +25,7 @@ import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; import org.apache.eventmesh.client.tcp.common.RequestContext; import org.apache.eventmesh.client.tcp.common.TcpClient; -import org.apache.eventmesh.client.tcp.conf.EventMeshTcpClientConfig; +import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig; import org.apache.eventmesh.common.exception.EventMeshException; import org.apache.eventmesh.common.protocol.tcp.Command; import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; @@ -28,7 +45,7 @@ * EventMeshMessage TCP publish client implementation. */ @Slf4j -public class EventMeshMessageTCPPubClient extends TcpClient implements EventMeshTCPPubClient { +class EventMeshMessageTCPPubClient extends TcpClient implements EventMeshTCPPubClient { private final UserAgent userAgent; @@ -37,7 +54,7 @@ public class EventMeshMessageTCPPubClient extends TcpClient implements EventMesh private final ConcurrentHashMap callbackConcurrentHashMap = new ConcurrentHashMap<>(); private ScheduledFuture task; - public EventMeshMessageTCPPubClient(EventMeshTcpClientConfig eventMeshTcpClientConfig) { + public EventMeshMessageTCPPubClient(EventMeshTCPClientConfig eventMeshTcpClientConfig) { super(eventMeshTcpClientConfig); this.userAgent = eventMeshTcpClientConfig.getUserAgent(); } diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java index 5891e4b615..7939066e92 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.eventmesh.client.tcp.impl.eventmeshmessage; import org.apache.eventmesh.client.tcp.EventMeshTCPSubClient; @@ -6,7 +23,7 @@ import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; import org.apache.eventmesh.client.tcp.common.RequestContext; import org.apache.eventmesh.client.tcp.common.TcpClient; -import org.apache.eventmesh.client.tcp.conf.EventMeshTcpClientConfig; +import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig; import org.apache.eventmesh.common.exception.EventMeshException; import org.apache.eventmesh.common.protocol.SubscriptionItem; import org.apache.eventmesh.common.protocol.SubscriptionMode; @@ -29,14 +46,14 @@ import lombok.extern.slf4j.Slf4j; @Slf4j -public class EventMeshMessageTCPSubClient extends TcpClient implements EventMeshTCPSubClient { +class EventMeshMessageTCPSubClient extends TcpClient implements EventMeshTCPSubClient { private final UserAgent userAgent; private final List subscriptionItems = Collections.synchronizedList(new ArrayList<>()); private ReceiveMsgHook callback; private ScheduledFuture task; - public EventMeshMessageTCPSubClient(EventMeshTcpClientConfig eventMeshTcpClientConfig) { + public EventMeshMessageTCPSubClient(EventMeshTCPClientConfig eventMeshTcpClientConfig) { super(eventMeshTcpClientConfig); this.userAgent = eventMeshTcpClientConfig.getUserAgent(); } diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPClient.java new file mode 100644 index 0000000000..6381b6f0ed --- /dev/null +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPClient.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.client.tcp.impl.openmessage; + +import org.apache.eventmesh.client.tcp.EventMeshTCPClient; +import org.apache.eventmesh.client.tcp.EventMeshTCPPubClient; +import org.apache.eventmesh.client.tcp.EventMeshTCPSubClient; +import org.apache.eventmesh.client.tcp.common.AsyncRRCallback; +import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; +import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig; +import org.apache.eventmesh.common.exception.EventMeshException; +import org.apache.eventmesh.common.protocol.SubscriptionMode; +import org.apache.eventmesh.common.protocol.SubscriptionType; +import org.apache.eventmesh.common.protocol.tcp.Package; + +import io.openmessaging.api.Message; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class OpenMessageTCPClient implements EventMeshTCPClient { + + private final EventMeshTCPPubClient eventMeshTCPPubClient; + private final EventMeshTCPSubClient eventMeshTCPSubClient; + + public OpenMessageTCPClient(EventMeshTCPClientConfig eventMeshTCPClientConfig) { + eventMeshTCPPubClient = new OpenMessageTCPPubClient(eventMeshTCPClientConfig); + eventMeshTCPSubClient = new OpenMessageTCPSubClient(eventMeshTCPClientConfig); + } + + @Override + public void init() throws EventMeshException { + eventMeshTCPPubClient.init(); + eventMeshTCPSubClient.init(); + } + + @Override + public Package rr(Message openMessage, long timeout) throws EventMeshException { + return eventMeshTCPPubClient.rr(openMessage, timeout); + } + + @Override + public void asyncRR(Message openMessage, AsyncRRCallback callback, long timeout) throws EventMeshException { + eventMeshTCPPubClient.asyncRR(openMessage, callback, timeout); + } + + @Override + public Package publish(Message openMessage, long timeout) throws EventMeshException { + return eventMeshTCPPubClient.publish(openMessage, timeout); + } + + @Override + public void broadcast(Message openMessage, long timeout) throws EventMeshException { + eventMeshTCPPubClient.broadcast(openMessage, timeout); + } + + @Override + public void heartbeat() throws EventMeshException { + eventMeshTCPPubClient.heartbeat(); + eventMeshTCPSubClient.heartbeat(); + } + + @Override + public void listen() throws EventMeshException { + eventMeshTCPSubClient.listen(); + } + + @Override + public void subscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) + throws EventMeshException { + eventMeshTCPSubClient.subscribe(topic, subscriptionMode, subscriptionType); + } + + @Override + public void unsubscribe() throws EventMeshException { + eventMeshTCPSubClient.unsubscribe(); + } + + @Override + public void registerPubBusiHandler(ReceiveMsgHook handler) throws EventMeshException { + eventMeshTCPPubClient.registerBusiHandler(handler); + } + + @Override + public void registerSubBusiHandler(ReceiveMsgHook handler) throws EventMeshException { + eventMeshTCPSubClient.registerBusiHandler(handler); + } + + @Override + public void close() throws EventMeshException { + try (final EventMeshTCPPubClient pubClient = eventMeshTCPPubClient; + final EventMeshTCPSubClient subClient = eventMeshTCPSubClient) { + log.info("Close OpenMessageTCPClient"); + } + } + + @Override + public EventMeshTCPPubClient getPubClient() { + return eventMeshTCPPubClient; + } + + @Override + public EventMeshTCPSubClient getSubClient() { + return eventMeshTCPSubClient; + } +} diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPPubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPPubClient.java index 80e0fbb85a..7e3d40ddde 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPPubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPPubClient.java @@ -1,17 +1,40 @@ -package org.apache.eventmesh.client.tcp.impl.openmessage; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ -import io.openmessaging.api.Message; -import lombok.extern.slf4j.Slf4j; +package org.apache.eventmesh.client.tcp.impl.openmessage; import org.apache.eventmesh.client.tcp.EventMeshTCPPubClient; import org.apache.eventmesh.client.tcp.common.AsyncRRCallback; import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; +import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig; import org.apache.eventmesh.common.exception.EventMeshException; import org.apache.eventmesh.common.protocol.tcp.Package; -import org.apache.eventmesh.common.protocol.tcp.UserAgent; + +import io.openmessaging.api.Message; +import lombok.extern.slf4j.Slf4j; @Slf4j -public class OpenMessageTCPPubClient implements EventMeshTCPPubClient { +class OpenMessageTCPPubClient implements EventMeshTCPPubClient { + + private final EventMeshTCPClientConfig eventMeshTCPClientConfig; + + public OpenMessageTCPPubClient(final EventMeshTCPClientConfig eventMeshTCPClientConfig) { + this.eventMeshTCPClientConfig = eventMeshTCPClientConfig; + } @Override public void init() throws EventMeshException { diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPSubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPSubClient.java index c4ae11c0b8..c0de8b14bd 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPSubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPSubClient.java @@ -1,17 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.eventmesh.client.tcp.impl.openmessage; import org.apache.eventmesh.client.tcp.EventMeshTCPSubClient; import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; +import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig; import org.apache.eventmesh.common.exception.EventMeshException; import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.common.protocol.SubscriptionType; -import org.apache.eventmesh.common.protocol.tcp.UserAgent; import io.openmessaging.api.Message; import lombok.extern.slf4j.Slf4j; @Slf4j -public class OpenMessageTCPSubClient implements EventMeshTCPSubClient { +class OpenMessageTCPSubClient implements EventMeshTCPSubClient { + + private final EventMeshTCPClientConfig eventMeshTCPClientConfig; + + public OpenMessageTCPSubClient(EventMeshTCPClientConfig eventMeshTCPClientConfig) { + this.eventMeshTCPClientConfig = eventMeshTCPClientConfig; + } + @Override public void init() throws EventMeshException { diff --git a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/common/EventMeshTestUtils.java b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/common/EventMeshTestUtils.java index f6397b8c99..79093e76b3 100644 --- a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/common/EventMeshTestUtils.java +++ b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/common/EventMeshTestUtils.java @@ -130,7 +130,7 @@ private static EventMeshMessage generateAsyncEventMqMsg() { return mqMsg; } - private static EventMeshMessage generateBroadcastMqMsg() { + public static EventMeshMessage generateBroadcastMqMsg() { EventMeshMessage mqMsg = new EventMeshMessage(); mqMsg.setTopic(TOPIC_PRX_WQ2ClientBroadCast); mqMsg.getProperties().put("REPLY_TO", "10.36.0.109@ProducerGroup-producerPool-9-access#V1_4_0#CI"); diff --git a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncPublish.java b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncPublish.java deleted file mode 100644 index 01ccbfb6b0..0000000000 --- a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncPublish.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eventmesh.client.tcp.demo; - -import org.apache.eventmesh.client.tcp.EventMeshTCPClient; -import org.apache.eventmesh.client.tcp.common.EventMeshCommon; -import org.apache.eventmesh.client.tcp.common.EventMeshTestUtils; -import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshTCPClient; -import org.apache.eventmesh.common.protocol.tcp.Package; -import org.apache.eventmesh.common.protocol.tcp.UserAgent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AsyncPublish { - - public static Logger logger = LoggerFactory.getLogger(AsyncPublish.class); - - private static EventMeshTCPClient client; - - public static AsyncPublish handler = new AsyncPublish(); - - public static void main(String[] agrs) throws Exception { - try { - UserAgent userAgent = EventMeshTestUtils.generateClient1(); - client = new DefaultEventMeshTCPClient("127.0.0.1", 10002, userAgent); - client.init(); - client.heartbeat(); - - for (int i = 0; i < 5; i++) { - Package asyncMsg = EventMeshTestUtils.asyncMessage(); - logger.info("begin send async msg[{}]==================={}", i, asyncMsg); - client.publish(asyncMsg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); - - Thread.sleep(1000); - } - - Thread.sleep(2000); - // release resource and close client - // client.close(); - } catch (Exception e) { - logger.warn("AsyncPublish failed", e); - } - } -} diff --git a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncPublishBroadcast.java b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncPublishBroadcast.java deleted file mode 100644 index e674c02ee8..0000000000 --- a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncPublishBroadcast.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eventmesh.client.tcp.demo; - -import org.apache.eventmesh.client.tcp.EventMeshTCPClient; -import org.apache.eventmesh.client.tcp.common.EventMeshCommon; -import org.apache.eventmesh.client.tcp.common.EventMeshTestUtils; -import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshTCPClient; -import org.apache.eventmesh.common.protocol.tcp.Package; -import org.apache.eventmesh.common.protocol.tcp.UserAgent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AsyncPublishBroadcast { - - public static Logger logger = LoggerFactory.getLogger(AsyncPublishBroadcast.class); - - private static EventMeshTCPClient client; - - public static void main(String[] agrs) throws Exception { - try { - UserAgent userAgent = EventMeshTestUtils.generateClient1(); - client = new DefaultEventMeshTCPClient("127.0.0.1", 10002, userAgent); - client.init(); - client.heartbeat(); - - Package broadcastMsg = EventMeshTestUtils.broadcastMessage(); - logger.info("begin send broadcast msg============={}", broadcastMsg); - client.broadcast(broadcastMsg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); - - Thread.sleep(2000); - // release resource and close client - // client.close(); - } catch (Exception e) { - logger.warn("AsyncPublishBroadcast failed", e); - } - } -} diff --git a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribe.java b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribe.java deleted file mode 100644 index 0652826d58..0000000000 --- a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribe.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eventmesh.client.tcp.demo; - - -import io.netty.channel.ChannelHandlerContext; - -import org.apache.eventmesh.client.tcp.EventMeshTCPClient; -import org.apache.eventmesh.client.tcp.common.EventMeshTestUtils; -import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; -import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshTCPClient; -import org.apache.eventmesh.common.protocol.SubscriptionType; -import org.apache.eventmesh.common.protocol.SubscriptionMode; -import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; -import org.apache.eventmesh.common.protocol.tcp.UserAgent; -import org.apache.eventmesh.common.protocol.tcp.Package; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AsyncSubscribe implements ReceiveMsgHook { - - public static Logger logger = LoggerFactory.getLogger(AsyncSubscribe.class); - - private static EventMeshTCPClient client; - - public static AsyncSubscribe handler = new AsyncSubscribe(); - - public static void main(String[] agrs) throws Exception { - try { - UserAgent userAgent = EventMeshTestUtils.generateClient2(); - client = new DefaultEventMeshTCPClient("127.0.0.1", 10002, userAgent); - client.init(); - client.heartbeat(); - - client.subscribe("TEST-TOPIC-TCP-ASYNC", SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC); - client.registerSubBusiHandler(handler); - - client.listen(); - - //client.unsubscribe(); - - // release resource and close client - // client.close(); - } catch (Exception e) { - logger.warn("AsyncSubscribe failed", e); - } - } - - @Override - public void handle(Package msg, ChannelHandlerContext ctx) { - EventMeshMessage eventMeshMessage = convertToProtocolMessage(msg); - logger.info("receive async msg====================={}", eventMeshMessage); - } - - @Override - public EventMeshMessage convertToProtocolMessage(Package pkg) { - return (EventMeshMessage) pkg.getBody(); - } -} diff --git a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribeBroadcast.java b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribeBroadcast.java deleted file mode 100644 index 1ab68e543d..0000000000 --- a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribeBroadcast.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eventmesh.client.tcp.demo; - -import org.apache.eventmesh.client.tcp.EventMeshTCPClient; -import org.apache.eventmesh.client.tcp.common.EventMeshTestUtils; -import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; -import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshTCPClient; -import org.apache.eventmesh.common.protocol.SubscriptionMode; -import org.apache.eventmesh.common.protocol.SubscriptionType; -import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; -import org.apache.eventmesh.common.protocol.tcp.Package; -import org.apache.eventmesh.common.protocol.tcp.UserAgent; - -import io.netty.channel.ChannelHandlerContext; -import lombok.extern.slf4j.Slf4j; - -@Slf4j -public class AsyncSubscribeBroadcast implements ReceiveMsgHook { - - private static EventMeshTCPClient client; - - public static AsyncSubscribeBroadcast handler = new AsyncSubscribeBroadcast(); - - public static void main(String[] agrs) { - try { - UserAgent userAgent = EventMeshTestUtils.generateClient2(); - client = new DefaultEventMeshTCPClient("127.0.0.1", 10002, userAgent); - client.init(); - client.heartbeat(); - - client.subscribe("TEST-TOPIC-TCP-BROADCAST", SubscriptionMode.BROADCASTING, SubscriptionType.ASYNC); - client.registerSubBusiHandler(handler); - - client.listen(); - - } catch (Exception e) { - log.warn("AsyncSubscribeBroadcast failed", e); - } - } - - @Override - public void handle(Package msg, ChannelHandlerContext ctx) { - EventMeshMessage eventMeshMessage = convertToProtocolMessage(msg); - log.info("receive broadcast msg==============={}", eventMeshMessage); - } - - @Override - public EventMeshMessage convertToProtocolMessage(Package pkg) { - return (EventMeshMessage) pkg.getBody(); - } -} diff --git a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/SyncRequest.java b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/SyncRequest.java deleted file mode 100644 index 7b6b4cadc4..0000000000 --- a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/SyncRequest.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eventmesh.client.tcp.demo; - -import org.apache.eventmesh.client.tcp.common.EventMeshCommon; -import org.apache.eventmesh.client.tcp.common.EventMeshTestUtils; -import org.apache.eventmesh.client.tcp.conf.EventMeshTcpClientConfig; -import org.apache.eventmesh.client.tcp.impl.eventmeshmessage.EventMeshMessageTCPPubClient; -import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; -import org.apache.eventmesh.common.protocol.tcp.Package; -import org.apache.eventmesh.common.protocol.tcp.UserAgent; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class SyncRequest { - - public static Logger logger = LoggerFactory.getLogger(SyncRequest.class); - - private static EventMeshMessageTCPPubClient client; - - public static void main(String[] agrs) { - try { - UserAgent userAgent = EventMeshTestUtils.generateClient1(); - EventMeshTcpClientConfig eventMeshTcpClientConfig = EventMeshTcpClientConfig.builder() - .host("127.0.0.1") - .port(10000) - .userAgent(userAgent) - .build(); - client = new EventMeshMessageTCPPubClient(eventMeshTcpClientConfig); - client.init(); - client.heartbeat(); - - EventMeshMessage eventMeshMessage = EventMeshTestUtils.generateSyncRRMqMsg(); - logger.info("begin send rr msg=================={}", eventMeshMessage); - Package response = client.rr(eventMeshMessage, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); - logger.info("receive rr reply==================={}", response); - - // release resource and close client - // client.close(); - } catch (Exception e) { - logger.warn("SyncRequest failed", e); - } - } -} diff --git a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/SyncResponse.java b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/SyncResponse.java deleted file mode 100644 index 65d921dc89..0000000000 --- a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/SyncResponse.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eventmesh.client.tcp.demo; - -import org.apache.eventmesh.client.tcp.EventMeshTCPClient; -import org.apache.eventmesh.client.tcp.common.EventMeshTestUtils; -import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; -import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshTCPClient; -import org.apache.eventmesh.common.protocol.SubscriptionMode; -import org.apache.eventmesh.common.protocol.SubscriptionType; -import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; -import org.apache.eventmesh.common.protocol.tcp.Package; -import org.apache.eventmesh.common.protocol.tcp.UserAgent; - -import io.netty.channel.ChannelHandlerContext; -import lombok.extern.slf4j.Slf4j; - -@Slf4j -public class SyncResponse implements ReceiveMsgHook { - - private static EventMeshTCPClient client; - - public static SyncResponse handler = new SyncResponse(); - - public static void main(String[] agrs) { - try { - UserAgent userAgent = EventMeshTestUtils.generateClient2(); - client = new DefaultEventMeshTCPClient("127.0.0.1", 10000, userAgent); - client.init(); - client.heartbeat(); - - client.subscribe("TEST-TOPIC-TCP-SYNC", SubscriptionMode.CLUSTERING, SubscriptionType.SYNC); - // Synchronize RR messages - client.registerSubBusiHandler(handler); - - client.listen(); - - } catch (Exception e) { - log.warn("SyncResponse failed", e); - } - } - - @Override - public void handle(Package msg, ChannelHandlerContext ctx) { - log.info("receive sync rr msg================{}", msg); - Package pkg = EventMeshTestUtils.rrResponse(msg); - ctx.writeAndFlush(pkg); - } - - @Override - public EventMeshMessage convertToProtocolMessage(Package pkg) { - return null; - } -} diff --git a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/impl/EventMeshTCPClientFactoryTest.java b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/impl/EventMeshTCPClientFactoryTest.java new file mode 100644 index 0000000000..193251f606 --- /dev/null +++ b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/impl/EventMeshTCPClientFactoryTest.java @@ -0,0 +1,36 @@ +package org.apache.eventmesh.client.tcp.impl; + +import org.apache.eventmesh.client.tcp.EventMeshTCPClient; +import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig; +import org.apache.eventmesh.client.tcp.impl.cloudevent.CloudEventTCPClient; +import org.apache.eventmesh.client.tcp.impl.eventmeshmessage.EventMeshMessageTCPClient; +import org.apache.eventmesh.client.tcp.impl.openmessage.OpenMessageTCPClient; +import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; + +import org.junit.Assert; +import org.junit.Test; + +import io.cloudevents.CloudEvent; +import io.openmessaging.api.Message; + +public class EventMeshTCPClientFactoryTest { + + @Test + public void createEventMeshTCPClient() { + EventMeshTCPClientConfig meshTCPClientConfig = EventMeshTCPClientConfig.builder() + .host("localhost") + .port(1234) + .build(); + EventMeshTCPClient eventMeshMessageTCPClient = + EventMeshTCPClientFactory.createEventMeshTCPClient(meshTCPClientConfig, EventMeshMessage.class); + Assert.assertEquals(EventMeshMessageTCPClient.class, eventMeshMessageTCPClient.getClass()); + + EventMeshTCPClient cloudEventTCPClient = + EventMeshTCPClientFactory.createEventMeshTCPClient(meshTCPClientConfig, CloudEvent.class); + Assert.assertEquals(CloudEventTCPClient.class, cloudEventTCPClient.getClass()); + + EventMeshTCPClient openMessageTCPClient = + EventMeshTCPClientFactory.createEventMeshTCPClient(meshTCPClientConfig, Message.class); + Assert.assertEquals(OpenMessageTCPClient.class, openMessageTCPClient.getClass()); + } +} \ No newline at end of file