diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/HTTPClientHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/HTTPClientHandler.java index e80107a7f7..3ca9d5c55c 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/HTTPClientHandler.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/HTTPClientHandler.java @@ -77,7 +77,7 @@ void delete(HttpExchange httpExchange) throws IOException { DeleteHTTPClientRequest deleteHTTPClientRequest = JsonUtils.toObject(request, DeleteHTTPClientRequest.class); String url = deleteHTTPClientRequest.url; - for (List clientList : eventMeshHTTPServer.localClientInfoMapping.values()) { + for (List clientList : eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping().values()) { clientList.removeIf(client -> Objects.equals(client.getUrl(), url)); } @@ -118,7 +118,7 @@ void list(HttpExchange httpExchange) throws IOException { // Get the list of HTTP clients List getClientResponseList = new ArrayList<>(); - for (List clientList : eventMeshHTTPServer.localClientInfoMapping.values()) { + for (List clientList : eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping().values()) { for (Client client : clientList) { GetClientResponse getClientResponse = new GetClientResponse( Optional.ofNullable(client.getEnv()).orElse(""), diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java index 3c1b216882..ec375a182a 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java @@ -29,7 +29,7 @@ import org.apache.eventmesh.runtime.common.ServiceState; import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration; import org.apache.eventmesh.runtime.constants.EventMeshConstants; -import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf; +import org.apache.eventmesh.runtime.core.consumer.SubscriptionManager; import org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerManager; import org.apache.eventmesh.runtime.core.protocol.http.processor.AdminMetricsProcessor; import org.apache.eventmesh.runtime.core.protocol.http.processor.BatchSendMessageProcessor; @@ -86,6 +86,8 @@ public class EventMeshHTTPServer extends AbstractHTTPServer { private transient ConsumerManager consumerManager; + private transient SubscriptionManager subscriptionManager; + private transient ProducerManager producerManager; private transient HttpRetryer httpRetryer; @@ -112,12 +114,6 @@ public class EventMeshHTTPServer extends AbstractHTTPServer { public transient HTTPClientPool httpClientPool = new HTTPClientPool(10); - public final ConcurrentHashMap localConsumerGroupMapping = - new ConcurrentHashMap<>(); - - public final ConcurrentHashMap> localClientInfoMapping = - new ConcurrentHashMap<>(); - public EventMeshHTTPServer(final EventMeshServer eventMeshServer, final EventMeshHTTPConfiguration eventMeshHttpConfiguration) { super(eventMeshHttpConfiguration.httpServerPort, eventMeshHttpConfiguration.eventMeshServerUseTls, eventMeshHttpConfiguration); @@ -246,6 +242,8 @@ private void init() throws Exception { this.setMetrics(new HTTPMetricsServer(this, metricsRegistries)); + subscriptionManager = new SubscriptionManager(); + consumerManager = new ConsumerManager(this); consumerManager.init(); @@ -409,6 +407,10 @@ private void initWebhook() throws Exception { this.getHandlerService().register(webHookProcessor, webhookExecutor); } + public SubscriptionManager getSubscriptionManager() { + return subscriptionManager; + } + public ConsumerManager getConsumerManager() { return consumerManager; } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumer/ClientInfo.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumer/ClientInfo.java new file mode 100644 index 0000000000..60be5a1f14 --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumer/ClientInfo.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.core.consumer; + +public class ClientInfo { + private String env; + + private String idc; + + private String sys; + + private String pid; + + private String ip; + + public String getEnv() { + return env; + } + + public void setEnv(String env) { + this.env = env; + } + + public String getIdc() { + return idc; + } + + public void setIdc(String idc) { + this.idc = idc; + } + + public String getSys() { + return sys; + } + + public void setSys(String sys) { + this.sys = sys; + } + + public String getPid() { + return pid; + } + + public void setPid(String pid) { + this.pid = pid; + } + + public String getIp() { + return ip; + } + + public void setIp(String ip) { + this.ip = ip; + } +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumer/SubscriptionManager.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumer/SubscriptionManager.java new file mode 100644 index 0000000000..04f7b8c578 --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumer/SubscriptionManager.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.core.consumer; + +import org.apache.eventmesh.common.protocol.SubscriptionItem; +import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf; +import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupTopicConf; +import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.Client; + +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SubscriptionManager { + private static final Logger logger = LoggerFactory.getLogger(SubscriptionManager.class); + private final ConcurrentHashMap localConsumerGroupMapping = + new ConcurrentHashMap<>(); + + private final ConcurrentHashMap> localClientInfoMapping = + new ConcurrentHashMap<>(); + + public ConcurrentHashMap getLocalConsumerGroupMapping() { + return localConsumerGroupMapping; + } + + public ConcurrentHashMap> getLocalClientInfoMapping() { + return localClientInfoMapping; + } + + public void registerClient(final ClientInfo clientInfo, final String consumerGroup, + final List subscriptionItems, final String url) { + for (final SubscriptionItem subscription : subscriptionItems) { + final String groupTopicKey = consumerGroup + "@" + subscription.getTopic(); + + List localClients = localClientInfoMapping.get(groupTopicKey); + + if (localClients == null) { + localClientInfoMapping.putIfAbsent(groupTopicKey, new ArrayList<>()); + localClients = localClientInfoMapping.get(groupTopicKey); + } + + boolean isContains = false; + for (final Client localClient : localClients) { + //TODO: compare the whole Client would be better? + if (StringUtils.equals(localClient.getUrl(), url)) { + isContains = true; + localClient.setLastUpTime(new Date()); + break; + } + } + + if (!isContains) { + Client client = new Client(); + client.setEnv(clientInfo.getEnv()); + client.setIdc(clientInfo.getIdc()); + client.setSys(clientInfo.getSys()); + client.setIp(clientInfo.getIp()); + client.setPid(clientInfo.getPid()); + client.setConsumerGroup(consumerGroup); + client.setTopic(subscription.getTopic()); + client.setUrl(url); + client.setLastUpTime(new Date()); + localClients.add(client); + } + } + } + + public void updateSubscription(ClientInfo clientInfo, String consumerGroup, + String url, List subscriptionList) { + for (final SubscriptionItem subscription : subscriptionList) { + final List groupTopicClients = localClientInfoMapping + .get(consumerGroup + "@" + subscription.getTopic()); + + if (CollectionUtils.isEmpty(groupTopicClients)) { + logger.error("group {} topic {} clients is empty", consumerGroup, subscription); + } + + ConsumerGroupConf consumerGroupConf = localConsumerGroupMapping.get(consumerGroup); + if (consumerGroupConf == null) { + // new subscription + ConsumerGroupConf prev = localConsumerGroupMapping.putIfAbsent(consumerGroup, new ConsumerGroupConf(consumerGroup)); + if (prev == null) { + logger.info("add new subscription, consumer group: {}", consumerGroup); + } + consumerGroupConf = localConsumerGroupMapping.get(consumerGroup); + } + + ConsumerGroupTopicConf consumerGroupTopicConf = consumerGroupConf.getConsumerGroupTopicConf() + .get(subscription.getTopic()); + if (consumerGroupTopicConf == null) { + consumerGroupConf.getConsumerGroupTopicConf().computeIfAbsent(subscription.getTopic(), (topic) -> { + ConsumerGroupTopicConf newTopicConf = new ConsumerGroupTopicConf(); + newTopicConf.setConsumerGroup(consumerGroup); + newTopicConf.setTopic(topic); + newTopicConf.setSubscriptionItem(subscription); + logger.info("add new {}", newTopicConf); + return newTopicConf; + }); + consumerGroupTopicConf = consumerGroupConf.getConsumerGroupTopicConf().get(subscription.getTopic()); + } + + consumerGroupTopicConf.getUrls().add(url); + if (!consumerGroupTopicConf.getIdcUrls().containsKey(clientInfo.getIdc())) { + consumerGroupTopicConf.getIdcUrls().putIfAbsent(clientInfo.getIdc(), new ArrayList<>()); + } + //TODO: idcUrl list is not thread-safe + consumerGroupTopicConf.getIdcUrls().get(clientInfo.getIdc()).add(url); + } + } +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/ConsumerGroupConf.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/ConsumerGroupConf.java index 2c44b5958e..289256c76f 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/ConsumerGroupConf.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/ConsumerGroupConf.java @@ -20,6 +20,7 @@ import java.io.Serializable; import java.util.Map; import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; import com.google.common.collect.Maps; @@ -27,7 +28,8 @@ public class ConsumerGroupConf implements Serializable { //eg . 5013-1A0 private String consumerGroup; - private Map consumerGroupTopicConf = Maps.newConcurrentMap(); + private final ConcurrentHashMap consumerGroupTopicConf + = new ConcurrentHashMap(); public ConsumerGroupConf(String consumerGroup) { this.consumerGroup = consumerGroup; @@ -45,10 +47,6 @@ public Map getConsumerGroupTopicConf() { return consumerGroupTopicConf; } - public void setConsumerGroupTopicConf(Map consumerGroupTopicConf) { - this.consumerGroupTopicConf = consumerGroupTopicConf; - } - @Override public boolean equals(Object o) { if (this == o) { diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HttpClientGroupMapping.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HttpClientGroupMapping.java index 43103672d2..a0b2186be1 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HttpClientGroupMapping.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HttpClientGroupMapping.java @@ -229,9 +229,7 @@ private boolean addSubscriptionByTopic(final String consumerGroup, final String urls.add(url); idcUrls.put(clientIdc, urls); consumeTopicConfig.setIdcUrls(idcUrls); - final Map map = new HashMap<>(); - map.put(subTopic.getTopic(), consumeTopicConfig); - consumerGroupConf.setConsumerGroupTopicConf(map); + consumerGroupConf.getConsumerGroupTopicConf().put(subTopic.getTopic(), consumeTopicConfig); localConsumerGroupMapping.put(consumerGroup, consumerGroupConf); isChange = true; } else { diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java index a606f8e045..3bd84f0556 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java @@ -162,17 +162,17 @@ public void processRequest(final ChannelHandlerContext ctx, final AsyncContext> groupTopicClientMapping : tmpMap.entrySet()) { final List localClientList = - eventMeshHTTPServer.localClientInfoMapping.get(groupTopicClientMapping.getKey()); + eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping().get(groupTopicClientMapping.getKey()); if (CollectionUtils.isEmpty(localClientList)) { - eventMeshHTTPServer.localClientInfoMapping + eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping() .put(groupTopicClientMapping.getKey(), groupTopicClientMapping.getValue()); } else { final List tmpClientList = groupTopicClientMapping.getValue(); supplyClientInfoList(tmpClientList, localClientList); - eventMeshHTTPServer.localClientInfoMapping.put(groupTopicClientMapping.getKey(), localClientList); + eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping().put(groupTopicClientMapping.getKey(), localClientList); } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java index afc2cad28f..3a2e57845c 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java @@ -29,21 +29,14 @@ import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer; import org.apache.eventmesh.runtime.common.EventMeshTrace; import org.apache.eventmesh.runtime.constants.EventMeshConstants; -import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf; -import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupTopicConf; +import org.apache.eventmesh.runtime.core.consumer.ClientInfo; +import org.apache.eventmesh.runtime.core.consumer.SubscriptionManager; import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.AbstractEventProcessor; -import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.Client; import org.apache.eventmesh.runtime.util.RemotingHelper; import org.apache.eventmesh.runtime.util.WebhookUtil; -import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.lang3.StringUtils; - -import java.util.ArrayList; import java.util.Collections; -import java.util.Date; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -173,87 +166,17 @@ public void handler(final HandlerService.HandlerSpecific handlerSpecific, final return; } - synchronized (eventMeshHTTPServer.localClientInfoMapping) { - - registerClient(requestWrapper, consumerGroup, subscriptionList, url); - - for (final SubscriptionItem subTopic : subscriptionList) { - final List groupTopicClients = eventMeshHTTPServer.localClientInfoMapping - .get(consumerGroup + "@" + subTopic.getTopic()); - - if (CollectionUtils.isEmpty(groupTopicClients)) { - if (log.isErrorEnabled()) { - log.error("group {} topic {} clients is empty", consumerGroup, subTopic); - } - } - - final Map> idcUrls = new HashMap<>(); - for (final Client client : groupTopicClients) { - if (idcUrls.containsKey(client.getIdc())) { - idcUrls.get(client.getIdc()).add(StringUtils.deleteWhitespace(client.getUrl())); - } else { - final List urls = new ArrayList<>(); - urls.add(client.getUrl()); - idcUrls.put(client.getIdc(), urls); - } - } - - ConsumerGroupConf consumerGroupConf = - eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup); - if (consumerGroupConf == null) { - // new subscription - consumerGroupConf = new ConsumerGroupConf(consumerGroup); - final ConsumerGroupTopicConf consumeTopicConfig = new ConsumerGroupTopicConf(); - consumeTopicConfig.setConsumerGroup(consumerGroup); - consumeTopicConfig.setTopic(subTopic.getTopic()); - consumeTopicConfig.setSubscriptionItem(subTopic); - consumeTopicConfig.setUrls(new HashSet<>(Collections.singletonList(url))); - consumeTopicConfig.setIdcUrls(idcUrls); - - final Map map = new HashMap<>(); - map.put(subTopic.getTopic(), consumeTopicConfig); - consumerGroupConf.setConsumerGroupTopicConf(map); - } else { - // already subscribed - final Map map = - consumerGroupConf.getConsumerGroupTopicConf(); - if (!map.containsKey(subTopic.getTopic())) { - //If there are multiple topics, append it - final ConsumerGroupTopicConf newTopicConf = new ConsumerGroupTopicConf(); - newTopicConf.setConsumerGroup(consumerGroup); - newTopicConf.setTopic(subTopic.getTopic()); - newTopicConf.setSubscriptionItem(subTopic); - newTopicConf.setUrls(new HashSet<>(Collections.singletonList(url))); - newTopicConf.setIdcUrls(idcUrls); - map.put(subTopic.getTopic(), newTopicConf); - } - - for (final Map.Entry set : map.entrySet()) { - if (!StringUtils.equals(subTopic.getTopic(), set.getKey())) { - continue; - } - - final ConsumerGroupTopicConf latestTopicConf = new ConsumerGroupTopicConf(); - latestTopicConf.setConsumerGroup(consumerGroup); - latestTopicConf.setTopic(subTopic.getTopic()); - latestTopicConf.setSubscriptionItem(subTopic); - latestTopicConf.setUrls(new HashSet<>(Collections.singletonList(url))); - - final ConsumerGroupTopicConf currentTopicConf = set.getValue(); - latestTopicConf.getUrls().addAll(currentTopicConf.getUrls()); - latestTopicConf.setIdcUrls(idcUrls); - - map.put(set.getKey(), latestTopicConf); - } - } - eventMeshHTTPServer.localConsumerGroupMapping.put(consumerGroup, consumerGroupConf); - } + synchronized (eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping()) { + ClientInfo clientInfo = getClientInfo(requestWrapper); + SubscriptionManager subscriptionManager = eventMeshHTTPServer.getSubscriptionManager(); + subscriptionManager.registerClient(clientInfo, consumerGroup, subscriptionList, url); + subscriptionManager.updateSubscription(clientInfo, consumerGroup, url, subscriptionList); final long startTime = System.currentTimeMillis(); try { // subscription relationship change notification eventMeshHTTPServer.getConsumerManager().notifyConsumerManager(consumerGroup, - eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup)); + eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping().get(consumerGroup)); responseBodyMap.put("retCode", EventMeshRetCode.SUCCESS.getRetCode()); responseBodyMap.put("retMsg", EventMeshRetCode.SUCCESS.getErrMsg()); @@ -282,44 +205,14 @@ public String[] paths() { return new String[]{RequestURI.SUBSCRIBE_LOCAL.getRequestURI()}; } - private void registerClient(final HttpEventWrapper requestWrapper, final String consumerGroup, - final List subscriptionItems, final String url) { + private ClientInfo getClientInfo(final HttpEventWrapper requestWrapper) { final Map requestHeaderMap = requestWrapper.getSysHeaderMap(); - for (final SubscriptionItem item : subscriptionItems) { - final Client client = new Client(); - client.setEnv(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.ENV).toString()); - client.setIdc(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.IDC).toString()); - client.setSys(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.SYS).toString()); - client.setIp(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.IP).toString()); - client.setPid(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString()); - client.setConsumerGroup(consumerGroup); - client.setTopic(item.getTopic()); - client.setUrl(url); - client.setLastUpTime(new Date()); - - final String groupTopicKey = client.getConsumerGroup() + "@" + client.getTopic(); - - List localClients = - eventMeshHTTPServer.localClientInfoMapping.get(groupTopicKey); - if (localClients == null) { - localClients = new ArrayList<>(); - eventMeshHTTPServer.localClientInfoMapping.put(groupTopicKey, localClients); - } - - boolean isContains = false; - for (final Client localClient : localClients) { - if (StringUtils.equals(localClient.getUrl(), client.getUrl())) { - isContains = true; - localClient.setLastUpTime(client.getLastUpTime()); - break; - } - } - - if (!isContains) { - localClients.add(client); - } - - } + ClientInfo clientInfo = new ClientInfo(); + clientInfo.setEnv(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.ENV).toString()); + clientInfo.setIdc(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.IDC).toString()); + clientInfo.setSys(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.SYS).toString()); + clientInfo.setIp(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.IP).toString()); + clientInfo.setPid(requestHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString()); + return clientInfo; } - } \ No newline at end of file diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java index ae703436fd..1810b842bc 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java @@ -127,13 +127,13 @@ public void handler(final HandlerService.HandlerSpecific handlerSpecific, final final String pid = sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString(); - synchronized (eventMeshHTTPServer.localClientInfoMapping) { + synchronized (eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping()) { boolean isChange = true; registerClient(requestWrapper, consumerGroup, unSubTopicList, unSubscribeUrl); for (final String unSubTopic : unSubTopicList) { - final List groupTopicClients = eventMeshHTTPServer.localClientInfoMapping + final List groupTopicClients = eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping() .get(consumerGroup + "@" + unSubTopic); final Iterator clientIterator = groupTopicClients.iterator(); while (clientIterator.hasNext()) { @@ -167,9 +167,9 @@ public void handler(final HandlerService.HandlerSpecific handlerSpecific, final } - synchronized (eventMeshHTTPServer.localConsumerGroupMapping) { + synchronized (eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping()) { final ConsumerGroupConf consumerGroupConf = - eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup); + eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping().get(consumerGroup); final Map map = consumerGroupConf.getConsumerGroupTopicConf(); for (final Map.Entry entry : map.entrySet()) { @@ -184,7 +184,7 @@ public void handler(final HandlerService.HandlerSpecific handlerSpecific, final map.put(unSubTopic, latestTopicConf); } } - eventMeshHTTPServer.localConsumerGroupMapping.put(consumerGroup, consumerGroupConf); + eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping().put(consumerGroup, consumerGroupConf); } } else { isChange = false; @@ -195,7 +195,7 @@ public void handler(final HandlerService.HandlerSpecific handlerSpecific, final if (isChange) { try { eventMeshHTTPServer.getConsumerManager().notifyConsumerManager(consumerGroup, - eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup)); + eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping().get(consumerGroup)); responseBodyMap.put(EventMeshConstants.RET_CODE, EventMeshRetCode.SUCCESS.getRetCode()); responseBodyMap.put(EventMeshConstants.RET_MSG, EventMeshRetCode.SUCCESS.getErrMsg()); @@ -221,10 +221,10 @@ public void handler(final HandlerService.HandlerSpecific handlerSpecific, final handlerSpecific.sendResponse(responseHeaderMap, responseBodyMap); // clean ClientInfo - eventMeshHTTPServer.localClientInfoMapping.keySet() + eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping().keySet() .removeIf(s -> StringUtils.contains(s, consumerGroup)); // clean ConsumerGroupInfo - eventMeshHTTPServer.localConsumerGroupMapping.keySet() + eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping().keySet() .removeIf(s -> StringUtils.equals(consumerGroup, s)); } catch (Exception e) { if (log.isErrorEnabled()) { @@ -272,11 +272,11 @@ private void registerClient(final HttpEventWrapper requestWrapper, final String groupTopicKey = client.getConsumerGroup() + "@" + client.getTopic(); List localClients = - eventMeshHTTPServer.localClientInfoMapping.get(groupTopicKey); + eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping().get(groupTopicKey); if (localClients == null) { localClients = new ArrayList<>(); - eventMeshHTTPServer.localClientInfoMapping.put(groupTopicKey, localClients); + eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping().put(groupTopicKey, localClients); } boolean isContains = false; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java index f1935815ac..3f6c24387c 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java @@ -31,11 +31,10 @@ import org.apache.eventmesh.runtime.acl.Acl; import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer; import org.apache.eventmesh.runtime.constants.EventMeshConstants; -import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf; -import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupTopicConf; +import org.apache.eventmesh.runtime.core.consumer.ClientInfo; +import org.apache.eventmesh.runtime.core.consumer.SubscriptionManager; import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext; import org.apache.eventmesh.runtime.core.protocol.http.async.CompleteHandler; -import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.Client; import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor; import org.apache.eventmesh.runtime.util.EventMeshUtil; import org.apache.eventmesh.runtime.util.RemotingHelper; @@ -44,13 +43,7 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; -import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -189,85 +182,17 @@ public void processRequest(final ChannelHandlerContext ctx, final AsyncContext groupTopicClients = eventMeshHTTPServer.localClientInfoMapping - .get(consumerGroup + "@" + subTopic.getTopic()); - - if (CollectionUtils.isEmpty(groupTopicClients)) { - LOGGER.error("group {} topic {} clients is empty", consumerGroup, subTopic); - } - - final Map> idcUrls = new HashMap<>(); - for (final Client client : groupTopicClients) { - List urls = idcUrls.get(client.getIdc()); - if (urls == null) { - urls = new ArrayList<>(); - idcUrls.put(client.getIdc(), urls); - } - urls.add(StringUtils.deleteWhitespace(client.getUrl())); - } - - ConsumerGroupConf consumerGroupConf = - eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup); - - if (consumerGroupConf == null) { - // new subscription - consumerGroupConf = new ConsumerGroupConf(consumerGroup); - final ConsumerGroupTopicConf consumeTopicConfig = new ConsumerGroupTopicConf(); - consumeTopicConfig.setConsumerGroup(consumerGroup); - consumeTopicConfig.setTopic(subTopic.getTopic()); - consumeTopicConfig.setSubscriptionItem(subTopic); - consumeTopicConfig.setUrls(new HashSet<>(Collections.singletonList(url))); - - consumeTopicConfig.setIdcUrls(idcUrls); - - final Map map = new HashMap<>(); - map.put(subTopic.getTopic(), consumeTopicConfig); - consumerGroupConf.setConsumerGroupTopicConf(map); - } else { - // already subscribed - final Map map = - consumerGroupConf.getConsumerGroupTopicConf(); - - if (!map.containsKey(subTopic.getTopic())) { - //If there are multiple topics, append it - final ConsumerGroupTopicConf newTopicConf = new ConsumerGroupTopicConf(); - newTopicConf.setConsumerGroup(consumerGroup); - newTopicConf.setTopic(subTopic.getTopic()); - newTopicConf.setSubscriptionItem(subTopic); - newTopicConf.setUrls(new HashSet<>(Collections.singletonList(url))); - newTopicConf.setIdcUrls(idcUrls); - map.put(subTopic.getTopic(), newTopicConf); - } - - for (final Map.Entry set : map.entrySet()) { - if (StringUtils.equals(subTopic.getTopic(), set.getKey())) { - final ConsumerGroupTopicConf latestTopicConf = new ConsumerGroupTopicConf(); - latestTopicConf.setConsumerGroup(consumerGroup); - latestTopicConf.setTopic(subTopic.getTopic()); - latestTopicConf.setSubscriptionItem(subTopic); - latestTopicConf.setUrls(new HashSet<>(Collections.singletonList(url))); - - final ConsumerGroupTopicConf currentTopicConf = set.getValue(); - latestTopicConf.getUrls().addAll(currentTopicConf.getUrls()); - latestTopicConf.setIdcUrls(idcUrls); - - map.put(set.getKey(), latestTopicConf); - } - } - } - eventMeshHTTPServer.localConsumerGroupMapping.put(consumerGroup, consumerGroupConf); - } + synchronized (eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping()) { + ClientInfo clientInfo = getClientInfo(subscribeRequestHeader); + SubscriptionManager subscriptionManager = eventMeshHTTPServer.getSubscriptionManager(); + subscriptionManager.registerClient(clientInfo, consumerGroup, subTopicList, url); + subscriptionManager.updateSubscription(clientInfo, consumerGroup, url, subTopicList); final long startTime = System.currentTimeMillis(); try { // subscription relationship change notification eventMeshHTTPServer.getConsumerManager().notifyConsumerManager(consumerGroup, - eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup)); + eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping().get(consumerGroup)); final CompleteHandler handler = new CompleteHandler() { @Override @@ -316,44 +241,13 @@ public boolean rejectRequest() { return false; } - private void registerClient(final SubscribeRequestHeader subscribeRequestHeader, final String consumerGroup, - final List subscriptionItems, final String url) { - for (final SubscriptionItem item : subscriptionItems) { - final Client client = new Client(); - client.setEnv(subscribeRequestHeader.getEnv()); - client.setIdc(subscribeRequestHeader.getIdc()); - client.setSys(subscribeRequestHeader.getSys()); - client.setIp(subscribeRequestHeader.getIp()); - client.setPid(subscribeRequestHeader.getPid()); - client.setConsumerGroup(consumerGroup); - client.setTopic(item.getTopic()); - client.setUrl(url); - client.setLastUpTime(new Date()); - - final String groupTopicKey = client.getConsumerGroup() + "@" + client.getTopic(); - - List localClients = - eventMeshHTTPServer.localClientInfoMapping.get(groupTopicKey); - - if (localClients == null) { - localClients = new ArrayList<>(); - eventMeshHTTPServer.localClientInfoMapping.put(groupTopicKey, localClients); - } - - boolean isContains = false; - for (final Client localClient : localClients) { - if (StringUtils.equals(localClient.getUrl(), client.getUrl())) { - isContains = true; - localClient.setLastUpTime(client.getLastUpTime()); - break; - } - } - - if (!isContains) { - localClients.add(client); - } - - } + private ClientInfo getClientInfo(final SubscribeRequestHeader subscribeRequestHeader) { + ClientInfo clientInfo = new ClientInfo(); + clientInfo.setEnv(subscribeRequestHeader.getEnv()); + clientInfo.setIdc(subscribeRequestHeader.getIdc()); + clientInfo.setSys(subscribeRequestHeader.getSys()); + clientInfo.setIp(subscribeRequestHeader.getIp()); + clientInfo.setPid(subscribeRequestHeader.getPid()); + return clientInfo; } - } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java index 3f6e6c18d8..0e6049d629 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java @@ -136,13 +136,13 @@ public void onResponse(final HttpCommand httpCommand) { } }; - synchronized (eventMeshHTTPServer.localClientInfoMapping) { + synchronized (eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping()) { boolean isChange = true; registerClient(unSubscribeRequestHeader, consumerGroup, unSubTopicList, unSubscribeUrl); for (final String unSubTopic : unSubTopicList) { - final List groupTopicClients = eventMeshHTTPServer.localClientInfoMapping + final List groupTopicClients = eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping() .get(consumerGroup + "@" + unSubTopic); final Iterator clientIterator = groupTopicClients.iterator(); @@ -173,9 +173,9 @@ public void onResponse(final HttpCommand httpCommand) { } } - synchronized (eventMeshHTTPServer.localConsumerGroupMapping) { + synchronized (eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping()) { final ConsumerGroupConf consumerGroupConf = - eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup); + eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping().get(consumerGroup); final Map map = consumerGroupConf.getConsumerGroupTopicConf(); @@ -195,7 +195,7 @@ public void onResponse(final HttpCommand httpCommand) { map.put(unSubTopic, latestTopicConf); } } - eventMeshHTTPServer.localConsumerGroupMapping + eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping() .put(consumerGroup, consumerGroupConf); } } else { @@ -208,7 +208,7 @@ public void onResponse(final HttpCommand httpCommand) { if (isChange) { try { eventMeshHTTPServer.getConsumerManager().notifyConsumerManager(consumerGroup, - eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup)); + eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping().get(consumerGroup)); responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(EventMeshRetCode.SUCCESS); @@ -239,10 +239,10 @@ public void onResponse(final HttpCommand httpCommand) { asyncContext.getRequest().createHttpCommandResponse(EventMeshRetCode.SUCCESS); asyncContext.onComplete(responseEventMeshCommand, handler); // clean ClientInfo - eventMeshHTTPServer.localClientInfoMapping.keySet() + eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping().keySet() .removeIf(s -> StringUtils.contains(s, consumerGroup)); // clean ConsumerGroupInfo - eventMeshHTTPServer.localConsumerGroupMapping.keySet() + eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping().keySet() .removeIf(s -> StringUtils.equals(consumerGroup, s)); } catch (Exception e) { final HttpCommand err = asyncContext.getRequest().createHttpCommandResponse( @@ -286,9 +286,9 @@ private void registerClient(final UnSubscribeRequestHeader unSubscribeRequestHea client.setLastUpTime(new Date()); final String groupTopicKey = client.getConsumerGroup() + "@" + client.getTopic(); - if (eventMeshHTTPServer.localClientInfoMapping.containsKey(groupTopicKey)) { + if (eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping().containsKey(groupTopicKey)) { final List localClients = - eventMeshHTTPServer.localClientInfoMapping.get(groupTopicKey); + eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping().get(groupTopicKey); boolean isContains = false; for (final Client localClient : localClients) { if (StringUtils.equals(localClient.getUrl(), client.getUrl())) { @@ -303,7 +303,7 @@ private void registerClient(final UnSubscribeRequestHeader unSubscribeRequestHea } else { final List clients = new ArrayList<>(); clients.add(client); - eventMeshHTTPServer.localClientInfoMapping.put(groupTopicKey, clients); + eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping().put(groupTopicKey, clients); } } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/AbstractEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/AbstractEventProcessor.java index 4cc65a77be..5828b52489 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/AbstractEventProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/AbstractEventProcessor.java @@ -84,7 +84,7 @@ protected void updateMetadata() { Map metadata = new HashMap<>(1 << 4); for (Map.Entry consumerGroupMap : - eventMeshHTTPServer.localConsumerGroupMapping.entrySet()) { + eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping().entrySet()) { String consumerGroupKey = consumerGroupMap.getKey(); ConsumerGroupConf consumerGroupConf = consumerGroupMap.getValue();