Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add EventMeshTCPClient in sdk #611

Merged
merged 1 commit into from
Nov 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,25 @@

package org.apache.eventmesh.tcp.demo.pub.eventmeshmessage;

import java.util.Properties;

import org.apache.eventmesh.client.tcp.EventMeshTCPClient;
import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
import org.apache.eventmesh.client.tcp.conf.EventMeshTcpClientConfig;
import org.apache.eventmesh.client.tcp.impl.eventmeshmessage.EventMeshMessageTCPPubClient;
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
import org.apache.eventmesh.client.tcp.impl.EventMeshTCPClientFactory;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.tcp.common.EventMeshTestUtils;
import org.apache.eventmesh.util.Utils;

import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncPublish {

public static Logger logger = LoggerFactory.getLogger(AsyncPublish.class);

private static EventMeshMessageTCPPubClient client;
private static EventMeshTCPClient<EventMeshMessage> client;

public static AsyncPublish handler = new AsyncPublish();

Expand All @@ -43,12 +45,13 @@ public static void main(String[] agrs) throws Exception {
final int eventMeshTcpPort = Integer.parseInt(properties.getProperty("eventmesh.tcp.port"));
try {
UserAgent userAgent = EventMeshTestUtils.generateClient1();
EventMeshTcpClientConfig eventMeshTcpClientConfig = EventMeshTcpClientConfig.builder()
EventMeshTCPClientConfig eventMeshTcpClientConfig = EventMeshTCPClientConfig.builder()
.host(eventMeshIp)
.port(eventMeshTcpPort)
.userAgent(userAgent)
.build();
client = new EventMeshMessageTCPPubClient(eventMeshTcpClientConfig);
client =
EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig, EventMeshMessage.class);
client.init();
client.heartbeat();

Expand All @@ -60,10 +63,8 @@ public static void main(String[] agrs) throws Exception {

Thread.sleep(1000);
}

client.listen();
Thread.sleep(2000);
// release resource and close client
// client.close();
} catch (Exception e) {
logger.warn("AsyncPublish failed", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@

package org.apache.eventmesh.tcp.demo.pub.eventmeshmessage;

import org.apache.eventmesh.client.tcp.EventMeshTCPClient;
import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
import org.apache.eventmesh.client.tcp.conf.EventMeshTcpClientConfig;
import org.apache.eventmesh.client.tcp.impl.eventmeshmessage.EventMeshMessageTCPPubClient;
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
import org.apache.eventmesh.client.tcp.impl.EventMeshTCPClientFactory;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.tcp.common.EventMeshTestUtils;
Expand All @@ -39,12 +40,13 @@ public static void main(String[] agrs) throws Exception {
final String eventMeshIp = properties.getProperty("eventmesh.ip");
final int eventMeshTcpPort = Integer.parseInt(properties.getProperty("eventmesh.tcp.port"));
UserAgent userAgent = EventMeshTestUtils.generateClient1();
EventMeshTcpClientConfig eventMeshTcpClientConfig = EventMeshTcpClientConfig.builder()
EventMeshTCPClientConfig eventMeshTcpClientConfig = EventMeshTCPClientConfig.builder()
.host(eventMeshIp)
.port(eventMeshTcpPort)
.userAgent(userAgent)
.build();
try (final EventMeshMessageTCPPubClient client = new EventMeshMessageTCPPubClient(eventMeshTcpClientConfig)) {
try (final EventMeshTCPClient<EventMeshMessage> client =
EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig, EventMeshMessage.class)) {
client.init();
client.heartbeat();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@

package org.apache.eventmesh.tcp.demo.pub.eventmeshmessage;

import org.apache.eventmesh.client.tcp.EventMeshTCPClient;
import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
import org.apache.eventmesh.client.tcp.conf.EventMeshTcpClientConfig;
import org.apache.eventmesh.client.tcp.impl.eventmeshmessage.EventMeshMessageTCPPubClient;
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
import org.apache.eventmesh.client.tcp.impl.EventMeshTCPClientFactory;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
Expand All @@ -30,16 +31,15 @@
@Slf4j
public class SyncRequest {

private static EventMeshMessageTCPPubClient client;

public static void main(String[] agrs) throws Exception {
UserAgent userAgent = EventMeshTestUtils.generateClient1();
EventMeshTcpClientConfig eventMeshTcpClientConfig = EventMeshTcpClientConfig.builder()
EventMeshTCPClientConfig eventMeshTcpClientConfig = EventMeshTCPClientConfig.builder()
.host("127.0.0.1")
.port(10000)
.userAgent(userAgent)
.build();
try (EventMeshMessageTCPPubClient client = new EventMeshMessageTCPPubClient(eventMeshTcpClientConfig)) {
try (EventMeshTCPClient<EventMeshMessage> client = EventMeshTCPClientFactory.createEventMeshTCPClient(
eventMeshTcpClientConfig, EventMeshMessage.class)) {
client.init();
client.heartbeat();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@

package org.apache.eventmesh.tcp.demo.sub.eventmeshmessage;

import org.apache.eventmesh.client.tcp.EventMeshTCPClient;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.conf.EventMeshTcpClientConfig;
import org.apache.eventmesh.client.tcp.impl.eventmeshmessage.EventMeshMessageTCPSubClient;
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
import org.apache.eventmesh.client.tcp.impl.EventMeshTCPClientFactory;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
Expand All @@ -36,26 +37,25 @@
@Slf4j
public class AsyncSubscribe implements ReceiveMsgHook<EventMeshMessage> {

private static EventMeshMessageTCPSubClient client;

public static AsyncSubscribe handler = new AsyncSubscribe();

public static void main(String[] agrs) throws Exception {
Properties properties = Utils.readPropertiesFile("application.properties");
final String eventMeshIp = properties.getProperty("eventmesh.ip");
final int eventMeshTcpPort = Integer.parseInt(properties.getProperty("eventmesh.tcp.port"));
UserAgent userAgent = EventMeshTestUtils.generateClient2();
EventMeshTcpClientConfig eventMeshTcpClientConfig = EventMeshTcpClientConfig.builder()
EventMeshTCPClientConfig eventMeshTcpClientConfig = EventMeshTCPClientConfig.builder()
.host(eventMeshIp)
.port(eventMeshTcpPort)
.userAgent(userAgent)
.build();
try (EventMeshMessageTCPSubClient client = new EventMeshMessageTCPSubClient(eventMeshTcpClientConfig)) {
try (EventMeshTCPClient<EventMeshMessage> client = EventMeshTCPClientFactory.createEventMeshTCPClient(
eventMeshTcpClientConfig, EventMeshMessage.class)) {
client.init();
client.heartbeat();

client.subscribe("TEST-TOPIC-TCP-ASYNC", SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC);
client.registerBusiHandler(handler);
client.registerSubBusiHandler(handler);

client.listen();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@

package org.apache.eventmesh.tcp.demo.sub.eventmeshmessage;

import org.apache.eventmesh.client.tcp.EventMeshTCPClient;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.conf.EventMeshTcpClientConfig;
import org.apache.eventmesh.client.tcp.impl.eventmeshmessage.EventMeshMessageTCPSubClient;
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
import org.apache.eventmesh.client.tcp.impl.EventMeshTCPClientFactory;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
Expand All @@ -43,17 +44,18 @@ public static void main(String[] agrs) throws Exception {
final String eventMeshIp = properties.getProperty("eventmesh.ip");
final int eventMeshTcpPort = Integer.parseInt(properties.getProperty("eventmesh.tcp.port"));
UserAgent userAgent = EventMeshTestUtils.generateClient2();
EventMeshTcpClientConfig eventMeshTcpClientConfig = EventMeshTcpClientConfig.builder()
EventMeshTCPClientConfig eventMeshTcpClientConfig = EventMeshTCPClientConfig.builder()
.host(eventMeshIp)
.port(eventMeshTcpPort)
.userAgent(userAgent)
.build();
try (EventMeshMessageTCPSubClient client = new EventMeshMessageTCPSubClient(eventMeshTcpClientConfig)) {
try (EventMeshTCPClient<EventMeshMessage> client = EventMeshTCPClientFactory.createEventMeshTCPClient(
eventMeshTcpClientConfig, EventMeshMessage.class)) {
client.init();
client.heartbeat();

client.subscribe("TEST-TOPIC-TCP-BROADCAST", SubscriptionMode.BROADCASTING, SubscriptionType.ASYNC);
client.registerBusiHandler(handler);
client.registerSubBusiHandler(handler);

client.listen();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@

package org.apache.eventmesh.tcp.demo.sub.eventmeshmessage;

import org.apache.eventmesh.client.tcp.EventMeshTCPClient;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.conf.EventMeshTcpClientConfig;
import org.apache.eventmesh.client.tcp.impl.eventmeshmessage.EventMeshMessageTCPSubClient;
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
import org.apache.eventmesh.client.tcp.impl.EventMeshTCPClientFactory;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
Expand All @@ -37,18 +38,19 @@ public class SyncResponse implements ReceiveMsgHook<EventMeshMessage> {

public static void main(String[] agrs) throws Exception {
UserAgent userAgent = EventMeshTestUtils.generateClient2();
EventMeshTcpClientConfig eventMeshTcpClientConfig = EventMeshTcpClientConfig.builder()
EventMeshTCPClientConfig eventMeshTcpClientConfig = EventMeshTCPClientConfig.builder()
.host("127.0.0.1")
.port(10000)
.userAgent(userAgent)
.build();
try (EventMeshMessageTCPSubClient client = new EventMeshMessageTCPSubClient(eventMeshTcpClientConfig)) {
try (EventMeshTCPClient<EventMeshMessage> client = EventMeshTCPClientFactory
.createEventMeshTCPClient(eventMeshTcpClientConfig, EventMeshMessage.class)) {
client.init();
client.heartbeat();

client.subscribe("TEST-TOPIC-TCP-SYNC", SubscriptionMode.CLUSTERING, SubscriptionType.SYNC);
// Synchronize RR messages
client.registerBusiHandler(handler);
client.registerSubBusiHandler(handler);

client.listen();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.client.tcp;

import org.apache.eventmesh.client.tcp.common.AsyncRRCallback;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.Package;

/**
* EventMesh TCP client, used to sub/pub message by tcp.
* You can use {@link org.apache.eventmesh.client.tcp.impl.EventMeshTCPClientFactory} to create a target client.
*
* @param <ProtocolMessage>
*/
public interface EventMeshTCPClient<ProtocolMessage> extends AutoCloseable {

void init() throws EventMeshException;

Package rr(ProtocolMessage msg, long timeout) throws EventMeshException;

void asyncRR(ProtocolMessage msg, AsyncRRCallback callback, long timeout) throws EventMeshException;

Package publish(ProtocolMessage msg, long timeout) throws EventMeshException;

void broadcast(ProtocolMessage msg, long timeout) throws EventMeshException;

void heartbeat() throws EventMeshException;

void listen() throws EventMeshException;

void subscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType)
throws EventMeshException;

void unsubscribe() throws EventMeshException;

void registerPubBusiHandler(ReceiveMsgHook<ProtocolMessage> handler) throws EventMeshException;

void registerSubBusiHandler(ReceiveMsgHook<ProtocolMessage> handler) throws EventMeshException;

void close() throws EventMeshException;

EventMeshTCPPubClient<ProtocolMessage> getPubClient();

EventMeshTCPSubClient<ProtocolMessage> getSubClient();
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ public interface EventMeshTCPPubClient<ProtocolMessage> extends AutoCloseable {
void reconnect() throws EventMeshException;

// todo: Hide package method, use ProtocolMessage
Package rr(ProtocolMessage msg, long timeout) throws EventMeshException;
Package rr(ProtocolMessage event, long timeout) throws EventMeshException;

void asyncRR(ProtocolMessage msg, AsyncRRCallback callback, long timeout) throws EventMeshException;
void asyncRR(ProtocolMessage event, AsyncRRCallback callback, long timeout) throws EventMeshException;

Package publish(ProtocolMessage cloudEvent, long timeout) throws EventMeshException;
Package publish(ProtocolMessage event, long timeout) throws EventMeshException;

void broadcast(ProtocolMessage cloudEvent, long timeout) throws EventMeshException;
void broadcast(ProtocolMessage event, long timeout) throws EventMeshException;

void registerBusiHandler(ReceiveMsgHook<ProtocolMessage> handler) throws EventMeshException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* <li>{@link org.apache.eventmesh.client.tcp.impl.openmessage.OpenMessageTCPSubClient}</li>
* </ul>
*/
public interface EventMeshTCPSubClient<ProtocolMessage> {
public interface EventMeshTCPSubClient<ProtocolMessage> extends AutoCloseable {

void init() throws EventMeshException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@

package org.apache.eventmesh.client.tcp.common;

import org.apache.eventmesh.client.tcp.conf.EventMeshTcpClientConfig;
import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.common.protocol.tcp.Command;
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.tcp.codec.Codec;

Expand All @@ -42,7 +40,6 @@
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
Expand Down Expand Up @@ -73,7 +70,7 @@ public abstract class TcpClient implements Closeable {
Runtime.getRuntime().availableProcessors(),
new ThreadFactoryBuilder().setNameFormat("TCPClientScheduler").setDaemon(true).build());

public TcpClient(EventMeshTcpClientConfig eventMeshTcpClientConfig) {
public TcpClient(EventMeshTCPClientConfig eventMeshTcpClientConfig) {
Preconditions.checkNotNull(eventMeshTcpClientConfig, "EventMeshTcpClientConfig cannot be null");
Preconditions.checkNotNull(eventMeshTcpClientConfig.getHost(), "Host cannot be null");
Preconditions.checkState(eventMeshTcpClientConfig.getPort() > 0, "port is not validated");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.client.tcp.conf;

import org.apache.eventmesh.common.protocol.tcp.UserAgent;

import lombok.Builder;
import lombok.Data;

@Data
@Builder
public class EventMeshTCPClientConfig {
private String host;
private int port;
private UserAgent userAgent;
}
Loading