diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/common/ExampleConstants.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/common/ExampleConstants.java index 04b32c6277..2befeebc5d 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/common/ExampleConstants.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/common/ExampleConstants.java @@ -20,12 +20,26 @@ public class ExampleConstants { public static final String CONFIG_FILE_NAME = "application.properties"; + public static final String CLOUDEVENT_CONTENT_TYPE = "application/cloudevents+json"; public static final String EVENTMESH_IP = "eventmesh.ip"; - public static final String EVENTMESH_HTTP_PORT = "eventmesh.http.port"; - public static final String EVENTMESH_TCP_PORT = "eventmesh.tcp.port"; - public static final String EVENTMESH_GRPC_PORT = "eventmesh.grpc.port"; + + public static final String DEFAULT_EVENTMESH_IP = "127.0.0.1"; + public static final String DEFAULT_EVENTMESH_IP_PORT = "127.0.0.1:10105"; + + public static final String EVENTMESH_GRPC_ASYNC_TEST_TOPIC = "TEST-TOPIC-GRPC-ASYNC"; + public static final String EVENTMESH_GRPC_RR_TEST_TOPIC = "TEST-TOPIC-GRPC-RR"; + public static final String EVENTMESH_GRPC_BROADCAT_TEST_TOPIC = "TEST-TOPIC-GRPC-BROADCAST"; + public static final String EVENTMESH_HTTP_ASYNC_TEST_TOPIC = "TEST-TOPIC-HTTP-ASYNC"; + public static final String EVENTMESH_HTTP_SYNC_TEST_TOPIC = "TEST-TOPIC-HTTP-SYNC"; + public static final String EVENTMESH_TCP_ASYNC_TEST_TOPIC = "TEST-TOPIC-TCP-ASYNC"; + public static final String EVENTMESH_TCP_SYNC_TEST_TOPIC = "TEST-TOPIC-TCP-SYNC"; + public static final String EVENTMESH_TCP_BROADCAST_TEST_TOPIC = "TEST-TOPIC-TCP-BROADCAST"; + + public static final String DEFAULT_EVENTMESH_TEST_PRODUCER_GROUP = "EventMeshTest-producerGroup"; + public static final String DEFAULT_EVENTMESH_TEST_CONSUMER_GROUP = "EventMeshTest-consumerGroup"; + } diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsBatchPublishInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsBatchPublishInstance.java index e2a6262e2d..54b63f9f8f 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsBatchPublishInstance.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsBatchPublishInstance.java @@ -48,12 +48,10 @@ public static void main(String[] args) throws Exception { final String eventMeshIp = properties.getProperty(ExampleConstants.EVENTMESH_IP); final String eventMeshGrpcPort = properties.getProperty(ExampleConstants.EVENTMESH_GRPC_PORT); - final String topic = "TEST-TOPIC-GRPC-ASYNC"; - EventMeshGrpcClientConfig eventMeshClientConfig = EventMeshGrpcClientConfig.builder() .serverAddr(eventMeshIp) .serverPort(Integer.parseInt(eventMeshGrpcPort)) - .producerGroup("EventMeshTest-producerGroup") + .producerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_PRODUCER_GROUP) .env("env").idc("idc") .sys("1234").build(); @@ -68,9 +66,9 @@ public static void main(String[] args) throws Exception { for (int i = 0; i < 5; i++) { CloudEvent event = CloudEventBuilder.v1() .withId(UUID.randomUUID().toString()) - .withSubject(topic) + .withSubject(ExampleConstants.EVENTMESH_GRPC_ASYNC_TEST_TOPIC) .withSource(URI.create("/")) - .withDataContentType("application/cloudevents+json") + .withDataContentType(ExampleConstants.CLOUDEVENT_CONTENT_TYPE) .withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME) .withData(JsonUtils.serialize(content).getBytes(StandardCharsets.UTF_8)) .withExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4 * 1000)) diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsPublishInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsPublishInstance.java index 0f7139144d..ba5dbf3c16 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsPublishInstance.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsPublishInstance.java @@ -49,12 +49,10 @@ public static void main(String[] args) throws Exception { final String eventMeshIp = properties.getProperty(ExampleConstants.EVENTMESH_IP); final String eventMeshGrpcPort = properties.getProperty(ExampleConstants.EVENTMESH_GRPC_PORT); - final String topic = "TEST-TOPIC-GRPC-ASYNC"; - EventMeshGrpcClientConfig eventMeshClientConfig = EventMeshGrpcClientConfig.builder() .serverAddr(eventMeshIp) .serverPort(Integer.parseInt(eventMeshGrpcPort)) - .producerGroup("EventMeshTest-producerGroup") + .producerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_PRODUCER_GROUP) .env("env").idc("idc") .sys("1234").build(); @@ -68,9 +66,9 @@ public static void main(String[] args) throws Exception { for (int i = 0; i < messageSize; i++) { CloudEvent event = CloudEventBuilder.v1() .withId(UUID.randomUUID().toString()) - .withSubject(topic) + .withSubject(ExampleConstants.EVENTMESH_GRPC_ASYNC_TEST_TOPIC) .withSource(URI.create("/")) - .withDataContentType("application/cloudevents+json") + .withDataContentType(ExampleConstants.CLOUDEVENT_CONTENT_TYPE) .withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME) .withData(JsonUtils.serialize(content).getBytes(StandardCharsets.UTF_8)) .withExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4 * 1000)) diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsRequestInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsRequestInstance.java index e6a2efb18b..3e1eb929ed 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsRequestInstance.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsRequestInstance.java @@ -49,12 +49,10 @@ public static void main(String[] args) throws Exception { final String eventMeshIp = properties.getProperty(ExampleConstants.EVENTMESH_IP); final String eventMeshGrpcPort = properties.getProperty(ExampleConstants.EVENTMESH_GRPC_PORT); - final String topic = "TEST-TOPIC-GRPC-RR"; - EventMeshGrpcClientConfig eventMeshClientConfig = EventMeshGrpcClientConfig.builder() .serverAddr(eventMeshIp) .serverPort(Integer.parseInt(eventMeshGrpcPort)) - .producerGroup("EventMeshTest-producerGroup") + .producerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_PRODUCER_GROUP) .env("env").idc("idc") .sys("1234").build(); @@ -68,9 +66,9 @@ public static void main(String[] args) throws Exception { for (int i = 0; i < messageSize; i++) { CloudEvent event = CloudEventBuilder.v1() .withId(UUID.randomUUID().toString()) - .withSubject(topic) + .withSubject(ExampleConstants.EVENTMESH_GRPC_RR_TEST_TOPIC) .withSource(URI.create("/")) - .withDataContentType("application/cloudevents+json") + .withDataContentType(ExampleConstants.CLOUDEVENT_CONTENT_TYPE) .withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME) .withData(JsonUtils.serialize(content).getBytes(StandardCharsets.UTF_8)) .withExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4 * 1000)) diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/AsyncPublishBroadcast.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/AsyncPublishBroadcast.java index ef321e57c3..982aac9e22 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/AsyncPublishBroadcast.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/AsyncPublishBroadcast.java @@ -44,12 +44,10 @@ public static void main(String[] args) throws Exception { final String eventMeshIp = properties.getProperty(ExampleConstants.EVENTMESH_IP); final String eventMeshGrpcPort = properties.getProperty(ExampleConstants.EVENTMESH_GRPC_PORT); - final String topic = "TEST-TOPIC-GRPC-BROADCAST"; - EventMeshGrpcClientConfig eventMeshClientConfig = EventMeshGrpcClientConfig.builder() .serverAddr(eventMeshIp) .serverPort(Integer.parseInt(eventMeshGrpcPort)) - .producerGroup("EventMeshTest-producerGroup") + .producerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_PRODUCER_GROUP) .env("env").idc("idc") .sys("1234").build(); @@ -63,7 +61,7 @@ public static void main(String[] args) throws Exception { for (int i = 0; i < messageSize; i++) { EventMeshMessage eventMeshMessage = EventMeshMessage.builder() .content(JsonUtils.serialize(content)) - .topic(topic) + .topic(ExampleConstants.EVENTMESH_GRPC_BROADCAT_TEST_TOPIC) .uniqueId(RandomStringUtils.generateNum(30)) .bizSeqNo(RandomStringUtils.generateNum(30)) .build() diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/AsyncPublishInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/AsyncPublishInstance.java index 6d69d0960e..d8ee6a2e52 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/AsyncPublishInstance.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/AsyncPublishInstance.java @@ -44,12 +44,10 @@ public static void main(String[] args) throws Exception { final String eventMeshIp = properties.getProperty(ExampleConstants.EVENTMESH_IP); final String eventMeshGrpcPort = properties.getProperty(ExampleConstants.EVENTMESH_GRPC_PORT); - final String topic = "TEST-TOPIC-GRPC-ASYNC"; - EventMeshGrpcClientConfig eventMeshClientConfig = EventMeshGrpcClientConfig.builder() .serverAddr(eventMeshIp) .serverPort(Integer.parseInt(eventMeshGrpcPort)) - .producerGroup("EventMeshTest-producerGroup") + .producerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_PRODUCER_GROUP) .env("env").idc("idc") .sys("1234").build(); @@ -63,7 +61,7 @@ public static void main(String[] args) throws Exception { for (int i = 0; i < messageSize; i++) { EventMeshMessage eventMeshMessage = EventMeshMessage.builder() .content(JsonUtils.serialize(content)) - .topic(topic) + .topic(ExampleConstants.EVENTMESH_GRPC_ASYNC_TEST_TOPIC) .uniqueId(RandomStringUtils.generateNum(30)) .bizSeqNo(RandomStringUtils.generateNum(30)) .build() diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/BatchPublishInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/BatchPublishInstance.java index 04555420d7..e7e7a17548 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/BatchPublishInstance.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/BatchPublishInstance.java @@ -43,12 +43,10 @@ public static void main(String[] args) throws Exception { final String eventMeshIp = properties.getProperty(ExampleConstants.EVENTMESH_IP); final String eventMeshGrpcPort = properties.getProperty(ExampleConstants.EVENTMESH_GRPC_PORT); - final String topic = "TEST-TOPIC-GRPC-ASYNC"; - EventMeshGrpcClientConfig eventMeshClientConfig = EventMeshGrpcClientConfig.builder() .serverAddr(eventMeshIp) .serverPort(Integer.parseInt(eventMeshGrpcPort)) - .producerGroup("EventMeshTest-producerGroup") + .producerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_PRODUCER_GROUP) .env("env").idc("idc") .sys("1234").build(); @@ -62,7 +60,7 @@ public static void main(String[] args) throws Exception { List messageList = new ArrayList<>(); for (int i = 0; i < 5; i++) { EventMeshMessage message = EventMeshMessage.builder() - .topic(topic) + .topic(ExampleConstants.EVENTMESH_GRPC_ASYNC_TEST_TOPIC) .content((JsonUtils.serialize(content))) .uniqueId(RandomStringUtils.generateNum(30)) .bizSeqNo(RandomStringUtils.generateNum(30)) diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/RequestReplyInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/RequestReplyInstance.java index 17da61c4b9..b683913405 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/RequestReplyInstance.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/RequestReplyInstance.java @@ -45,12 +45,10 @@ public static void main(String[] args) throws Exception { final String eventMeshIp = properties.getProperty(ExampleConstants.EVENTMESH_IP); final String eventMeshGrpcPort = properties.getProperty(ExampleConstants.EVENTMESH_GRPC_PORT); - final String topic = "TEST-TOPIC-GRPC-RR"; - EventMeshGrpcClientConfig eventMeshClientConfig = EventMeshGrpcClientConfig.builder() .serverAddr(eventMeshIp) .serverPort(Integer.parseInt(eventMeshGrpcPort)) - .producerGroup("EventMeshTest-producerGroup") + .producerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_PRODUCER_GROUP) .env("env").idc("idc") .sys("1234").build(); @@ -64,7 +62,7 @@ public static void main(String[] args) throws Exception { for (int i = 0; i < messageSize; i++) { EventMeshMessage eventMeshMessage = EventMeshMessage.builder() .content(JsonUtils.serialize(content)) - .topic(topic) + .topic(ExampleConstants.EVENTMESH_GRPC_RR_TEST_TOPIC) .uniqueId(RandomStringUtils.generateNum(30)) .bizSeqNo(RandomStringUtils.generateNum(30)) .build() diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/CloudEventsAsyncSubscribe.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/CloudEventsAsyncSubscribe.java index ab3e2fc60f..a409389c82 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/CloudEventsAsyncSubscribe.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/CloudEventsAsyncSubscribe.java @@ -45,21 +45,18 @@ public static void main(String[] args) throws InterruptedException { final String eventMeshIp = properties.getProperty(ExampleConstants.EVENTMESH_IP); final String eventMeshGrpcPort = properties.getProperty(ExampleConstants.EVENTMESH_GRPC_PORT); - final String topic = "TEST-TOPIC-GRPC-ASYNC"; - EventMeshGrpcClientConfig eventMeshClientConfig = EventMeshGrpcClientConfig.builder() .serverAddr(eventMeshIp) .serverPort(Integer.parseInt(eventMeshGrpcPort)) - .consumerGroup("EventMeshTest-consumerGroup") + .consumerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_CONSUMER_GROUP) .env("env").idc("idc") .sys("1234").build(); org.apache.eventmesh.common.protocol.SubscriptionItem subscriptionItem = new SubscriptionItem(); - subscriptionItem.setTopic(topic); + subscriptionItem.setTopic(ExampleConstants.EVENTMESH_GRPC_ASYNC_TEST_TOPIC); subscriptionItem.setMode(SubscriptionMode.CLUSTERING); subscriptionItem.setType(SubscriptionType.ASYNC); - EventMeshGrpcConsumer eventMeshGrpcConsumer = new EventMeshGrpcConsumer(eventMeshClientConfig); eventMeshGrpcConsumer.init(); @@ -74,7 +71,7 @@ public static void main(String[] args) throws InterruptedException { @Override public Optional handle(CloudEvent msg) { - log.info("receive async msg====================={}", msg); + log.info("receive async msg: {}", msg); return Optional.empty(); } diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/CloudEventsSubscribeReply.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/CloudEventsSubscribeReply.java index e7cdf8676d..4a9b2aa981 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/CloudEventsSubscribeReply.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/CloudEventsSubscribeReply.java @@ -45,21 +45,18 @@ public static void main(String[] args) throws InterruptedException { final String eventMeshIp = properties.getProperty(ExampleConstants.EVENTMESH_IP); final String eventMeshGrpcPort = properties.getProperty(ExampleConstants.EVENTMESH_GRPC_PORT); - final String topic = "TEST-TOPIC-GRPC-RR"; - EventMeshGrpcClientConfig eventMeshClientConfig = EventMeshGrpcClientConfig.builder() .serverAddr(eventMeshIp) .serverPort(Integer.parseInt(eventMeshGrpcPort)) - .consumerGroup("EventMeshTest-consumerGroup") + .consumerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_CONSUMER_GROUP) .env("env").idc("idc") .sys("1234").build(); SubscriptionItem subscriptionItem = new SubscriptionItem(); - subscriptionItem.setTopic(topic); + subscriptionItem.setTopic(ExampleConstants.EVENTMESH_GRPC_RR_TEST_TOPIC); subscriptionItem.setMode(SubscriptionMode.CLUSTERING); subscriptionItem.setType(SubscriptionType.SYNC); - EventMeshGrpcConsumer eventMeshGrpcConsumer = new EventMeshGrpcConsumer(eventMeshClientConfig); eventMeshGrpcConsumer.init(); @@ -74,7 +71,7 @@ public static void main(String[] args) throws InterruptedException { @Override public Optional handle(CloudEvent msg) { - log.info("receive request-reply msg====================={}", msg); + log.info("receive request-reply msg: {}", msg); if (msg != null) { return Optional.of(msg); } else { diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/EventmeshAsyncSubscribe.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/EventmeshAsyncSubscribe.java index d6506f3633..b372e079a2 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/EventmeshAsyncSubscribe.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/EventmeshAsyncSubscribe.java @@ -44,17 +44,15 @@ public static void main(String[] args) throws InterruptedException { final String eventMeshIp = properties.getProperty(ExampleConstants.EVENTMESH_IP); final String eventMeshGrpcPort = properties.getProperty(ExampleConstants.EVENTMESH_GRPC_PORT); - final String topic = "TEST-TOPIC-GRPC-ASYNC"; - EventMeshGrpcClientConfig eventMeshClientConfig = EventMeshGrpcClientConfig.builder() .serverAddr(eventMeshIp) .serverPort(Integer.parseInt(eventMeshGrpcPort)) - .consumerGroup("EventMeshTest-consumerGroup") + .consumerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_CONSUMER_GROUP) .env("env").idc("idc") .sys("1234").build(); SubscriptionItem subscriptionItem = new SubscriptionItem(); - subscriptionItem.setTopic(topic); + subscriptionItem.setTopic(ExampleConstants.EVENTMESH_GRPC_ASYNC_TEST_TOPIC); subscriptionItem.setMode(SubscriptionMode.CLUSTERING); subscriptionItem.setType(SubscriptionType.ASYNC); @@ -72,7 +70,7 @@ public static void main(String[] args) throws InterruptedException { @Override public Optional handle(EventMeshMessage msg) { - log.info("receive async msg====================={}", msg); + log.info("receive async msg: {}", msg); return Optional.empty(); } diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/EventmeshSubscribeBroadcast.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/EventmeshSubscribeBroadcast.java index 75cbc3856b..28753b4c6c 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/EventmeshSubscribeBroadcast.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/EventmeshSubscribeBroadcast.java @@ -44,17 +44,15 @@ public static void main(String[] args) throws InterruptedException { final String eventMeshIp = properties.getProperty(ExampleConstants.EVENTMESH_IP); final String eventMeshGrpcPort = properties.getProperty(ExampleConstants.EVENTMESH_GRPC_PORT); - final String topic = "TEST-TOPIC-GRPC-BROADCAST"; - EventMeshGrpcClientConfig eventMeshClientConfig = EventMeshGrpcClientConfig.builder() .serverAddr(eventMeshIp) .serverPort(Integer.parseInt(eventMeshGrpcPort)) - .consumerGroup("EventMeshTest-consumerGroup") + .consumerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_CONSUMER_GROUP) .env("env").idc("idc") .sys("1234").build(); SubscriptionItem subscriptionItem = new SubscriptionItem(); - subscriptionItem.setTopic(topic); + subscriptionItem.setTopic(ExampleConstants.EVENTMESH_GRPC_BROADCAT_TEST_TOPIC); subscriptionItem.setMode(SubscriptionMode.BROADCASTING); subscriptionItem.setType(SubscriptionType.ASYNC); @@ -72,7 +70,7 @@ public static void main(String[] args) throws InterruptedException { @Override public Optional handle(EventMeshMessage msg) { - log.info("receive async broadcast msg====================={}", msg); + log.info("receive async broadcast msg: {}", msg); return Optional.empty(); } diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/EventmeshSubscribeReply.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/EventmeshSubscribeReply.java index 6e70328e37..082dacb9f8 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/EventmeshSubscribeReply.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/EventmeshSubscribeReply.java @@ -44,17 +44,15 @@ public static void main(String[] args) throws InterruptedException { final String eventMeshIp = properties.getProperty(ExampleConstants.EVENTMESH_IP); final String eventMeshGrpcPort = properties.getProperty(ExampleConstants.EVENTMESH_GRPC_PORT); - final String topic = "TEST-TOPIC-GRPC-RR"; - EventMeshGrpcClientConfig eventMeshClientConfig = EventMeshGrpcClientConfig.builder() .serverAddr(eventMeshIp) .serverPort(Integer.parseInt(eventMeshGrpcPort)) - .consumerGroup("EventMeshTest-consumerGroup") + .consumerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_CONSUMER_GROUP) .env("env").idc("idc") .sys("1234").build(); SubscriptionItem subscriptionItem = new SubscriptionItem(); - subscriptionItem.setTopic(topic); + subscriptionItem.setTopic(ExampleConstants.EVENTMESH_GRPC_RR_TEST_TOPIC); subscriptionItem.setMode(SubscriptionMode.CLUSTERING); subscriptionItem.setType(SubscriptionType.SYNC); @@ -72,7 +70,7 @@ public static void main(String[] args) throws InterruptedException { @Override public Optional handle(EventMeshMessage msg) { - log.info("receive request-reply msg====================={}", msg); + log.info("receive request-reply msg: {}", msg); if (msg != null) { return Optional.of(msg); } else { diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/app/service/SubService.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/app/service/SubService.java index 270825f921..8d3fe0d075 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/app/service/SubService.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/app/service/SubService.java @@ -69,14 +69,14 @@ public void afterPropertiesSet() throws Exception { EventMeshGrpcClientConfig eventMeshClientConfig = EventMeshGrpcClientConfig.builder() .serverAddr(eventMeshIp) .serverPort(Integer.parseInt(eventMeshGrpcPort)) - .consumerGroup("EventMeshTest-consumerGroup2") + .consumerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_CONSUMER_GROUP) .env(env).idc(idc) .sys(subsys).build(); eventMeshGrpcConsumer = new EventMeshGrpcConsumer(eventMeshClientConfig); eventMeshGrpcConsumer.init(); - subscriptionItem.setTopic("TEST-TOPIC-GRPC-ASYNC"); + subscriptionItem.setTopic(ExampleConstants.EVENTMESH_GRPC_ASYNC_TEST_TOPIC); subscriptionItem.setMode(SubscriptionMode.CLUSTERING); subscriptionItem.setType(SubscriptionType.ASYNC); @@ -115,8 +115,8 @@ public void cleanup() { * Count the message already consumed */ public void consumeMessage(String msg) { - logger.info("consume message {}", msg); + logger.info("consume message: {}", msg); countDownLatch.countDown(); - logger.info("remaining number of messages to be consumed {}", countDownLatch.getCount()); + logger.info("remaining number of messages to be consumed: {}", countDownLatch.getCount()); } } diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/cloudevents/AsyncPublishInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/cloudevents/AsyncPublishInstance.java index e4cd10f0d1..acf1ab5146 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/cloudevents/AsyncPublishInstance.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/cloudevents/AsyncPublishInstance.java @@ -21,6 +21,7 @@ import org.apache.eventmesh.client.http.producer.EventMeshHttpProducer; import org.apache.eventmesh.client.tcp.common.EventMeshCommon; import org.apache.eventmesh.common.Constants; +import org.apache.eventmesh.common.ExampleConstants; import org.apache.eventmesh.common.utils.IPUtils; import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.common.utils.ThreadUtils; @@ -42,40 +43,24 @@ @Slf4j public class AsyncPublishInstance { - - // This messageSize is also used in SubService.java (Subscriber) + public static final int MESSAGE_SIZE = 1; - - public static final String DEFAULT_IP_PORT = "127.0.0.1:10105"; - - public static final String FILE_NAME = "application.properties"; - - public static final String IP_KEY = "eventmesh.ip"; - - public static final String PORT_KEY = "eventmesh.http.port"; - - public static final String TEST_TOPIC = "TEST-TOPIC-HTTP-ASYNC"; - - public static final String TEST_GROUP = "EventMeshTest-producerGroup"; - - public static final String CONTENT_TYPE = "application/cloudevents+json"; - - + public static void main(String[] args) throws Exception { - Properties properties = Utils.readPropertiesFile(FILE_NAME); - final String eventMeshIp = properties.getProperty(IP_KEY); - final String eventMeshHttpPort = properties.getProperty(PORT_KEY); + Properties properties = Utils.readPropertiesFile(ExampleConstants.CONFIG_FILE_NAME); + final String eventMeshIp = properties.getProperty(ExampleConstants.EVENTMESH_IP); + final String eventMeshHttpPort = properties.getProperty(ExampleConstants.EVENTMESH_HTTP_PORT); // if has multi value, can config as: 127.0.0.1:10105;127.0.0.2:10105 - String eventMeshIPPort = DEFAULT_IP_PORT; + String eventMeshIPPort = ExampleConstants.DEFAULT_EVENTMESH_IP_PORT; if (StringUtils.isNotBlank(eventMeshIp) || StringUtils.isNotBlank(eventMeshHttpPort)) { eventMeshIPPort = eventMeshIp + ":" + eventMeshHttpPort; } EventMeshHttpClientConfig eventMeshClientConfig = EventMeshHttpClientConfig.builder() .liteEventMeshAddr(eventMeshIPPort) - .producerGroup(TEST_GROUP) + .producerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_PRODUCER_GROUP) .env("env") .idc("idc") .ip(IPUtils.getLocalAddress()) @@ -92,15 +77,15 @@ public static void main(String[] args) throws Exception { CloudEvent event = CloudEventBuilder.v1() .withId(UUID.randomUUID().toString()) - .withSubject(TEST_TOPIC) + .withSubject(ExampleConstants.EVENTMESH_HTTP_ASYNC_TEST_TOPIC) .withSource(URI.create("/")) - .withDataContentType(CONTENT_TYPE) + .withDataContentType(ExampleConstants.CLOUDEVENT_CONTENT_TYPE) .withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME) .withData(JsonUtils.serialize(content).getBytes(StandardCharsets.UTF_8)) .withExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4 * 1000)) .build(); eventMeshHttpProducer.publish(event); - log.info("publish event success content:{}", content); + log.info("publish event success content: {}", content); } Thread.sleep(30000); } diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/eventmeshmessage/AsyncPublishInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/eventmeshmessage/AsyncPublishInstance.java index 4415b91f68..dd3964b732 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/eventmeshmessage/AsyncPublishInstance.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/eventmeshmessage/AsyncPublishInstance.java @@ -50,16 +50,14 @@ public static void main(String[] args) throws Exception { final String eventMeshIPPort; if (StringUtils.isBlank(eventMeshIp) || StringUtils.isBlank(eventMeshHttpPort)) { // if has multi value, can config as: 127.0.0.1:10105;127.0.0.2:10105 - eventMeshIPPort = "127.0.0.1:10105"; + eventMeshIPPort = ExampleConstants.DEFAULT_EVENTMESH_IP_PORT; } else { eventMeshIPPort = eventMeshIp + ":" + eventMeshHttpPort; } - final String topic = "TEST-TOPIC-HTTP-ASYNC"; - EventMeshHttpClientConfig eventMeshClientConfig = EventMeshHttpClientConfig.builder() .liteEventMeshAddr(eventMeshIPPort) - .producerGroup("EventMeshTest-producerGroup") + .producerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_PRODUCER_GROUP) .env("env") .idc("idc") .ip(IPUtils.getLocalAddress()) @@ -77,7 +75,7 @@ public static void main(String[] args) throws Exception { EventMeshMessage eventMeshMessage = EventMeshMessage.builder() .bizSeqNo(RandomStringUtils.generateNum(30)) .content(JsonUtils.serialize(content)) - .topic(topic) + .topic(ExampleConstants.EVENTMESH_HTTP_ASYNC_TEST_TOPIC) .uniqueId(RandomStringUtils.generateNum(30)) .build() .addProp(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4 * 1000)); diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/eventmeshmessage/AsyncSyncRequestInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/eventmeshmessage/AsyncSyncRequestInstance.java index b6ef3e580f..c3f0ff4f87 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/eventmeshmessage/AsyncSyncRequestInstance.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/eventmeshmessage/AsyncSyncRequestInstance.java @@ -48,15 +48,14 @@ public static void main(String[] args) throws Exception { EventMeshHttpProducer eventMeshHttpProducer = null; try { String eventMeshIPPort = eventMeshIp + ":" + eventMeshHttpPort; - final String topic = "TEST-TOPIC-TCP-ASYNC"; if (StringUtils.isBlank(eventMeshIPPort)) { // if has multi value, can config as: 127.0.0.1:10105;127.0.0.2:10105 - eventMeshIPPort = "127.0.0.1:10105"; + eventMeshIPPort = ExampleConstants.DEFAULT_EVENTMESH_IP_PORT; } EventMeshHttpClientConfig eventMeshClientConfig = EventMeshHttpClientConfig.builder() .liteEventMeshAddr(eventMeshIPPort) - .producerGroup("EventMeshTest-producerGroup") + .producerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_PRODUCER_GROUP) .env("env") .idc("idc") .ip(IPUtils.getLocalAddress()) @@ -69,13 +68,13 @@ public static void main(String[] args) throws Exception { final EventMeshMessage eventMeshMessage = EventMeshMessage.builder() .bizSeqNo(RandomStringUtils.generateNum(30)) .content("testAsyncMessage") - .topic(topic) + .topic(ExampleConstants.EVENTMESH_TCP_ASYNC_TEST_TOPIC) .uniqueId(RandomStringUtils.generateNum(30)).build(); eventMeshHttpProducer.request(eventMeshMessage, new RRCallback() { @Override public void onSuccess(EventMeshMessage o) { - log.debug("sendmsg : {}, return : {}, cost:{}ms", eventMeshMessage.getContent(), o.getContent(), + log.debug("sendmsg: {}, return: {}, cost: {} ms", eventMeshMessage.getContent(), o.getContent(), System.currentTimeMillis() - startTime); } diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/eventmeshmessage/SyncRequestInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/eventmeshmessage/SyncRequestInstance.java index e61307ccad..a9140cb624 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/eventmeshmessage/SyncRequestInstance.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/eventmeshmessage/SyncRequestInstance.java @@ -20,6 +20,7 @@ import org.apache.eventmesh.client.http.conf.EventMeshHttpClientConfig; import org.apache.eventmesh.client.http.producer.EventMeshHttpProducer; import org.apache.eventmesh.common.EventMeshMessage; +import org.apache.eventmesh.common.ExampleConstants; import org.apache.eventmesh.common.utils.IPUtils; import org.apache.eventmesh.common.utils.RandomStringUtils; import org.apache.eventmesh.common.utils.ThreadUtils; @@ -36,8 +37,9 @@ public class SyncRequestInstance { public static void main(String[] args) throws Exception { EventMeshHttpProducer eventMeshHttpProducer = null; - String eventMeshIPPort = "127.0.0.1:10105"; - String topic = "EventMesh.SyncRequestInstance"; + String eventMeshIPPort = ExampleConstants.DEFAULT_EVENTMESH_IP_PORT; + String topic = ExampleConstants.EVENTMESH_HTTP_SYNC_TEST_TOPIC; + try { if (args.length > 0 && StringUtils.isNotBlank(args[0])) { eventMeshIPPort = args[0]; @@ -48,12 +50,12 @@ public static void main(String[] args) throws Exception { if (StringUtils.isBlank(eventMeshIPPort)) { // if has multi value, can config as: 127.0.0.1:10105;127.0.0.2:10105 - eventMeshIPPort = "127.0.0.1:10105"; + eventMeshIPPort = ExampleConstants.DEFAULT_EVENTMESH_IP_PORT; } EventMeshHttpClientConfig eventMeshClientConfig = EventMeshHttpClientConfig.builder() .liteEventMeshAddr(eventMeshIPPort) - .producerGroup("EventMeshTest-producerGroup") + .producerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_PRODUCER_GROUP) .env("env") .idc("idc") .ip(IPUtils.getLocalAddress()) @@ -71,7 +73,7 @@ public static void main(String[] args) throws Exception { EventMeshMessage rsp = eventMeshHttpProducer.request(eventMeshMessage, 10000); if (logger.isDebugEnabled()) { - logger.debug("sendmsg : {}, return : {}, cost:{}ms", eventMeshMessage.getContent(), rsp.getContent(), + logger.debug("sendmsg: {}, return: {}, cost:{} ms", eventMeshMessage.getContent(), rsp.getContent(), System.currentTimeMillis() - startTime); } } catch (Exception e) { diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java index 58b5b632da..e0708a3419 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java @@ -53,14 +53,14 @@ public class SubController { @RequestMapping(value = "/test", method = RequestMethod.POST) public String subTest(HttpServletRequest request) { String content = request.getParameter("content"); - log.info("=======receive message======= {}", content); + log.info("receive message: {}", content); Map contentMap = JsonUtils.deserialize(content, HashMap.class); if (StringUtils.equals(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME, contentMap.get(ProtocolKey.PROTOCOL_TYPE))) { CloudEvent event = EventFormatProvider.getInstance() .resolveFormat(JsonFormat.CONTENT_TYPE) .deserialize(content.getBytes(StandardCharsets.UTF_8)); String data = new String(event.getData().toBytes(), StandardCharsets.UTF_8); - log.info("=======receive data======= {}", data); + log.info("receive data: {}", data); } subService.consumeMessage(content); diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java index fd6f085ce3..bb1eaaae11 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java @@ -54,7 +54,7 @@ public class SubService implements InitializingBean { final Properties properties = Utils.readPropertiesFile(ExampleConstants.CONFIG_FILE_NAME); final List topicList = Lists.newArrayList( - new SubscriptionItem("TEST-TOPIC-HTTP-ASYNC", SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC) + new SubscriptionItem(ExampleConstants.EVENTMESH_HTTP_SYNC_TEST_TOPIC, SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC) ); final String localIp = IPUtils.getLocalAddress(); final String localPort = properties.getProperty("server.port"); @@ -74,7 +74,7 @@ public void afterPropertiesSet() throws Exception { final String eventMeshIPPort = eventMeshIp + ":" + eventMeshHttpPort; EventMeshHttpClientConfig eventMeshClientConfig = EventMeshHttpClientConfig.builder() .liteEventMeshAddr(eventMeshIPPort) - .consumerGroup("EventMeshTest-consumerGroup") + .consumerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_CONSUMER_GROUP) .env(env) .idc(idc) .ip(IPUtils.getLocalAddress()) @@ -122,8 +122,8 @@ public void cleanup() { * Count the message already consumed */ public void consumeMessage(String msg) { - logger.info("consume message {}", msg); + logger.info("consume message: {}", msg); countDownLatch.countDown(); - logger.info("remaining number of messages to be consumed {}", countDownLatch.getCount()); + logger.info("remaining number: {} of messages to be consumed", countDownLatch.getCount()); } } diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestCaseTopicSet.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestCaseTopicSet.java deleted file mode 100644 index 7fce2cab18..0000000000 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestCaseTopicSet.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eventmesh.tcp.common; - -/** - * Testcase set - */ -public class EventMeshTestCaseTopicSet { - - public static final String TOPIC_PRX_WQ2ClientBroadCast = "TEST-TOPIC-TCP-BROADCAST"; - - public static final String TOPIC_PRX_SyncSubscribeTest = "TEST-TOPIC-TCP-SYNC"; - - public static final String TOPIC_PRX_WQ2ClientUniCast = "TEST-TOPIC-TCP-ASYNC"; - -} diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java index dbb2729bd1..db3182a446 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java @@ -18,12 +18,10 @@ package org.apache.eventmesh.tcp.common; import static org.apache.eventmesh.common.protocol.tcp.Command.RESPONSE_TO_SERVER; -import static org.apache.eventmesh.tcp.common.EventMeshTestCaseTopicSet.TOPIC_PRX_SyncSubscribeTest; -import static org.apache.eventmesh.tcp.common.EventMeshTestCaseTopicSet.TOPIC_PRX_WQ2ClientBroadCast; -import static org.apache.eventmesh.tcp.common.EventMeshTestCaseTopicSet.TOPIC_PRX_WQ2ClientUniCast; import org.apache.eventmesh.client.tcp.common.EventMeshCommon; import org.apache.eventmesh.client.tcp.common.MessageUtils; +import org.apache.eventmesh.common.ExampleConstants; import org.apache.eventmesh.common.protocol.tcp.Command; import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; import org.apache.eventmesh.common.protocol.tcp.Header; @@ -42,6 +40,7 @@ import io.cloudevents.core.builder.CloudEventBuilder; public class EventMeshTestUtils { + private static final int seqLength = 10; // generate pub-client @@ -117,7 +116,7 @@ public static Package rrResponse(EventMeshMessage request) { public static EventMeshMessage generateSyncRRMqMsg() { EventMeshMessage mqMsg = new EventMeshMessage(); - mqMsg.setTopic(TOPIC_PRX_SyncSubscribeTest); + mqMsg.setTopic(ExampleConstants.EVENTMESH_TCP_SYNC_TEST_TOPIC); mqMsg.getProperties().put("msgtype", "persistent"); mqMsg.getProperties().put("ttl", "300000"); mqMsg.getProperties().put("keys", generateRandomString(16)); @@ -128,7 +127,7 @@ public static EventMeshMessage generateSyncRRMqMsg() { private static EventMeshMessage generateAsyncRRMqMsg() { EventMeshMessage mqMsg = new EventMeshMessage(); - mqMsg.setTopic(TOPIC_PRX_SyncSubscribeTest); + mqMsg.setTopic(ExampleConstants.EVENTMESH_TCP_SYNC_TEST_TOPIC); mqMsg.getProperties().put("replyto", "localhost@ProducerGroup-producerPool-9-access#V1_4_0#CI"); mqMsg.getProperties().put("ttl", "300000"); mqMsg.getProperties().put("propertymessagereplyto", "notnull"); @@ -138,7 +137,7 @@ private static EventMeshMessage generateAsyncRRMqMsg() { public static EventMeshMessage generateAsyncEventMqMsg() { EventMeshMessage mqMsg = new EventMeshMessage(); - mqMsg.setTopic(TOPIC_PRX_WQ2ClientUniCast); + mqMsg.setTopic(ExampleConstants.EVENTMESH_TCP_ASYNC_TEST_TOPIC); mqMsg.getProperties().put("replyto", "localhost@ProducerGroup-producerPool-9-access#V1_4_0#CI"); mqMsg.getProperties().put("ttl", "30000"); mqMsg.getProperties().put("propertymessagereplyto", "notnull"); @@ -148,7 +147,7 @@ public static EventMeshMessage generateAsyncEventMqMsg() { public static EventMeshMessage generateBroadcastMqMsg() { EventMeshMessage mqMsg = new EventMeshMessage(); - mqMsg.setTopic(TOPIC_PRX_WQ2ClientBroadCast); + mqMsg.setTopic(ExampleConstants.EVENTMESH_TCP_ASYNC_TEST_TOPIC); mqMsg.getProperties().put("replyto", "localhost@ProducerGroup-producerPool-9-access#V1_4_0#CI"); mqMsg.getProperties().put("ttl", "30000"); mqMsg.getProperties().put("propertymessagereplyto", "notnull"); @@ -170,7 +169,7 @@ public static CloudEvent generateCloudEventV1Async() { CloudEvent event = CloudEventBuilder.v1() .withId(UUID.randomUUID().toString()) - .withSubject(TOPIC_PRX_WQ2ClientUniCast) + .withSubject(ExampleConstants.EVENTMESH_TCP_ASYNC_TEST_TOPIC) .withSource(URI.create("/")) .withDataContentType("application/cloudevents+json") .withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME) @@ -186,9 +185,9 @@ public static CloudEvent generateCloudEventV1SyncRR() { CloudEvent event = CloudEventBuilder.v1() .withId(UUID.randomUUID().toString()) - .withSubject(TOPIC_PRX_SyncSubscribeTest) + .withSubject(ExampleConstants.EVENTMESH_TCP_SYNC_TEST_TOPIC) .withSource(URI.create("/")) - .withDataContentType("application/cloudevents+json") + .withDataContentType(ExampleConstants.CLOUDEVENT_CONTENT_TYPE) .withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME) .withData(JsonUtils.serialize(content).getBytes(StandardCharsets.UTF_8)) .withExtension("ttl", "30000") diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/AsyncPublish.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/AsyncPublish.java index 636e6848d8..3cca2341c0 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/AsyncPublish.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/AsyncPublish.java @@ -56,7 +56,7 @@ public static void main(String[] args) throws Exception { for (int i = 0; i < 2; i++) { CloudEvent event = EventMeshTestUtils.generateCloudEventV1Async(); - logger.info("begin send async msg[{}]==================={}", i, event); + logger.info("begin send async msg[{}]: {}", i, event); client.publish(event, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); Thread.sleep(1000); diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/SyncRequest.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/SyncRequest.java index ee927bdcc8..2dceddfcf3 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/SyncRequest.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/SyncRequest.java @@ -57,12 +57,12 @@ public static void main(String[] args) throws Exception { client.init(); CloudEvent event = EventMeshTestUtils.generateCloudEventV1SyncRR(); - log.info("begin send rr msg=================={}", event); + log.info("begin send rr msg: {}", event); Package response = client.rr(event, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); CloudEvent replyEvent = EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE) .deserialize(response.getBody().toString().getBytes(StandardCharsets.UTF_8)); String content = new String(replyEvent.getData().toBytes(), StandardCharsets.UTF_8); - log.info("receive rr reply==================={}|{}", response, content); + log.info("receive rr reply: {}|{}", response, content); } catch (Exception e) { log.warn("SyncRequest failed", e); diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublish.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublish.java index 33003604af..5a61c2d317 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublish.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublish.java @@ -56,7 +56,7 @@ public static void main(String[] args) throws Exception { for (int i = 0; i < 5; i++) { EventMeshMessage eventMeshMessage = EventMeshTestUtils.generateAsyncEventMqMsg(); - logger.info("begin send async msg[{}]==================={}", i, eventMeshMessage); + logger.info("begin send async msg[{}]: {}", i, eventMeshMessage); client.publish(eventMeshMessage, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); Thread.sleep(1000); diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublishBroadcast.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublishBroadcast.java index bfeef20afd..b84701b3a0 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublishBroadcast.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublishBroadcast.java @@ -51,7 +51,7 @@ public static void main(String[] args) throws Exception { client.init(); EventMeshMessage eventMeshMessage = EventMeshTestUtils.generateBroadcastMqMsg(); - logger.info("begin send broadcast msg============={}", eventMeshMessage); + logger.info("begin send broadcast msg: {}", eventMeshMessage); client.broadcast(eventMeshMessage, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); Thread.sleep(2000); diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/SyncRequest.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/SyncRequest.java index 4bbd81524a..582c6f7ca9 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/SyncRequest.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/SyncRequest.java @@ -53,9 +53,9 @@ public static void main(String[] args) throws Exception { client.init(); EventMeshMessage eventMeshMessage = EventMeshTestUtils.generateSyncRRMqMsg(); - log.info("begin send rr msg=================={}", eventMeshMessage); + log.info("begin send rr msg: {}", eventMeshMessage); Package response = client.rr(eventMeshMessage, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); - log.info("receive rr reply==================={}", response); + log.info("receive rr reply: {}", response); } catch (Exception e) { log.warn("SyncRequest failed", e); diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/cloudevents/AsyncSubscribe.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/cloudevents/AsyncSubscribe.java index 116056699e..a4f46a606c 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/cloudevents/AsyncSubscribe.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/cloudevents/AsyncSubscribe.java @@ -25,7 +25,6 @@ import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.common.protocol.SubscriptionType; import org.apache.eventmesh.common.protocol.tcp.UserAgent; -import org.apache.eventmesh.tcp.common.EventMeshTestCaseTopicSet; import org.apache.eventmesh.tcp.common.EventMeshTestUtils; import org.apache.eventmesh.util.Utils; @@ -58,7 +57,7 @@ public static void main(String[] args) throws Exception { client = EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig, CloudEvent.class); client.init(); - client.subscribe(EventMeshTestCaseTopicSet.TOPIC_PRX_WQ2ClientUniCast, SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC); + client.subscribe(ExampleConstants.EVENTMESH_TCP_ASYNC_TEST_TOPIC, SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC); client.registerSubBusiHandler(handler); client.listen(); @@ -70,7 +69,7 @@ public static void main(String[] args) throws Exception { @Override public Optional handle(CloudEvent msg) { String content = new String(msg.getData().toBytes(), StandardCharsets.UTF_8); - log.info("receive async msg====================={}|{}", msg, content); + log.info("receive async msg: {}|{}", msg, content); return Optional.empty(); } } diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/cloudevents/SyncResponse.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/cloudevents/SyncResponse.java index 42883a0372..8314b59926 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/cloudevents/SyncResponse.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/cloudevents/SyncResponse.java @@ -58,7 +58,7 @@ public static void main(String[] args) throws Exception { .createEventMeshTCPClient(eventMeshTcpClientConfig, CloudEvent.class); client.init(); - client.subscribe("TEST-TOPIC-TCP-SYNC", SubscriptionMode.CLUSTERING, SubscriptionType.SYNC); + client.subscribe(ExampleConstants.EVENTMESH_TCP_SYNC_TEST_TOPIC, SubscriptionMode.CLUSTERING, SubscriptionType.SYNC); // Synchronize RR messages client.registerSubBusiHandler(handler); @@ -72,7 +72,7 @@ public static void main(String[] args) throws Exception { @Override public Optional handle(CloudEvent event) { String content = new String(event.getData().toBytes(), StandardCharsets.UTF_8); - log.info("receive sync rr msg================{}|{}", event, content); + log.info("receive sync rr msg: {}|{}", event, content); return Optional.of(event); } diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribe.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribe.java index a3e3c79dd1..8a3c325ddb 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribe.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribe.java @@ -26,7 +26,6 @@ import org.apache.eventmesh.common.protocol.SubscriptionType; import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; import org.apache.eventmesh.common.protocol.tcp.UserAgent; -import org.apache.eventmesh.tcp.common.EventMeshTestCaseTopicSet; import org.apache.eventmesh.tcp.common.EventMeshTestUtils; import org.apache.eventmesh.util.Utils; @@ -57,17 +56,12 @@ public static void main(String[] args) throws Exception { EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig, EventMeshMessage.class); client.init(); - client.subscribe(EventMeshTestCaseTopicSet.TOPIC_PRX_WQ2ClientUniCast, SubscriptionMode.CLUSTERING, + client.subscribe(ExampleConstants.EVENTMESH_TCP_ASYNC_TEST_TOPIC, SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC); client.registerSubBusiHandler(handler); client.listen(); - //client.unsubscribe(); - - // release resource and close client - // client.close(); - } catch (Exception e) { log.warn("AsyncSubscribe failed", e); } @@ -75,7 +69,7 @@ public static void main(String[] args) throws Exception { @Override public Optional handle(EventMeshMessage msg) { - log.info("receive async msg====================={}", msg); + log.info("receive async msg: {}", msg); return Optional.empty(); } } diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribeBroadcast.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribeBroadcast.java index e2929ea632..2a26947e5a 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribeBroadcast.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribeBroadcast.java @@ -53,7 +53,7 @@ public static void main(String[] args) throws Exception { eventMeshTcpClientConfig, EventMeshMessage.class)) { client.init(); - client.subscribe("TEST-TOPIC-TCP-BROADCAST", SubscriptionMode.BROADCASTING, SubscriptionType.ASYNC); + client.subscribe(ExampleConstants.EVENTMESH_TCP_BROADCAST_TEST_TOPIC, SubscriptionMode.BROADCASTING, SubscriptionType.ASYNC); client.registerSubBusiHandler(handler); client.listen(); @@ -65,7 +65,7 @@ public static void main(String[] args) throws Exception { @Override public Optional handle(EventMeshMessage msg) { - log.info("receive broadcast msg==============={}", msg); + log.info("receive broadcast msg: {}", msg); return Optional.empty(); } diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/SyncResponse.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/SyncResponse.java index 1b8f4fe802..34517ad270 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/SyncResponse.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/SyncResponse.java @@ -56,7 +56,7 @@ public static void main(String[] args) throws Exception { .createEventMeshTCPClient(eventMeshTcpClientConfig, EventMeshMessage.class); client.init(); - client.subscribe("TEST-TOPIC-TCP-SYNC", SubscriptionMode.CLUSTERING, SubscriptionType.SYNC); + client.subscribe(ExampleConstants.EVENTMESH_TCP_SYNC_TEST_TOPIC, SubscriptionMode.CLUSTERING, SubscriptionType.SYNC); // Synchronize RR messages client.registerSubBusiHandler(handler); @@ -69,7 +69,7 @@ public static void main(String[] args) throws Exception { @Override public Optional handle(EventMeshMessage msg) { - log.info("receive sync rr msg================{}", msg); + log.info("receive sync rr msg: {}", msg); return Optional.ofNullable(msg); } diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/util/Utils.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/util/Utils.java index cb121dad8a..003ac5f610 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/util/Utils.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/util/Utils.java @@ -17,6 +17,8 @@ package org.apache.eventmesh.util; +import org.apache.eventmesh.common.ExampleConstants; + import org.apache.commons.lang3.SystemUtils; import java.io.InputStream; @@ -76,7 +78,7 @@ private static String getLinuxLocalIp() { } } } catch (SocketException ex) { - ip = "127.0.0.1"; + ip = ExampleConstants.DEFAULT_EVENTMESH_IP; ex.printStackTrace(); } return ip;