Skip to content

Commit

Permalink
Add tcp protocol client
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun committed Nov 21, 2021
1 parent 82d8f1f commit 059eb37
Show file tree
Hide file tree
Showing 49 changed files with 1,163 additions and 645 deletions.
6 changes: 3 additions & 3 deletions docs/cn/features/https.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ config env varible

```
//创建producer
LiteClientConfig liteClientConfig = new liteClientConfig();
LiteClientConfig eventMeshHttpClientConfig = new eventMeshHttpClientConfig();
...
//设置开启TLS
liteClientConfig.setUseTls(true);
LiteProducer producer = new LiteProducer(liteClientConfig);
eventMeshHttpClientConfig.setUseTls(true);
LiteProducer producer = new LiteProducer(eventMeshHttpClientConfig);
//配置环境变量
Expand Down
6 changes: 3 additions & 3 deletions docs/en/features/https.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ config env varible

```
// create producer
LiteClientConfig liteClientConfig = new liteClientConfig();
LiteClientConfig eventMeshHttpClientConfig = new eventMeshHttpClientConfig();
...
// enable TLS
liteClientConfig.setUseTls(true);
LiteProducer producer = new LiteProducer(liteClientConfig);
eventMeshHttpClientConfig.setUseTls(true);
LiteProducer producer = new LiteProducer(eventMeshHttpClientConfig);
config env varible
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,189 +17,27 @@

package org.apache.eventmesh.common.protocol.tcp;

import lombok.Builder;
import lombok.Data;

@Data
@Builder
public class UserAgent {

private String env;
private String subsystem;
private String path;
private int pid;
private int pid;
private String host;
private int port;
private int port;
private String version;
private String username;
private String password;
private String idc;
private String producerGroup;
private String consumerGroup;
private String purpose;
private int unack = 0;

public UserAgent() {
}

public String getProducerGroup() {
return producerGroup;
}

public void setProducerGroup(String producerGroup) {
this.producerGroup = producerGroup;
}

public String getConsumerGroup() {
return consumerGroup;
}

public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
}

public String getEnv() {
return env;
}

public void setEnv(String env) {
this.env = env;
}

public String getPurpose() {
return purpose;
}

public void setPurpose(String purpose) {
this.purpose = purpose;
}

public String getSubsystem() {
return subsystem;
}

public void setSubsystem(String subsystem) {
this.subsystem = subsystem;
}

public String getPath() {
return path;
}

public void setPath(String path) {
this.path = path;
}

public int getPid() {
return pid;
}

public void setPid(int pid) {
this.pid = pid;
}

public String getHost() {
return host;
}

public void setHost(String host) {
this.host = host;
}

public int getPort() {
return port;
}

public void setPort(int port) {
this.port = port;
}

public String getVersion() {
return version;
}

public void setVersion(String version) {
this.version = version;
}

public String getUsername() {
return username;
}

public void setUsername(String username) {
this.username = username;
}

public String getPassword() {
return password;
}

public void setPassword(String password) {
this.password = password;
}

public String getIdc() {
return idc;
}

public void setIdc(String idc) {
this.idc = idc;
}

public int getUnack() {
return unack;
}

public void setUnack(int unack) {
this.unack = unack;
}

@Override
public String toString() {
return "UserAgent{" +
"env='" + env + '\'' +
"subsystem='" + subsystem + '\'' +
", path='" + path + '\'' +
", pid=" + pid +
", host='" + host + '\'' +
", port=" + port +
", version='" + version + '\'' +
", idc='" + idc + '\'' +
", purpose='" + purpose + '\'' +
", unack='" + unack + '\'' +
'}';
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

UserAgent userAgent = (UserAgent) o;

if (pid != userAgent.pid) return false;
if (port != userAgent.port) return false;
if (unack != userAgent.unack) return false;
if (subsystem != null ? !subsystem.equals(userAgent.subsystem) : userAgent.subsystem != null) return false;
if (path != null ? !path.equals(userAgent.path) : userAgent.path != null) return false;
if (host != null ? !host.equals(userAgent.host) : userAgent.host != null) return false;
if (purpose != null ? !purpose.equals(userAgent.purpose) : userAgent.purpose != null) return false;
if (version != null ? !version.equals(userAgent.version) : userAgent.version != null) return false;
if (username != null ? !username.equals(userAgent.username) : userAgent.username != null) return false;
if (password != null ? !password.equals(userAgent.password) : userAgent.password != null) return false;
if (env != null ? !env.equals(userAgent.env) : userAgent.env != null) return false;
return idc != null ? idc.equals(userAgent.idc) : userAgent.idc == null;
}
@Builder.Default
private int unack = 0;

