Skip to content

Commit

Permalink
[Issue #344] Fixing racing condition issue in SubscribeProcessor and …
Browse files Browse the repository at this point in the history
…UnSubscribeProcessor (#345)

* [Issue #337] Fix HttpSubscriber startup issue

* [Issue #337] test commit

* [Issue #337] revert test commit

* [Issue #337] Enhance Http Demo Subscriber by using ExecutorService, CountDownLatch and PreDestroy hook

* [Issue #337] Enhance Http Demo Subscriber by using ExecutorService, CountDownLatch and PreDestroy hook

* [Issue #344] Fixing racing condition issue in SubscribeProcessor and UnSubscribeProcessor

* [Issue #344] Fix import statements

* [Issue #337] Address code review comment for Subscriber Demo App

* [Issue #344] Enhance client registration logic in SubscribeProcessor and UnsubscriberProcessor

* [Issue #344] Minor code clean up in SubscribeProcessor and UnsubscriberProcessor

* [Issue #344] Fix NullPointerException in ConsumerManager occurs during subscribe/unsunscribe iteration testing

* [Issue #344] Fix bugs in subscribe/unsunscribe code path

* [Issue #344] use client.pid instead of client.ip for client comparasion in UnSubscribeProcessor

Co-authored-by: j00441484 <jin.rong.luo@huawei.com>
  • Loading branch information
jinrongluo and j00441484 authored May 19, 2021
1 parent d599eea commit 16b3c62
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,19 +156,19 @@ public void run() {
public void notifyConsumerManager(String consumerGroup, ConsumerGroupConf latestConsumerGroupConfig,
ConcurrentHashMap<String, ConsumerGroupConf> localConsumerGroupMapping) throws Exception {
ConsumerGroupManager cgm = eventMeshHTTPServer.getConsumerManager().getConsumer(consumerGroup);
if (cgm == null) {
if (latestConsumerGroupConfig == null) {
ConsumerGroupStateEvent notification = new ConsumerGroupStateEvent();
notification.action = ConsumerGroupStateEvent.ConsumerGroupStateAction.NEW;
notification.action = ConsumerGroupStateEvent.ConsumerGroupStateAction.DELETE;
notification.consumerGroup = consumerGroup;
notification.consumerGroupConfig = latestConsumerGroupConfig;
eventMeshHTTPServer.getEventBus().post(notification);
return;
}

if (latestConsumerGroupConfig == null) {
if (cgm == null) {
ConsumerGroupStateEvent notification = new ConsumerGroupStateEvent();
notification.action = ConsumerGroupStateEvent.ConsumerGroupStateAction.DELETE;
notification.action = ConsumerGroupStateEvent.ConsumerGroupStateAction.NEW;
notification.consumerGroup = consumerGroup;
notification.consumerGroupConfig = latestConsumerGroupConfig;
eventMeshHTTPServer.getEventBus().post(notification);
return;
}
Expand Down Expand Up @@ -217,8 +217,10 @@ public synchronized void addConsumer(String consumerGroup, ConsumerGroupConf con
* restart consumer
*/
public synchronized void restartConsumer(String consumerGroup, ConsumerGroupConf consumerGroupConfig) throws Exception {
ConsumerGroupManager cgm = consumerTable.get(consumerGroup);
cgm.refresh(consumerGroupConfig);
if(consumerTable.containsKey(consumerGroup)) {
ConsumerGroupManager cgm = consumerTable.get(consumerGroup);
cgm.refresh(consumerGroupConfig);
}
}

/**
Expand All @@ -235,8 +237,10 @@ public ConsumerGroupManager getConsumer(String consumerGroup) throws Exception {
* @param consumerGroup
*/
public synchronized void delConsumer(String consumerGroup) throws Exception {
ConsumerGroupManager cgm = consumerTable.remove(consumerGroup);
cgm.shutdown();
if(consumerTable.containsKey(consumerGroup)) {
ConsumerGroupManager cgm = consumerTable.remove(consumerGroup);
cgm.shutdown();
}
}

@Subscribe
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

import com.alibaba.fastjson.JSONObject;

import io.netty.channel.ChannelHandlerContext;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.IPUtil;
Expand Down Expand Up @@ -108,6 +106,7 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>

synchronized (eventMeshHTTPServer.localClientInfoMapping) {

registerClient(subscribeRequestHeader, consumerGroup, subTopicList, url);

for (String subTopic : subTopicList) {
List<Client> groupTopicClients = eventMeshHTTPServer.localClientInfoMapping.get(consumerGroup + "@" + subTopic);
Expand Down Expand Up @@ -206,4 +205,41 @@ public boolean rejectRequest() {
return false;
}

private void registerClient(SubscribeRequestHeader subscribeRequestHeader, String consumerGroup,
List<String> topicList, String url) {
for(String topic: topicList) {
Client client = new Client();
client.env = subscribeRequestHeader.getEnv();
client.dcn = subscribeRequestHeader.getDcn();
client.idc = subscribeRequestHeader.getIdc();
client.sys = subscribeRequestHeader.getSys();
client.ip = subscribeRequestHeader.getIp();
client.pid = subscribeRequestHeader.getPid();
client.consumerGroup = consumerGroup;
client.topic = topic;
client.url = url;
client.lastUpTime = new Date();

String groupTopicKey = client.consumerGroup + "@" + client.topic;

if (eventMeshHTTPServer.localClientInfoMapping.containsKey(groupTopicKey)) {
List<Client> localClients = eventMeshHTTPServer.localClientInfoMapping.get(groupTopicKey);
boolean isContains = false;
for (Client localClient : localClients) {
if (StringUtils.equals(localClient.url, client.url)) {
isContains = true;
localClient.lastUpTime = client.lastUpTime;
break;
}
}
if (!isContains) {
localClients.add(client);
}
} else {
List<Client> clients = new ArrayList<>();
clients.add(client);
eventMeshHTTPServer.localClientInfoMapping.put(groupTopicKey, clients);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,15 @@
package org.apache.eventmesh.runtime.core.protocol.http.processor;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import com.alibaba.fastjson.JSONObject;

import io.netty.channel.ChannelHandlerContext;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.IPUtil;
Expand All @@ -44,10 +41,8 @@
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupTopicConf;
import org.apache.eventmesh.runtime.core.consumergroup.event.ConsumerGroupStateEvent;
import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
import org.apache.eventmesh.runtime.core.protocol.http.async.CompleteHandler;
import org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerGroupManager;
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.Client;
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
Expand Down Expand Up @@ -131,12 +126,15 @@ public void onResponse(HttpCommand httpCommand) {

synchronized (eventMeshHTTPServer.localClientInfoMapping) {
boolean isChange = true;

registerClient(unSubscribeRequestHeader, consumerGroup, unSubTopicList, unSubscribeUrl);

for (String unSubTopic : unSubTopicList) {
List<Client> groupTopicClients = eventMeshHTTPServer.localClientInfoMapping.get(consumerGroup + "@" + unSubTopic);
Iterator<Client> clientIterator = groupTopicClients.iterator();
while (clientIterator.hasNext()) {
Client client = clientIterator.next();
if (StringUtils.equals(client.ip, ip)) {
if (StringUtils.equals(client.pid, pid) && StringUtils.equals(client.url, unSubscribeUrl)) {
httpLogger.warn("client {} start unsubscribe", JSONObject.toJSONString(client));
clientIterator.remove();
}
Expand Down Expand Up @@ -239,4 +237,41 @@ public void onResponse(HttpCommand httpCommand) {
public boolean rejectRequest() {
return false;
}

private void registerClient(UnSubscribeRequestHeader unSubscribeRequestHeader, String consumerGroup,
List<String> topicList, String url) {
for(String topic: topicList) {
Client client = new Client();
client.env = unSubscribeRequestHeader.getEnv();
client.dcn = unSubscribeRequestHeader.getDcn();
client.idc = unSubscribeRequestHeader.getIdc();
client.sys = unSubscribeRequestHeader.getSys();
client.ip = unSubscribeRequestHeader.getIp();
client.pid = unSubscribeRequestHeader.getPid();
client.consumerGroup = consumerGroup;
client.topic = topic;
client.url = url;
client.lastUpTime = new Date();

String groupTopicKey = client.consumerGroup + "@" + client.topic;
if (eventMeshHTTPServer.localClientInfoMapping.containsKey(groupTopicKey)) {
List<Client> localClients = eventMeshHTTPServer.localClientInfoMapping.get(groupTopicKey);
boolean isContains = false;
for (Client localClient : localClients) {
if (StringUtils.equals(localClient.url, client.url)) {
isContains = true;
localClient.lastUpTime = client.lastUpTime;
break;
}
}
if (!isContains) {
localClients.add(client);
}
} else {
List<Client> clients = new ArrayList<>();
clients.add(client);
eventMeshHTTPServer.localClientInfoMapping.put(groupTopicKey, clients);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,12 @@ public boolean subscribe(List<String> topicList, String url) throws Exception {
start();
}

RequestParam heartBeatParam = generateHeartBeatRequestParam(topicList, url);
RequestParam subscribeParam = generateSubscribeRequestParam(topicList, url);

long startTime = System.currentTimeMillis();
String target = selectEventMesh();
String subRes = "";
String heartRes = "";
try {
heartRes = HttpUtil.post(httpClient, target, heartBeatParam);
subRes = HttpUtil.post(httpClient, target, subscribeParam);
} catch (Exception ex) {
throw new EventMeshException(ex);
Expand Down Expand Up @@ -239,15 +236,12 @@ public void run() {

public boolean unsubscribe(List<String> topicList, String url) throws EventMeshException {
subscription.removeAll(topicList);
RequestParam heartBeatParam = generateHeartBeatRequestParam(topicList, url);
RequestParam unSubscribeParam = generateUnSubscribeRequestParam(topicList, url);

long startTime = System.currentTimeMillis();
String target = selectEventMesh();
String unSubRes = "";
String heartRes = "";
try {
heartRes = HttpUtil.post(httpClient, target, heartBeatParam);
unSubRes = HttpUtil.post(httpClient, target, unSubscribeParam);
} catch (Exception ex) {
throw new EventMeshException(ex);
Expand Down

0 comments on commit 16b3c62

Please sign in to comment.