Skip to content

Commit

Permalink
Java sdk update cloudevents pub/sub (#620)
Browse files Browse the repository at this point in the history
1.cloudevents protocol tcp pub/sub for sdk
  • Loading branch information
xwm1992 authored Nov 30, 2021
1 parent b46026d commit b50c42a
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,25 @@

package org.apache.eventmesh.tcp.common;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.core.v1.CloudEventV1;
import static org.apache.eventmesh.common.protocol.tcp.Command.RESPONSE_TO_SERVER;
import static org.apache.eventmesh.tcp.common.EventMeshTestCaseTopicSet.TOPIC_PRX_SyncSubscribeTest;
import static org.apache.eventmesh.tcp.common.EventMeshTestCaseTopicSet.TOPIC_PRX_WQ2ClientBroadCast;
import static org.apache.eventmesh.tcp.common.EventMeshTestCaseTopicSet.TOPIC_PRX_WQ2ClientUniCast;

import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
import org.apache.eventmesh.client.tcp.common.MessageUtils;
import org.apache.eventmesh.common.protocol.tcp.Command;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.Header;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;

public class EventMeshTestUtils {
Expand Down Expand Up @@ -155,4 +162,16 @@ private static String generateRandomString(int length) {
}
return builder.toString();
}

public static CloudEvent generateCloudEventV1() {
CloudEvent event = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withSubject(TOPIC_PRX_WQ2ClientBroadCast)
.withSource(URI.create("/"))
.withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)
.withData("testAsyncMessage".getBytes(StandardCharsets.UTF_8))
.withExtension("ttl", "30000")
.build();
return event;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.tcp.demo.pub.cloudevents;

import io.cloudevents.CloudEvent;

import org.apache.eventmesh.client.tcp.EventMeshTCPClient;
import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
import org.apache.eventmesh.client.tcp.impl.EventMeshTCPClientFactory;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.tcp.common.EventMeshTestUtils;
import org.apache.eventmesh.util.Utils;

import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncPublish {

public static Logger logger = LoggerFactory.getLogger(AsyncPublish.class);

private static EventMeshTCPClient<CloudEvent> client;

public static AsyncPublish handler = new AsyncPublish();

public static void main(String[] agrs) throws Exception {
Properties properties = Utils.readPropertiesFile("application.properties");
final String eventMeshIp = properties.getProperty("eventmesh.ip");
final int eventMeshTcpPort = Integer.parseInt(properties.getProperty("eventmesh.tcp.port"));
try {
UserAgent userAgent = EventMeshTestUtils.generateClient1();
EventMeshTCPClientConfig eventMeshTcpClientConfig = EventMeshTCPClientConfig.builder()
.host(eventMeshIp)
.port(eventMeshTcpPort)
.userAgent(userAgent)
.build();
client =
EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig, CloudEvent.class);
client.init();
client.heartbeat();

for (int i = 0; i < 5; i++) {
CloudEvent event = EventMeshTestUtils.generateCloudEventV1();

logger.info("begin send async msg[{}]==================={}", i, event);
client.publish(event, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);

Thread.sleep(1000);
}
Thread.sleep(2000);
} catch (Exception e) {
logger.warn("AsyncPublish failed", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.tcp.demo.sub.cloudevents;

import org.apache.eventmesh.client.tcp.EventMeshTCPClient;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
import org.apache.eventmesh.client.tcp.impl.EventMeshTCPClientFactory;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.tcp.common.EventMeshTestUtils;
import org.apache.eventmesh.util.Utils;

import java.util.Properties;

import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class AsyncSubscribe implements ReceiveMsgHook<CloudEvent> {

public static AsyncSubscribe handler = new AsyncSubscribe();

private static EventMeshTCPClient client;

public static void main(String[] agrs) throws Exception {
Properties properties = Utils.readPropertiesFile("application.properties");
final String eventMeshIp = properties.getProperty("eventmesh.ip");
final int eventMeshTcpPort = Integer.parseInt(properties.getProperty("eventmesh.tcp.port"));
UserAgent userAgent = EventMeshTestUtils.generateClient2();
EventMeshTCPClientConfig eventMeshTcpClientConfig = EventMeshTCPClientConfig.builder()
.host(eventMeshIp)
.port(eventMeshTcpPort)
.userAgent(userAgent)
.build();
try {
client = EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig, CloudEvent.class);
client.init();
client.heartbeat();

client.subscribe("TEST-TOPIC-TCP-ASYNC", SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC);
client.registerSubBusiHandler(handler);

client.listen();

//client.unsubscribe();

// release resource and close client
// client.close();

} catch (Exception e) {
log.warn("AsyncSubscribe failed", e);
}
}

@Override
public void handle(Package msg, ChannelHandlerContext ctx) {
CloudEvent event = convertToProtocolMessage(msg);
log.info("receive async msg====================={}", event);
}

@Override
public CloudEvent convertToProtocolMessage(Package pkg) {
return CloudEventBuilder.from((CloudEvent) pkg.getBody()).build();
}
}

0 comments on commit b50c42a

Please sign in to comment.