Skip to content

Commit

Permalink
[ISSUE apache#397]Remove subscription session failed error (apache#398)
Browse files Browse the repository at this point in the history
* [ISSUE apache#325]Update gradle configuration for publishing package to maven repository

* update build.gradle

* update build.gradle and gradle.properties

* update build.gradle and gradle.properties for publish to maven repository

* * update gradle version for instructions
* fix: dist task exception

* [ISSUE apache#329]Missing Log4j dependency

* update eventmesh-runtime.png

* support unsubscribe topics while delconsumer in http mode

* [ISSUE apache#397]Remove subscription session failed error

* [ISSUE apache#397]Remove subscription session failed error
close apache#397
  • Loading branch information
xwm1992 authored and jjz921024 committed Jul 25, 2021
1 parent 741cdfb commit aedfd94
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ public synchronized void initClientGroupPersistentConsumer() throws Exception {
keyValue.put("isBroadcast", "false");
keyValue.put("consumerGroup", consumerGroup);
keyValue.put("eventMeshIDC", eventMeshTCPConfiguration.eventMeshIDC);
keyValue.put("instanceName", EventMeshUtil.buildMeshTcpClientID(sysId,"SUB", eventMeshTCPConfiguration.eventMeshCluster));
keyValue.put("instanceName", EventMeshUtil.buildMeshTcpClientID(sysId, "SUB", eventMeshTCPConfiguration.eventMeshCluster));

persistentMsgConsumer.init(keyValue);
// persistentMsgConsumer.registerMessageListener(new EventMeshMessageListenerConcurrently() {
Expand Down Expand Up @@ -458,7 +458,7 @@ public synchronized void initClientGroupBroadcastConsumer() throws Exception {
keyValue.put("isBroadcast", "true");
keyValue.put("consumerGroup", consumerGroup);
keyValue.put("eventMeshIDC", eventMeshTCPConfiguration.eventMeshIDC);
keyValue.put("instanceName", EventMeshUtil.buildMeshTcpClientID(sysId,"SUB", eventMeshTCPConfiguration.eventMeshCluster));
keyValue.put("instanceName", EventMeshUtil.buildMeshTcpClientID(sysId, "SUB", eventMeshTCPConfiguration.eventMeshCluster));
broadCastMsgConsumer.init(keyValue);
// broadCastMsgConsumer.registerMessageListener(new EventMeshMessageListenerConcurrently() {
// @Override
Expand Down Expand Up @@ -536,7 +536,7 @@ public void consume(Message message, AsyncConsumeContext context) {
message.getSystemProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
message.getSystemProperties().put(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP, eventMeshTCPConfiguration.eventMeshServerIp);

EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext)context;
EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) context;
if (CollectionUtils.isEmpty(groupConsumerSessions)) {
logger.warn("found no session to downstream broadcast msg");
// context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
Expand Down Expand Up @@ -586,7 +586,7 @@ public void consume(Message message, AsyncConsumeContext context) {
message.getSystemProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
message.getSystemProperties().put(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP, eventMeshTCPConfiguration.eventMeshServerIp);

EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext)context;
EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) context;
Session session = downstreamDispatchStrategy.select(consumerGroup, topic, groupConsumerSessions);
String bizSeqNo = EventMeshUtil.getMessageBizSeq(message);
if (session == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,10 +326,10 @@ public boolean isAvailable(String topic) {
return true;
}

@Override
public int hashCode() {
int code = 37 + (client != null ? client.hashCode() : 0) + (context != null ? context.hashCode() : 0)
+ (sessionState != null ? sessionState.hashCode() : 0);
return code;
}
// @Override
// public int hashCode() {
// int code = 37 + (client != null ? client.hashCode() : 0) + (context != null ? context.hashCode() : 0)
// + (sessionState != null ? sessionState.hashCode() : 0);
// return code;
// }
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public static Package responseToClientAck(Package in) {

public static UserAgent generateSubClient(UserAgent agent) {
UserAgent user = new UserAgent();
user.setEnv(agent.getEnv());
user.setHost(agent.getHost());
user.setPassword(agent.getPassword());
user.setUsername(agent.getUsername());
Expand All @@ -116,6 +117,7 @@ public static UserAgent generateSubClient(UserAgent agent) {

public static UserAgent generatePubClient(UserAgent agent) {
UserAgent user = new UserAgent();
user.setEnv(agent.getEnv());
user.setHost(agent.getHost());
user.setPassword(agent.getPassword());
user.setUsername(agent.getUsername());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class EventMeshTestUtils {

public static UserAgent generateClient1() {
UserAgent user = new UserAgent();
user.setEnv("test");
user.setHost("127.0.0.1");
user.setPassword(generateRandomString(8));
user.setUsername("PU4283");
Expand All @@ -52,6 +53,7 @@ public static UserAgent generateClient1() {

public static UserAgent generateClient2() {
UserAgent user = new UserAgent();
user.setEnv("test");
user.setHost("127.0.0.1");
user.setPassword(generateRandomString(8));
user.setUsername("PU4283");
Expand Down

0 comments on commit aedfd94

Please sign in to comment.