diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java index b7b6995a49..8c3a06f513 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java @@ -17,11 +17,15 @@ package org.apache.eventmesh.tcp.common; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; +import io.cloudevents.core.v1.CloudEventV1; import static org.apache.eventmesh.common.protocol.tcp.Command.RESPONSE_TO_SERVER; import static org.apache.eventmesh.tcp.common.EventMeshTestCaseTopicSet.TOPIC_PRX_SyncSubscribeTest; import static org.apache.eventmesh.tcp.common.EventMeshTestCaseTopicSet.TOPIC_PRX_WQ2ClientBroadCast; import static org.apache.eventmesh.tcp.common.EventMeshTestCaseTopicSet.TOPIC_PRX_WQ2ClientUniCast; +import org.apache.eventmesh.client.tcp.common.EventMeshCommon; import org.apache.eventmesh.client.tcp.common.MessageUtils; import org.apache.eventmesh.common.protocol.tcp.Command; import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; @@ -29,6 +33,9 @@ import org.apache.eventmesh.common.protocol.tcp.Package; import org.apache.eventmesh.common.protocol.tcp.UserAgent; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; public class EventMeshTestUtils { @@ -155,4 +162,16 @@ private static String generateRandomString(int length) { } return builder.toString(); } + + public static CloudEvent generateCloudEventV1() { + CloudEvent event = CloudEventBuilder.v1() + .withId(UUID.randomUUID().toString()) + .withSubject(TOPIC_PRX_WQ2ClientBroadCast) + .withSource(URI.create("/")) + .withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME) + .withData("testAsyncMessage".getBytes(StandardCharsets.UTF_8)) + .withExtension("ttl", "30000") + .build(); + return event; + } } diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/AsyncPublish.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/AsyncPublish.java new file mode 100644 index 0000000000..f279308a53 --- /dev/null +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/AsyncPublish.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.tcp.demo.pub.cloudevents; + +import io.cloudevents.CloudEvent; + +import org.apache.eventmesh.client.tcp.EventMeshTCPClient; +import org.apache.eventmesh.client.tcp.common.EventMeshCommon; +import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig; +import org.apache.eventmesh.client.tcp.impl.EventMeshTCPClientFactory; +import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; +import org.apache.eventmesh.common.protocol.tcp.UserAgent; +import org.apache.eventmesh.tcp.common.EventMeshTestUtils; +import org.apache.eventmesh.util.Utils; + +import java.util.Properties; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AsyncPublish { + + public static Logger logger = LoggerFactory.getLogger(AsyncPublish.class); + + private static EventMeshTCPClient client; + + public static AsyncPublish handler = new AsyncPublish(); + + public static void main(String[] agrs) throws Exception { + Properties properties = Utils.readPropertiesFile("application.properties"); + final String eventMeshIp = properties.getProperty("eventmesh.ip"); + final int eventMeshTcpPort = Integer.parseInt(properties.getProperty("eventmesh.tcp.port")); + try { + UserAgent userAgent = EventMeshTestUtils.generateClient1(); + EventMeshTCPClientConfig eventMeshTcpClientConfig = EventMeshTCPClientConfig.builder() + .host(eventMeshIp) + .port(eventMeshTcpPort) + .userAgent(userAgent) + .build(); + client = + EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig, CloudEvent.class); + client.init(); + client.heartbeat(); + + for (int i = 0; i < 5; i++) { + CloudEvent event = EventMeshTestUtils.generateCloudEventV1(); + + logger.info("begin send async msg[{}]==================={}", i, event); + client.publish(event, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); + + Thread.sleep(1000); + } + Thread.sleep(2000); + } catch (Exception e) { + logger.warn("AsyncPublish failed", e); + } + } +} diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/cloudevents/AsyncSubscribe.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/cloudevents/AsyncSubscribe.java new file mode 100644 index 0000000000..b94d12fb44 --- /dev/null +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/cloudevents/AsyncSubscribe.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.tcp.demo.sub.cloudevents; + +import org.apache.eventmesh.client.tcp.EventMeshTCPClient; +import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; +import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig; +import org.apache.eventmesh.client.tcp.impl.EventMeshTCPClientFactory; +import org.apache.eventmesh.common.protocol.SubscriptionMode; +import org.apache.eventmesh.common.protocol.SubscriptionType; +import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; +import org.apache.eventmesh.common.protocol.tcp.Package; +import org.apache.eventmesh.common.protocol.tcp.UserAgent; +import org.apache.eventmesh.tcp.common.EventMeshTestUtils; +import org.apache.eventmesh.util.Utils; + +import java.util.Properties; + +import io.cloudevents.CloudEvent; +import io.cloudevents.SpecVersion; +import io.cloudevents.core.builder.CloudEventBuilder; +import io.netty.channel.ChannelHandlerContext; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class AsyncSubscribe implements ReceiveMsgHook { + + public static AsyncSubscribe handler = new AsyncSubscribe(); + + private static EventMeshTCPClient client; + + public static void main(String[] agrs) throws Exception { + Properties properties = Utils.readPropertiesFile("application.properties"); + final String eventMeshIp = properties.getProperty("eventmesh.ip"); + final int eventMeshTcpPort = Integer.parseInt(properties.getProperty("eventmesh.tcp.port")); + UserAgent userAgent = EventMeshTestUtils.generateClient2(); + EventMeshTCPClientConfig eventMeshTcpClientConfig = EventMeshTCPClientConfig.builder() + .host(eventMeshIp) + .port(eventMeshTcpPort) + .userAgent(userAgent) + .build(); + try { + client = EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig, CloudEvent.class); + client.init(); + client.heartbeat(); + + client.subscribe("TEST-TOPIC-TCP-ASYNC", SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC); + client.registerSubBusiHandler(handler); + + client.listen(); + + //client.unsubscribe(); + + // release resource and close client + // client.close(); + + } catch (Exception e) { + log.warn("AsyncSubscribe failed", e); + } + } + + @Override + public void handle(Package msg, ChannelHandlerContext ctx) { + CloudEvent event = convertToProtocolMessage(msg); + log.info("receive async msg====================={}", event); + } + + @Override + public CloudEvent convertToProtocolMessage(Package pkg) { + return CloudEventBuilder.from((CloudEvent) pkg.getBody()).build(); + } +}