Skip to content

Commit

Permalink
Merge pull request #155 from cmgyqjj/feature_Support_reconnect_backof…
Browse files Browse the repository at this point in the history
…f_interface

feature:Support_reconnect_backoff_interface
  • Loading branch information
crossoverJie authored Sep 26, 2024
2 parents 29dcbea + 7197188 commit 26582a9
Show file tree
Hide file tree
Showing 9 changed files with 108 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import com.crossoverjie.cim.client.sdk.io.MessageListener;
import com.crossoverjie.cim.client.sdk.io.ReconnectCheck;
import java.util.concurrent.ThreadPoolExecutor;

import com.crossoverjie.cim.client.sdk.io.backoff.BackoffStrategy;
import okhttp3.OkHttpClient;

/**
Expand All @@ -20,4 +22,5 @@ public interface ClientBuilder {
ClientBuilder okHttpClient(OkHttpClient okHttpClient);
ClientBuilder messageListener(MessageListener messageListener);
ClientBuilder callbackThreadPool(ThreadPoolExecutor callbackThreadPool);
ClientBuilder backoffStrategy(BackoffStrategy backoffStrategy);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,16 @@
import com.crossoverjie.cim.client.sdk.Event;
import com.crossoverjie.cim.client.sdk.io.MessageListener;
import com.crossoverjie.cim.client.sdk.io.ReconnectCheck;
import com.crossoverjie.cim.client.sdk.io.backoff.BackoffStrategy;
import com.crossoverjie.cim.client.sdk.io.backoff.RandomBackoff;
import com.crossoverjie.cim.common.util.StringUtil;

import java.lang.reflect.Constructor;
import java.util.ServiceLoader;
import java.util.concurrent.ThreadPoolExecutor;
import okhttp3.OkHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClientBuilderImpl implements ClientBuilder {

Expand Down Expand Up @@ -79,4 +86,10 @@ public ClientBuilder callbackThreadPool(ThreadPoolExecutor callbackThreadPool) {
this.conf.setCallbackThreadPool(callbackThreadPool);
return this;
}

@Override
public ClientBuilder backoffStrategy(BackoffStrategy backoffStrategy) {
this.conf.setBackoffStrategy(backoffStrategy);
return this;
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.crossoverjie.cim.client.sdk.impl;

import com.crossoverjie.cim.client.sdk.Event;
import com.crossoverjie.cim.client.sdk.io.backoff.BackoffStrategy;
import com.crossoverjie.cim.client.sdk.io.MessageListener;
import com.crossoverjie.cim.client.sdk.io.backoff.RandomBackoff;
import com.crossoverjie.cim.client.sdk.io.ReconnectCheck;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
Expand Down Expand Up @@ -46,4 +48,7 @@ public static class Auth{

@JsonIgnore
private ReconnectCheck reconnectCheck = (client) -> true;

@JsonIgnore
private BackoffStrategy backoffStrategy = new RandomBackoff();
}
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,7 @@ public void reconnect() throws Exception {

this.conf.getEvent().info("cim trigger reconnecting....");

// TODO: 2024/9/13 need a backoff interface
int random = (int) (Math.random() * 7 + 3);
TimeUnit.SECONDS.sleep(random);
this.conf.getBackoffStrategy().runBackoff();

// don't set State ready, because when connect success, the State will be set to ready automate.
connectServer(v -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.crossoverjie.cim.client.sdk.io.backoff;

import java.util.concurrent.TimeUnit;

/**
* @author:qjj
* @create: 2024-09-21 12:16
* @Description: backoff strategy interface
*/

public interface BackoffStrategy {
/**
* @return the backoff time in milliseconds
*/
long nextBackoff();

/**
* Run the backoff strategy
* @throws InterruptedException
*/
default void runBackoff() throws InterruptedException {
TimeUnit.SECONDS.sleep(nextBackoff());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.crossoverjie.cim.client.sdk.io.backoff;

import java.util.concurrent.TimeUnit;

/**
* @author:qjj
* @create: 2024-09-21 12:22
* @Description: random backoff strategy
*/

public class RandomBackoff implements BackoffStrategy {

@Override
public long nextBackoff() {
int random = (int) (Math.random() * 7 + 3);
return random;
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.crossoverjie.cim.client.sdk;

import com.crossoverjie.cim.client.sdk.impl.ClientConfigurationData;
import com.crossoverjie.cim.client.sdk.io.backoff.RandomBackoff;
import com.crossoverjie.cim.client.sdk.route.AbstractRouteBaseTest;
import com.crossoverjie.cim.common.pojo.CIMUserInfo;
import com.crossoverjie.cim.route.api.vo.req.P2PReqVO;
Expand Down Expand Up @@ -224,11 +225,13 @@ public void testReconnect() throws Exception {
.userName(zs)
.userId(zsId)
.build();
var backoffStrategy = new RandomBackoff();

@Cleanup
Client client1 = Client.builder()
.auth(auth1)
.routeUrl(routeUrl)
.backoffStrategy(backoffStrategy)
.build();
TimeUnit.SECONDS.sleep(3);
ClientState.State state = client1.getState();
Expand All @@ -242,6 +245,7 @@ public void testReconnect() throws Exception {
.auth(auth2)
.routeUrl(routeUrl)
.messageListener((client, message) -> client2Receive.set(message))
.backoffStrategy(backoffStrategy)
.build();
TimeUnit.SECONDS.sleep(3);
ClientState.State state2 = client2.getState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.crossoverjie.cim.client.sdk.Client;
import com.crossoverjie.cim.client.sdk.Event;
import com.crossoverjie.cim.client.sdk.impl.ClientConfigurationData;
import com.crossoverjie.cim.client.sdk.io.backoff.RandomBackoff;
import com.crossoverjie.cim.client.service.MsgLogger;
import com.crossoverjie.cim.client.service.ShutDownSign;
import com.crossoverjie.cim.client.service.impl.MsgCallBackListener;
Expand Down Expand Up @@ -61,6 +62,7 @@ public Client buildClient(@Qualifier("callBackThreadPool") ThreadPoolExecutor ca
.okHttpClient(okHttpClient)
.messageListener(new MsgCallBackListener(msgLogger))
.callbackThreadPool(callbackThreadPool)
.backoffStrategy(new RandomBackoff())
.build();
}

Expand Down
74 changes: 37 additions & 37 deletions cim-client/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
@@ -1,37 +1,37 @@
spring:
application:
name: cim-client

# web port
server:
port: 8082

logging:
level:
root: error

# enable swagger
springdoc:
swagger-ui:
enabled: true

# log path
cim:
msg:
logger:
path: /opt/logs/cim/
route:
url: http://localhost:8083 # route url suggested that this is Nginx address
user: # cim userId and userName
id: 1725714450795
userName: cj4
callback:
thread:
queue:
size: 2
pool:
size: 2
heartbeat:
time: 60 # cim heartbeat time (seconds)
reconnect:
count: 3
spring:
application:
name: cim-client

# web port
server:
port: 8082

logging:
level:
root: error

# enable swagger
springdoc:
swagger-ui:
enabled: true

# log path
cim:
msg:
logger:
path: /opt/logs/cim/
route:
url: http://localhost:8083 # route url suggested that this is Nginx address
user: # cim userId and userName
id: 1725714450795
userName: cj4
callback:
thread:
queue:
size: 2
pool:
size: 2
heartbeat:
time: 60 # cim heartbeat time (seconds)
reconnect:
count: 3

0 comments on commit 26582a9

Please sign in to comment.