@Override
public int hashCode() {
int result = subsystem != null ? subsystem.hashCode() : 0;
result = 31 * result + (path != null ? path.hashCode() : 0);
result = 31 * result + pid;
result = 31 * result + (host != null ? host.hashCode() : 0);
result = 31 * result + (purpose != null ? purpose.hashCode() : 0);
result = 31 * result + port;
result = 31 * result + (version != null ? version.hashCode() : 0);
result = 31 * result + (username != null ? username.hashCode() : 0);
result = 31 * result + (password != null ? password.hashCode() : 0);
result = 31 * result + (idc != null ? idc.hashCode() : 0);
result = 31 * result + (env != null ? env.hashCode() : 0);
result = 31 * result + unack;
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

package org.apache.eventmesh.common.utils;

import org.apache.logging.log4j.util.ProcessIdUtil;

import java.util.concurrent.ThreadLocalRandom;

public class ThreadUtils {

private static long currentPID = -1;
private static volatile long currentPID = -1;

public static void randomSleep(int min, int max) throws Exception {
// nextInt is normally exclusive of the top value, so add 1 to make it inclusive
Expand All @@ -35,22 +37,18 @@ public static void randomSleep(int max) throws Exception {
}

/**
* get current process id only once.
* get current process id.
*
* @return process id
*/
public static long getPID() {
if (currentPID >= 0) {
return currentPID;
}
String processName = java.lang.management.ManagementFactory.getRuntimeMXBean().getName();
if (processName != null && processName.length() > 0) {
try {
currentPID = Long.parseLong(processName.split("@")[0]);
} catch (Exception e) {
return 0;
if (currentPID == -1) {
synchronized (ThreadUtils.class) {
if (currentPID == -1) {
currentPID = Long.parseLong(ProcessIdUtil.getProcessId());
}
}
}
return 0;
return currentPID;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package org.apache.eventmesh.http.demo;

import org.apache.eventmesh.client.http.conf.LiteClientConfig;
import org.apache.eventmesh.client.http.producer.LiteProducer;
import org.apache.eventmesh.client.http.conf.EventMeshHttpClientConfig;
import org.apache.eventmesh.client.http.producer.EventMeshHttpProducer;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.EventMeshMessage;
import org.apache.eventmesh.common.utils.IPUtils;
Expand Down Expand Up @@ -55,7 +55,7 @@ public static void main(String[] args) throws Exception {

final String topic = "TEST-TOPIC-HTTP-ASYNC";

LiteClientConfig eventMeshClientConfig = LiteClientConfig.builder()
EventMeshHttpClientConfig eventMeshClientConfig = EventMeshHttpClientConfig.builder()
.liteEventMeshAddr(eventMeshIPPort)
.producerGroup("EventMeshTest-producerGroup")
.env("env")
Expand All @@ -64,7 +64,7 @@ public static void main(String[] args) throws Exception {
.sys("1234")
.pid(String.valueOf(ThreadUtils.getPID())).build();

try (LiteProducer liteProducer = new LiteProducer(eventMeshClientConfig);) {
try (EventMeshHttpProducer eventMeshHttpProducer = new EventMeshHttpProducer(eventMeshClientConfig);) {
for (int i = 0; i < messageSize; i++) {
EventMeshMessage eventMeshMessage = EventMeshMessage.builder()
.bizSeqNo(RandomStringUtils.generateNum(30))
Expand All @@ -73,7 +73,7 @@ public static void main(String[] args) throws Exception {
.uniqueId(RandomStringUtils.generateNum(30))
.build()
.addProp(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4 * 1000));
liteProducer.publish(eventMeshMessage);
eventMeshHttpProducer.publish(eventMeshMessage);
}
Thread.sleep(30000);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package org.apache.eventmesh.http.demo;

import org.apache.eventmesh.client.http.conf.LiteClientConfig;
import org.apache.eventmesh.client.http.producer.LiteProducer;
import org.apache.eventmesh.client.http.conf.EventMeshHttpClientConfig;
import org.apache.eventmesh.client.http.producer.EventMeshHttpProducer;
import org.apache.eventmesh.client.http.producer.RRCallback;
import org.apache.eventmesh.common.EventMeshMessage;
import org.apache.eventmesh.common.utils.IPUtils;
Expand All @@ -44,7 +44,7 @@ public static void main(String[] args) throws Exception {
final String eventMeshIp = properties.getProperty("eventmesh.ip");
final String eventMeshHttpPort = properties.getProperty("eventmesh.http.port");

LiteProducer liteProducer = null;
EventMeshHttpProducer eventMeshHttpProducer = null;
try {
String eventMeshIPPort = eventMeshIp + ":" + eventMeshHttpPort;
final String topic = "TEST-TOPIC-TCP-ASYNC";
Expand All @@ -53,7 +53,7 @@ public static void main(String[] args) throws Exception {
eventMeshIPPort = "127.0.0.1:10105";
}

LiteClientConfig eventMeshClientConfig = LiteClientConfig.builder()
EventMeshHttpClientConfig eventMeshClientConfig = EventMeshHttpClientConfig.builder()
.liteEventMeshAddr(eventMeshIPPort)
.producerGroup("EventMeshTest-producerGroup")
.env("env")
Expand All @@ -62,7 +62,7 @@ public static void main(String[] args) throws Exception {
.sys("1234")
.pid(String.valueOf(ThreadUtils.getPID())).build();

liteProducer = new LiteProducer(eventMeshClientConfig);
eventMeshHttpProducer = new EventMeshHttpProducer(eventMeshClientConfig);

final long startTime = System.currentTimeMillis();
final EventMeshMessage eventMeshMessage = EventMeshMessage.builder()
Expand All @@ -71,7 +71,7 @@ public static void main(String[] args) throws Exception {
.topic(topic)
.uniqueId(RandomStringUtils.generateNum(30)).build();

liteProducer.request(eventMeshMessage, new RRCallback<EventMeshMessage>() {
eventMeshHttpProducer.request(eventMeshMessage, new RRCallback<EventMeshMessage>() {
@Override
public void onSuccess(EventMeshMessage o) {
log.debug("sendmsg : {}, return : {}, cost:{}ms", eventMeshMessage.getContent(), o.getContent(),
Expand All @@ -90,7 +90,7 @@ public void onException(Throwable e) {
}

Thread.sleep(30000);
try (final LiteProducer ignore = liteProducer) {
try (final EventMeshHttpProducer ignore = eventMeshHttpProducer) {
// close producer
} catch (Exception e1) {
log.warn("producer shutdown exception", e1);
Expand Down
Loading

0 comments on commit 059eb37

Please sign in to comment.