diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 4ffb5b77d5424..6747bbb916d93 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -1176,7 +1176,7 @@ protected void startLeaderElectionService() { protected void acquireSLANamespace() { try { // Namespace not created hence no need to unload it - NamespaceName nsName = NamespaceService.getSLAMonitorNamespace(getAdvertisedAddress(), config); + NamespaceName nsName = NamespaceService.getSLAMonitorNamespace(getLookupServiceAddress(), config); if (!this.pulsarResources.getNamespaceResources().namespaceExists(nsName)) { LOG.info("SLA Namespace = {} doesn't exist.", nsName); return; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java index a632a47f05116..c1fe2a4930c34 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java @@ -64,7 +64,7 @@ public Map getBundleData() { public Map getBundleDataForLoadShedding() { return bundleData.entrySet().stream() - .filter(e -> !NamespaceService.filterNamespaceForShedding( + .filter(e -> !NamespaceService.isSLAOrHeartbeatNamespace( NamespaceBundle.getBundleNamespace(e.getKey()))) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index cba499eb8eedb..85baf9ec4fbdf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -44,6 +44,7 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.mutable.MutableObject; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; @@ -86,6 +87,7 @@ import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared; import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies; import org.apache.pulsar.broker.namespace.NamespaceEphemeralData; +import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm; @@ -95,7 +97,6 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.common.util.FutureUtil; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.metadata.api.coordination.LeaderElectionState; import org.slf4j.Logger; @@ -152,6 +153,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager { @Getter private final List brokerFilterPipeline; + /** * The load data reporter. */ @@ -181,10 +183,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager { // record split metrics private final AtomicReference> splitMetrics = new AtomicReference<>(); - private final ConcurrentOpenHashMap>> - lookupRequests = ConcurrentOpenHashMap.>>newBuilder() - .build(); + private final ConcurrentHashMap>> + lookupRequests = new ConcurrentHashMap<>(); private final CountDownLatch initWaiter = new CountDownLatch(1); /** @@ -197,7 +197,7 @@ public Set getOwnedServiceUnits() { } Set> entrySet = serviceUnitStateChannel.getOwnershipEntrySet(); String brokerId = brokerRegistry.getBrokerId(); - return entrySet.stream() + Set ownedServiceUnits = entrySet.stream() .filter(entry -> { var stateData = entry.getValue(); return stateData.state() == ServiceUnitState.Owned @@ -207,6 +207,36 @@ public Set getOwnedServiceUnits() { var bundle = entry.getKey(); return getNamespaceBundle(pulsar, bundle); }).collect(Collectors.toSet()); + // Add heartbeat and SLA monitor namespace bundle. + NamespaceName heartbeatNamespace = NamespaceService.getHeartbeatNamespace(brokerId, pulsar.getConfiguration()); + try { + NamespaceBundle fullBundle = pulsar.getNamespaceService().getNamespaceBundleFactory() + .getFullBundle(heartbeatNamespace); + ownedServiceUnits.add(fullBundle); + } catch (Exception e) { + log.warn("Failed to get heartbeat namespace bundle.", e); + } + NamespaceName heartbeatNamespaceV2 = NamespaceService + .getHeartbeatNamespaceV2(brokerId, pulsar.getConfiguration()); + try { + NamespaceBundle fullBundle = pulsar.getNamespaceService().getNamespaceBundleFactory() + .getFullBundle(heartbeatNamespaceV2); + ownedServiceUnits.add(fullBundle); + } catch (Exception e) { + log.warn("Failed to get heartbeat namespace V2 bundle.", e); + } + + NamespaceName slaMonitorNamespace = NamespaceService + .getSLAMonitorNamespace(brokerId, pulsar.getConfiguration()); + try { + NamespaceBundle fullBundle = pulsar.getNamespaceService().getNamespaceBundleFactory() + .getFullBundle(slaMonitorNamespace); + ownedServiceUnits.add(fullBundle); + } catch (Exception e) { + log.warn("Failed to get SLA Monitor namespace bundle.", e); + } + + return ownedServiceUnits; } public enum Role { @@ -261,102 +291,108 @@ public void start() throws PulsarServerException { if (this.started) { return; } - this.brokerRegistry = new BrokerRegistryImpl(pulsar); - this.leaderElectionService = new LeaderElectionService( - pulsar.getCoordinationService(), pulsar.getSafeWebServiceAddress(), ELECTION_ROOT, - state -> { - pulsar.getLoadManagerExecutor().execute(() -> { - if (state == LeaderElectionState.Leading) { - playLeader(); - } else { - playFollower(); - } + try { + this.brokerRegistry = new BrokerRegistryImpl(pulsar); + this.leaderElectionService = new LeaderElectionService( + pulsar.getCoordinationService(), pulsar.getSafeWebServiceAddress(), ELECTION_ROOT, + state -> { + pulsar.getLoadManagerExecutor().execute(() -> { + if (state == LeaderElectionState.Leading) { + playLeader(); + } else { + playFollower(); + } + }); }); - }); - this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar); - this.brokerRegistry.start(); - this.splitManager = new SplitManager(splitCounter); - this.unloadManager = new UnloadManager(unloadCounter); - this.serviceUnitStateChannel.listen(unloadManager); - this.serviceUnitStateChannel.listen(splitManager); - this.leaderElectionService.start(); - this.serviceUnitStateChannel.start(); - this.antiAffinityGroupPolicyHelper = - new AntiAffinityGroupPolicyHelper(pulsar, serviceUnitStateChannel); - antiAffinityGroupPolicyHelper.listenFailureDomainUpdate(); - this.antiAffinityGroupPolicyFilter = new AntiAffinityGroupPolicyFilter(antiAffinityGroupPolicyHelper); - this.brokerFilterPipeline.add(antiAffinityGroupPolicyFilter); - SimpleResourceAllocationPolicies policies = new SimpleResourceAllocationPolicies(pulsar); - this.isolationPoliciesHelper = new IsolationPoliciesHelper(policies); - this.brokerFilterPipeline.add(new BrokerIsolationPoliciesFilter(isolationPoliciesHelper)); - - createSystemTopic(pulsar, BROKER_LOAD_DATA_STORE_TOPIC); - createSystemTopic(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC); + this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar); + this.brokerRegistry.start(); + this.splitManager = new SplitManager(splitCounter); + this.unloadManager = new UnloadManager(unloadCounter); + this.serviceUnitStateChannel.listen(unloadManager); + this.serviceUnitStateChannel.listen(splitManager); + this.leaderElectionService.start(); + this.serviceUnitStateChannel.start(); + this.antiAffinityGroupPolicyHelper = + new AntiAffinityGroupPolicyHelper(pulsar, serviceUnitStateChannel); + antiAffinityGroupPolicyHelper.listenFailureDomainUpdate(); + this.antiAffinityGroupPolicyFilter = new AntiAffinityGroupPolicyFilter(antiAffinityGroupPolicyHelper); + this.brokerFilterPipeline.add(antiAffinityGroupPolicyFilter); + SimpleResourceAllocationPolicies policies = new SimpleResourceAllocationPolicies(pulsar); + this.isolationPoliciesHelper = new IsolationPoliciesHelper(policies); + this.brokerFilterPipeline.add(new BrokerIsolationPoliciesFilter(isolationPoliciesHelper)); + + createSystemTopic(pulsar, BROKER_LOAD_DATA_STORE_TOPIC); + createSystemTopic(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC); - try { - this.brokerLoadDataStore = LoadDataStoreFactory - .create(pulsar.getClient(), BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class); - this.brokerLoadDataStore.startTableView(); - this.topBundlesLoadDataStore = LoadDataStoreFactory - .create(pulsar.getClient(), TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class); - } catch (LoadDataStoreException e) { - throw new PulsarServerException(e); - } + try { + this.brokerLoadDataStore = LoadDataStoreFactory + .create(pulsar.getClient(), BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class); + this.brokerLoadDataStore.startTableView(); + this.topBundlesLoadDataStore = LoadDataStoreFactory + .create(pulsar.getClient(), TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class); + } catch (LoadDataStoreException e) { + throw new PulsarServerException(e); + } - this.context = LoadManagerContextImpl.builder() - .configuration(conf) - .brokerRegistry(brokerRegistry) - .brokerLoadDataStore(brokerLoadDataStore) - .topBundleLoadDataStore(topBundlesLoadDataStore).build(); - - this.brokerLoadDataReporter = - new BrokerLoadDataReporter(pulsar, brokerRegistry.getBrokerId(), brokerLoadDataStore); - - this.topBundleLoadDataReporter = - new TopBundleLoadDataReporter(pulsar, brokerRegistry.getBrokerId(), topBundlesLoadDataStore); - this.serviceUnitStateChannel.listen(brokerLoadDataReporter); - this.serviceUnitStateChannel.listen(topBundleLoadDataReporter); - var interval = conf.getLoadBalancerReportUpdateMinIntervalMillis(); - this.brokerLoadDataReportTask = this.pulsar.getLoadManagerExecutor() - .scheduleAtFixedRate(() -> { - try { - brokerLoadDataReporter.reportAsync(false); - // TODO: update broker load metrics using getLocalData - } catch (Throwable e) { - log.error("Failed to run the broker load manager executor job.", e); - } - }, - interval, - interval, TimeUnit.MILLISECONDS); - - this.topBundlesLoadDataReportTask = this.pulsar.getLoadManagerExecutor() - .scheduleAtFixedRate(() -> { - try { - // TODO: consider excluding the bundles that are in the process of split. - topBundleLoadDataReporter.reportAsync(false); - } catch (Throwable e) { - log.error("Failed to run the top bundles load manager executor job.", e); - } - }, - interval, - interval, TimeUnit.MILLISECONDS); - - this.monitorTask = this.pulsar.getLoadManagerExecutor() - .scheduleAtFixedRate(() -> { - monitor(); - }, - MONITOR_INTERVAL_IN_MILLIS, - MONITOR_INTERVAL_IN_MILLIS, TimeUnit.MILLISECONDS); - - this.unloadScheduler = new UnloadScheduler( - pulsar, pulsar.getLoadManagerExecutor(), unloadManager, context, - serviceUnitStateChannel, unloadCounter, unloadMetrics); - this.unloadScheduler.start(); - this.splitScheduler = new SplitScheduler( - pulsar, serviceUnitStateChannel, splitManager, splitCounter, splitMetrics, context); - this.splitScheduler.start(); - this.initWaiter.countDown(); - this.started = true; + this.context = LoadManagerContextImpl.builder() + .configuration(conf) + .brokerRegistry(brokerRegistry) + .brokerLoadDataStore(brokerLoadDataStore) + .topBundleLoadDataStore(topBundlesLoadDataStore).build(); + + this.brokerLoadDataReporter = + new BrokerLoadDataReporter(pulsar, brokerRegistry.getBrokerId(), brokerLoadDataStore); + + this.topBundleLoadDataReporter = + new TopBundleLoadDataReporter(pulsar, brokerRegistry.getBrokerId(), topBundlesLoadDataStore); + this.serviceUnitStateChannel.listen(brokerLoadDataReporter); + this.serviceUnitStateChannel.listen(topBundleLoadDataReporter); + var interval = conf.getLoadBalancerReportUpdateMinIntervalMillis(); + this.brokerLoadDataReportTask = this.pulsar.getLoadManagerExecutor() + .scheduleAtFixedRate(() -> { + try { + brokerLoadDataReporter.reportAsync(false); + // TODO: update broker load metrics using getLocalData + } catch (Throwable e) { + log.error("Failed to run the broker load manager executor job.", e); + } + }, + interval, + interval, TimeUnit.MILLISECONDS); + + this.topBundlesLoadDataReportTask = this.pulsar.getLoadManagerExecutor() + .scheduleAtFixedRate(() -> { + try { + // TODO: consider excluding the bundles that are in the process of split. + topBundleLoadDataReporter.reportAsync(false); + } catch (Throwable e) { + log.error("Failed to run the top bundles load manager executor job.", e); + } + }, + interval, + interval, TimeUnit.MILLISECONDS); + + this.monitorTask = this.pulsar.getLoadManagerExecutor() + .scheduleAtFixedRate(() -> { + monitor(); + }, + MONITOR_INTERVAL_IN_MILLIS, + MONITOR_INTERVAL_IN_MILLIS, TimeUnit.MILLISECONDS); + + this.unloadScheduler = new UnloadScheduler( + pulsar, pulsar.getLoadManagerExecutor(), unloadManager, context, + serviceUnitStateChannel, unloadCounter, unloadMetrics); + this.unloadScheduler.start(); + this.splitScheduler = new SplitScheduler( + pulsar, serviceUnitStateChannel, splitManager, splitCounter, splitMetrics, context); + this.splitScheduler.start(); + this.initWaiter.countDown(); + this.started = true; + } catch (Exception ex) { + if (this.brokerRegistry != null) { + brokerRegistry.close(); + } + } } @Override @@ -377,25 +413,38 @@ public CompletableFuture> assign(Optional getOwnerAsync( - ServiceUnitId serviceUnit, String bundle, boolean ownByLocalBrokerIfAbsent) { + private String getHeartbeatOrSLAMonitorBrokerId(ServiceUnitId serviceUnit) { + // Check if this is Heartbeat or SLAMonitor namespace + String candidateBroker = NamespaceService.checkHeartbeatNamespace(serviceUnit); + if (candidateBroker == null) { + candidateBroker = NamespaceService.checkHeartbeatNamespaceV2(serviceUnit); + } + if (candidateBroker == null) { + candidateBroker = NamespaceService.getSLAMonitorBrokerName(serviceUnit); + } + if (candidateBroker != null) { + return candidateBroker.substring(candidateBroker.lastIndexOf('/') + 1); + } + return candidateBroker; + } + + private CompletableFuture getOrSelectOwnerAsync(ServiceUnitId serviceUnit, + String bundle) { return serviceUnitStateChannel.getOwnerAsync(bundle).thenCompose(broker -> { // If the bundle not assign yet, select and publish assign event to channel. if (broker.isEmpty()) { - CompletableFuture> selectedBroker; - if (ownByLocalBrokerIfAbsent) { - String brokerId = this.brokerRegistry.getBrokerId(); - selectedBroker = CompletableFuture.completedFuture(Optional.of(brokerId)); - } else { - selectedBroker = this.selectAsync(serviceUnit); - } - return selectedBroker.thenCompose(brokerOpt -> { + return this.selectAsync(serviceUnit).thenCompose(brokerOpt -> { if (brokerOpt.isPresent()) { assignCounter.incrementSuccess(); log.info("Selected new owner broker: {} for bundle: {}.", brokerOpt.get(), bundle); @@ -425,7 +474,8 @@ private CompletableFuture> getBrokerLookupData( }).thenCompose(broker -> this.getBrokerRegistry().lookupAsync(broker).thenCompose(brokerLookupData -> { if (brokerLookupData.isEmpty()) { String errorMsg = String.format( - "Failed to look up a broker registry:%s for bundle:%s", broker, bundle); + "Failed to lookup broker:%s for bundle:%s, the broker has not been registered.", + broker, bundle); log.error(errorMsg); throw new IllegalStateException(errorMsg); } @@ -443,30 +493,37 @@ private CompletableFuture> getBrokerLookupData( public CompletableFuture tryAcquiringOwnership(NamespaceBundle namespaceBundle) { log.info("Try acquiring ownership for bundle: {} - {}.", namespaceBundle, brokerRegistry.getBrokerId()); final String bundle = namespaceBundle.toString(); - return dedupeLookupRequest(bundle, k -> { - final CompletableFuture owner = - this.getOwnerAsync(namespaceBundle, bundle, true); - return getBrokerLookupData(owner.thenApply(Optional::ofNullable), bundle); - }).thenApply(brokerLookupData -> { - if (brokerLookupData.isEmpty()) { - throw new IllegalStateException( - "Failed to get the broker lookup data for bundle: " + bundle); - } - return brokerLookupData.get().toNamespaceEphemeralData(); - }); + return assign(Optional.empty(), namespaceBundle) + .thenApply(brokerLookupData -> { + if (brokerLookupData.isEmpty()) { + String errorMsg = String.format( + "Failed to get the broker lookup data for bundle:%s", bundle); + log.error(errorMsg); + throw new IllegalStateException(errorMsg); + } + return brokerLookupData.get().toNamespaceEphemeralData(); + }); } private CompletableFuture> dedupeLookupRequest( String key, Function>> provider) { - CompletableFuture> future = lookupRequests.computeIfAbsent(key, provider); - future.whenComplete((r, t) -> { - if (t != null) { + final MutableObject>> newFutureCreated = new MutableObject<>(); + try { + return lookupRequests.computeIfAbsent(key, k -> { + CompletableFuture> future = provider.apply(k); + newFutureCreated.setValue(future); + return future; + }); + } finally { + if (newFutureCreated.getValue() != null) { + newFutureCreated.getValue().whenComplete((v, ex) -> { + if (ex != null) { assignCounter.incrementFailure(); } - lookupRequests.remove(key); - } - ); - return future; + lookupRequests.remove(key, newFutureCreated.getValue()); + }); + } + } } public CompletableFuture> selectAsync(ServiceUnitId bundle) { @@ -521,15 +578,16 @@ public CompletableFuture checkOwnershipAsync(Optional to } public CompletableFuture> getOwnershipAsync(Optional topic, - ServiceUnitId bundleUnit) { - final String bundle = bundleUnit.toString(); - CompletableFuture> owner; + ServiceUnitId serviceUnit) { + final String bundle = serviceUnit.toString(); if (topic.isPresent() && isInternalTopic(topic.get().toString())) { - owner = serviceUnitStateChannel.getChannelOwnerAsync(); - } else { - owner = serviceUnitStateChannel.getOwnerAsync(bundle); + return serviceUnitStateChannel.getChannelOwnerAsync(); } - return owner; + String candidateBroker = getHeartbeatOrSLAMonitorBrokerId(serviceUnit); + if (candidateBroker != null) { + return CompletableFuture.completedFuture(Optional.of(candidateBroker)); + } + return serviceUnitStateChannel.getOwnerAsync(bundle); } public CompletableFuture> getOwnershipWithLookupDataAsync(ServiceUnitId bundleUnit) { @@ -543,6 +601,10 @@ public CompletableFuture> getOwnershipWithLookupDataA public CompletableFuture unloadNamespaceBundleAsync(ServiceUnitId bundle, Optional destinationBroker) { + if (NamespaceService.isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString())) { + log.info("Skip unloading namespace bundle: {}.", bundle); + return CompletableFuture.completedFuture(null); + } return getOwnershipAsync(Optional.empty(), bundle) .thenCompose(brokerOpt -> { if (brokerOpt.isEmpty()) { @@ -577,6 +639,10 @@ private CompletableFuture unloadAsync(UnloadDecision unloadDecision, public CompletableFuture splitNamespaceBundleAsync(ServiceUnitId bundle, NamespaceBundleSplitAlgorithm splitAlgorithm, List boundaries) { + if (NamespaceService.isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString())) { + log.info("Skip split namespace bundle: {}.", bundle); + return CompletableFuture.completedFuture(null); + } final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle.toString()); final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle.toString()); NamespaceBundle namespaceBundle = diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 98aa02d4e72b4..d71513652e9b8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -41,8 +41,6 @@ import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Stable; import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Unstable; import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.state; -import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_FMT; -import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_FMT_V2; import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE; import static org.apache.pulsar.common.topics.TopicCompactionStrategy.TABLE_VIEW_TAG; import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionLost; @@ -94,7 +92,6 @@ import org.apache.pulsar.common.naming.NamespaceBundleFactory; import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm; import org.apache.pulsar.common.naming.NamespaceBundles; -import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.stats.Metrics; @@ -1216,48 +1213,19 @@ private synchronized void doCleanup(String broker) { log.info("Started ownership cleanup for the inactive broker:{}", broker); int orphanServiceUnitCleanupCnt = 0; long totalCleanupErrorCntStart = totalCleanupErrorCnt.get(); - String heartbeatNamespace = - NamespaceName.get(String.format(HEARTBEAT_NAMESPACE_FMT, config.getClusterName(), broker)).toString(); - String heartbeatNamespaceV2 = - NamespaceName.get(String.format(HEARTBEAT_NAMESPACE_FMT_V2, broker)).toString(); - Map orphanSystemServiceUnits = new HashMap<>(); for (var etr : tableview.entrySet()) { var stateData = etr.getValue(); var serviceUnit = etr.getKey(); var state = state(stateData); - if (StringUtils.equals(broker, stateData.dstBroker())) { - if (isActiveState(state)) { - if (serviceUnit.startsWith(SYSTEM_NAMESPACE.toString())) { - orphanSystemServiceUnits.put(serviceUnit, stateData); - } else if (serviceUnit.startsWith(heartbeatNamespace) - || serviceUnit.startsWith(heartbeatNamespaceV2)) { - // Skip the heartbeat namespace - log.info("Skip override heartbeat namespace bundle" - + " serviceUnit:{}, stateData:{}", serviceUnit, stateData); - tombstoneAsync(serviceUnit).whenComplete((__, e) -> { - if (e != null) { - log.error("Failed cleaning the heartbeat namespace ownership serviceUnit:{}, " - + "stateData:{}, cleanupErrorCnt:{}.", - serviceUnit, stateData, - totalCleanupErrorCnt.incrementAndGet() - totalCleanupErrorCntStart, e); - } - }); - } else { - overrideOwnership(serviceUnit, stateData, broker); - } - orphanServiceUnitCleanupCnt++; - } - - } else if (StringUtils.equals(broker, stateData.sourceBroker())) { - if (isInFlightState(state)) { - if (serviceUnit.startsWith(SYSTEM_NAMESPACE.toString())) { - orphanSystemServiceUnits.put(serviceUnit, stateData); - } else { - overrideOwnership(serviceUnit, stateData, broker); - } - orphanServiceUnitCleanupCnt++; + if (StringUtils.equals(broker, stateData.dstBroker()) && isActiveState(state) + || StringUtils.equals(broker, stateData.sourceBroker()) && isInFlightState(state)) { + if (serviceUnit.startsWith(SYSTEM_NAMESPACE.toString())) { + orphanSystemServiceUnits.put(serviceUnit, stateData); + } else { + overrideOwnership(serviceUnit, stateData, broker); } + orphanServiceUnitCleanupCnt++; } } @@ -1401,16 +1369,21 @@ protected void monitorOwnerships(List brokers) { String srcBroker = stateData.sourceBroker(); var state = stateData.state(); - if (isActiveState(state)) { - if (StringUtils.isNotBlank(srcBroker) && !activeBrokers.contains(srcBroker)) { - inactiveBrokers.add(srcBroker); - } else if (StringUtils.isNotBlank(dstBroker) && !activeBrokers.contains(dstBroker)) { - inactiveBrokers.add(dstBroker); - } else if (isInFlightState(state) - && now - stateData.timestamp() > inFlightStateWaitingTimeInMillis) { - orphanServiceUnits.put(serviceUnit, stateData); - } - } else if (now - stateData.timestamp() > semiTerminalStateWaitingTimeInMillis) { + if (isActiveState(state) && StringUtils.isNotBlank(srcBroker) && !activeBrokers.contains(srcBroker)) { + inactiveBrokers.add(srcBroker); + continue; + } + if (isActiveState(state) && StringUtils.isNotBlank(dstBroker) && !activeBrokers.contains(dstBroker)) { + inactiveBrokers.add(dstBroker); + continue; + } + if (isActiveState(state) && isInFlightState(state) + && now - stateData.timestamp() > inFlightStateWaitingTimeInMillis) { + orphanServiceUnits.put(serviceUnit, stateData); + continue; + } + + if (now - stateData.timestamp() > semiTerminalStateWaitingTimeInMillis) { log.info("Found semi-terminal states to tombstone" + " serviceUnit:{}, stateData:{}", serviceUnit, stateData); tombstoneAsync(serviceUnit).whenComplete((__, e) -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java index 2f5c32197c1fd..624546fdff837 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java @@ -30,6 +30,8 @@ import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData; import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared; import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies; +import org.apache.pulsar.broker.namespace.NamespaceService; +import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; @@ -70,7 +72,8 @@ public void update(Map bundleStats, int topk) { pulsar.getConfiguration().isLoadBalancerSheddingBundlesWithPoliciesEnabled(); for (var etr : bundleStats.entrySet()) { String bundle = etr.getKey(); - if (bundle.startsWith(NamespaceName.SYSTEM_NAMESPACE.toString())) { + // TODO: do not filter system topic while shedding + if (NamespaceService.isSystemServiceNamespace(NamespaceBundle.getBundleNamespace(bundle))) { continue; } if (!isLoadBalancerSheddingBundlesWithPoliciesEnabled && hasPolicies(bundle)) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index d66e3c3b65d76..57c0cc7c0464b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -135,7 +135,7 @@ public class NamespaceService implements AutoCloseable { public static final Pattern SLA_NAMESPACE_PATTERN = Pattern.compile(SLA_NAMESPACE_PROPERTY + "/[^/]+/([^:]+:\\d+)"); public static final String HEARTBEAT_NAMESPACE_FMT = "pulsar/%s/%s"; public static final String HEARTBEAT_NAMESPACE_FMT_V2 = "pulsar/%s"; - public static final String SLA_NAMESPACE_FMT = SLA_NAMESPACE_PROPERTY + "/%s/%s:%s"; + public static final String SLA_NAMESPACE_FMT = SLA_NAMESPACE_PROPERTY + "/%s/%s"; private final ConcurrentOpenHashMap namespaceClients; @@ -189,7 +189,7 @@ public CompletableFuture> getBrokerServiceUrlAsync(TopicN CompletableFuture> future = getBundleAsync(topic) .thenCompose(bundle -> { // Do redirection if the cluster is in rollback or deploying. - return redirectManager.findRedirectLookupResultAsync().thenCompose(optResult -> { + return findRedirectLookupResultAsync(bundle).thenCompose(optResult -> { if (optResult.isPresent()) { LOG.info("[{}] Redirect lookup request to {} for topic {}", pulsar.getSafeWebServiceAddress(), optResult.get(), topic); @@ -221,6 +221,13 @@ public CompletableFuture> getBrokerServiceUrlAsync(TopicN return future; } + private CompletableFuture> findRedirectLookupResultAsync(ServiceUnitId bundle) { + if (isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString())) { + return CompletableFuture.completedFuture(Optional.empty()); + } + return redirectManager.findRedirectLookupResultAsync(); + } + public CompletableFuture getBundleAsync(TopicName topic) { return bundleFactory.getBundlesAsync(topic.getNamespaceObject()) .thenApply(bundles -> bundles.findBundle(topic)); @@ -288,8 +295,7 @@ public Optional getWebServiceUrl(ServiceUnitId suName, LookupOptions option private CompletableFuture> internalGetWebServiceUrl(@Nullable ServiceUnitId topic, NamespaceBundle bundle, LookupOptions options) { - - return redirectManager.findRedirectLookupResultAsync().thenCompose(optResult -> { + return findRedirectLookupResultAsync(bundle).thenCompose(optResult -> { if (optResult.isPresent()) { LOG.info("[{}] Redirect lookup request to {} for topic {}", pulsar.getSafeWebServiceAddress(), optResult.get(), topic); @@ -695,7 +701,7 @@ public CompletableFuture createLookupResult(String candidateBroker return lookupFuture; } - private boolean isBrokerActive(String candidateBroker) { + public boolean isBrokerActive(String candidateBroker) { String candidateBrokerHostAndPort = parseHostAndPort(candidateBroker); Set availableBrokers = getAvailableBrokers(); if (availableBrokers.contains(candidateBrokerHostAndPort)) { @@ -1564,7 +1570,7 @@ public CompletableFuture checkOwnershipPresentAsync(NamespaceBundle bun } public void unloadSLANamespace() throws Exception { - NamespaceName namespaceName = getSLAMonitorNamespace(host, config); + NamespaceName namespaceName = getSLAMonitorNamespace(pulsar.getLookupServiceAddress(), config); LOG.info("Checking owner for SLA namespace {}", namespaceName); @@ -1589,14 +1595,8 @@ public static NamespaceName getHeartbeatNamespaceV2(String lookupBroker, Service return NamespaceName.get(String.format(HEARTBEAT_NAMESPACE_FMT_V2, lookupBroker)); } - public static NamespaceName getSLAMonitorNamespace(String host, ServiceConfiguration config) { - Integer port = null; - if (config.getWebServicePort().isPresent()) { - port = config.getWebServicePort().get(); - } else if (config.getWebServicePortTls().isPresent()) { - port = config.getWebServicePortTls().get(); - } - return NamespaceName.get(String.format(SLA_NAMESPACE_FMT, config.getClusterName(), host, port)); + public static NamespaceName getSLAMonitorNamespace(String lookupBroker, ServiceConfiguration config) { + return NamespaceName.get(String.format(SLA_NAMESPACE_FMT, config.getClusterName(), lookupBroker)); } public static String checkHeartbeatNamespace(ServiceUnitId ns) { @@ -1640,7 +1640,7 @@ public static boolean isSystemServiceNamespace(String namespace) { * @param namespace the namespace name * @return True if namespace is HEARTBEAT_NAMESPACE or SLA_NAMESPACE */ - public static boolean filterNamespaceForShedding(String namespace) { + public static boolean isSLAOrHeartbeatNamespace(String namespace) { return SLA_NAMESPACE_PATTERN.matcher(namespace).matches() || HEARTBEAT_NAMESPACE_PATTERN.matcher(namespace).matches() || HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(namespace).matches(); @@ -1653,14 +1653,16 @@ public static boolean isHeartbeatNamespace(ServiceUnitId ns) { } public boolean registerSLANamespace() throws PulsarServerException { - boolean isNameSpaceRegistered = registerNamespace(getSLAMonitorNamespace(host, config), false); + String lookupServiceAddress = pulsar.getLookupServiceAddress(); + boolean isNameSpaceRegistered = registerNamespace(getSLAMonitorNamespace(lookupServiceAddress, config), false); if (isNameSpaceRegistered) { if (LOG.isDebugEnabled()) { LOG.debug("Added SLA Monitoring namespace name in local cache: ns={}", - getSLAMonitorNamespace(host, config)); + getSLAMonitorNamespace(lookupServiceAddress, config)); } } else if (LOG.isDebugEnabled()) { - LOG.debug("SLA Monitoring not owned by the broker: ns={}", getSLAMonitorNamespace(host, config)); + LOG.debug("SLA Monitoring not owned by the broker: ns={}", + getSLAMonitorNamespace(lookupServiceAddress, config)); } return isNameSpaceRegistered; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 9ce57a88540c3..011e7174cbec2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -35,6 +35,9 @@ import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Overloaded; import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Underloaded; import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Unknown; +import static org.apache.pulsar.broker.namespace.NamespaceService.getHeartbeatNamespace; +import static org.apache.pulsar.broker.namespace.NamespaceService.getHeartbeatNamespaceV2; +import static org.apache.pulsar.broker.namespace.NamespaceService.getSLAMonitorNamespace; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -594,6 +597,18 @@ public void testDeployAndRollbackLoadManager() throws Exception { assertTrue(webServiceUrl3.isPresent()); assertEquals(webServiceUrl3.get().toString(), webServiceUrl1.get().toString()); + List pulsarServices = List.of(pulsar1, pulsar2, pulsar3); + for (PulsarService pulsarService : pulsarServices) { + // Test lookup heartbeat namespace's topic + for (PulsarService pulsar : pulsarServices) { + assertLookupHeartbeatOwner(pulsarService, pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl()); + } + // Test lookup SLA namespace's topic + for (PulsarService pulsar : pulsarServices) { + assertLookupSLANamespaceOwner(pulsarService, pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl()); + } + } + // Test deploy new broker with new load manager ServiceConfiguration conf = getDefaultConf(); conf.setAllowAutoTopicCreation(true); @@ -642,10 +657,48 @@ public void testDeployAndRollbackLoadManager() throws Exception { assertTrue(webServiceUrl4.isPresent()); assertEquals(webServiceUrl4.get().toString(), webServiceUrl1.get().toString()); + pulsarServices = List.of(pulsar1, pulsar2, pulsar3, pulsar4); + for (PulsarService pulsarService : pulsarServices) { + // Test lookup heartbeat namespace's topic + for (PulsarService pulsar : pulsarServices) { + assertLookupHeartbeatOwner(pulsarService, pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl()); + } + // Test lookup SLA namespace's topic + for (PulsarService pulsar : pulsarServices) { + assertLookupSLANamespaceOwner(pulsarService, pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl()); + } + } } } } + private void assertLookupHeartbeatOwner(PulsarService pulsar, + String lookupServiceAddress, + String expectedBrokerServiceUrl) throws Exception { + NamespaceName heartbeatNamespaceV1 = + getHeartbeatNamespace(lookupServiceAddress, pulsar.getConfiguration()); + + String heartbeatV1Topic = heartbeatNamespaceV1.getPersistentTopicName("test"); + assertEquals(pulsar.getAdminClient().lookups().lookupTopic(heartbeatV1Topic), expectedBrokerServiceUrl); + + NamespaceName heartbeatNamespaceV2 = + getHeartbeatNamespaceV2(lookupServiceAddress, pulsar.getConfiguration()); + + String heartbeatV2Topic = heartbeatNamespaceV2.getPersistentTopicName("test"); + assertEquals(pulsar.getAdminClient().lookups().lookupTopic(heartbeatV2Topic), expectedBrokerServiceUrl); + } + + private void assertLookupSLANamespaceOwner(PulsarService pulsar, + String lookupServiceAddress, + String expectedBrokerServiceUrl) throws Exception { + NamespaceName slaMonitorNamespace = getSLAMonitorNamespace(lookupServiceAddress, pulsar.getConfiguration()); + String slaMonitorTopic = slaMonitorNamespace.getPersistentTopicName("test"); + String result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic); + log.info("Topic {} Lookup result: {}", slaMonitorTopic, result); + assertNotNull(result); + assertEquals(result, expectedBrokerServiceUrl); + } + @Test public void testTopBundlesLoadDataStoreTableViewFromChannelOwner() throws Exception { var topBundlesLoadDataStorePrimary = @@ -1043,15 +1096,15 @@ public void testListTopic() throws Exception { admin.namespaces().deleteNamespace(namespace, true); } - @Test(timeOut = 30 * 1000) + @Test(timeOut = 30 * 1000, priority = -1) public void testGetOwnedServiceUnitsAndGetOwnedNamespaceStatus() throws Exception { NamespaceName heartbeatNamespacePulsar1V1 = - NamespaceService.getHeartbeatNamespace(pulsar1.getLookupServiceAddress(), pulsar1.getConfiguration()); + getHeartbeatNamespace(pulsar1.getLookupServiceAddress(), pulsar1.getConfiguration()); NamespaceName heartbeatNamespacePulsar1V2 = NamespaceService.getHeartbeatNamespaceV2(pulsar1.getLookupServiceAddress(), pulsar1.getConfiguration()); NamespaceName heartbeatNamespacePulsar2V1 = - NamespaceService.getHeartbeatNamespace(pulsar2.getLookupServiceAddress(), pulsar2.getConfiguration()); + getHeartbeatNamespace(pulsar2.getLookupServiceAddress(), pulsar2.getConfiguration()); NamespaceName heartbeatNamespacePulsar2V2 = NamespaceService.getHeartbeatNamespaceV2(pulsar2.getLookupServiceAddress(), pulsar2.getConfiguration()); @@ -1068,22 +1121,22 @@ public void testGetOwnedServiceUnitsAndGetOwnedNamespaceStatus() throws Exceptio Set ownedServiceUnitsByPulsar1 = primaryLoadManager.getOwnedServiceUnits(); log.info("Owned service units: {}", ownedServiceUnitsByPulsar1); // heartbeat namespace bundle will own by pulsar1 - assertEquals(ownedServiceUnitsByPulsar1.size(), 2); + assertEquals(ownedServiceUnitsByPulsar1.size(), 3); assertTrue(ownedServiceUnitsByPulsar1.contains(bundle1)); assertTrue(ownedServiceUnitsByPulsar1.contains(bundle2)); Set ownedServiceUnitsByPulsar2 = secondaryLoadManager.getOwnedServiceUnits(); log.info("Owned service units: {}", ownedServiceUnitsByPulsar2); - assertEquals(ownedServiceUnitsByPulsar2.size(), 2); + assertEquals(ownedServiceUnitsByPulsar2.size(), 3); assertTrue(ownedServiceUnitsByPulsar2.contains(bundle3)); assertTrue(ownedServiceUnitsByPulsar2.contains(bundle4)); Map ownedNamespacesByPulsar1 = admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar1.getLookupServiceAddress()); Map ownedNamespacesByPulsar2 = admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar2.getLookupServiceAddress()); - assertEquals(ownedNamespacesByPulsar1.size(), 2); + assertEquals(ownedNamespacesByPulsar1.size(), 3); assertTrue(ownedNamespacesByPulsar1.containsKey(bundle1.toString())); assertTrue(ownedNamespacesByPulsar1.containsKey(bundle2.toString())); - assertEquals(ownedNamespacesByPulsar2.size(), 2); + assertEquals(ownedNamespacesByPulsar2.size(), 3); assertTrue(ownedNamespacesByPulsar2.containsKey(bundle3.toString())); assertTrue(ownedNamespacesByPulsar2.containsKey(bundle4.toString())); @@ -1134,7 +1187,8 @@ public void testTryAcquiringOwnership() String topic = "persistent://" + namespace + "/test"; NamespaceBundle bundle = getBundleAsync(pulsar1, TopicName.get(topic)).get(); NamespaceEphemeralData namespaceEphemeralData = primaryLoadManager.tryAcquiringOwnership(bundle).get(); - assertEquals(namespaceEphemeralData.getNativeUrl(), pulsar1.getBrokerServiceUrl()); + assertTrue(Set.of(pulsar1.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrl()) + .contains(namespaceEphemeralData.getNativeUrl())); admin.namespaces().deleteNamespace(namespace, true); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index de21842f4584c..a226df53e12f3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -30,8 +30,6 @@ import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.EventType.Unload; import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MAX_CLEAN_UP_DELAY_TIME_IN_SECS; import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.state; -import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_FMT; -import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_FMT_V2; import static org.apache.pulsar.metadata.api.extended.SessionEvent.ConnectionLost; import static org.apache.pulsar.metadata.api.extended.SessionEvent.Reconnected; import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionLost; @@ -89,7 +87,6 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.client.impl.TableViewImpl; -import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.policies.data.TopicType; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.metadata.api.MetadataStoreException; @@ -639,7 +636,7 @@ public void splitAndRetryTest() throws Exception { var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2; validateMonitorCounters(leader, 0, - 1, + 3, 0, 0, 0, @@ -756,34 +753,6 @@ public void handleBrokerDeletionEventTest() waitUntilNewOwner(channel1, bundle2, broker); waitUntilNewOwner(channel2, bundle2, broker); - // Register the broker-1 heartbeat namespace bundle. - String heartbeatNamespaceBroker1V1 = NamespaceName - .get(String.format(HEARTBEAT_NAMESPACE_FMT, conf.getClusterName(), broker)).toString(); - String heartbeatNamespaceBroker1V2 = NamespaceName - .get(String.format(HEARTBEAT_NAMESPACE_FMT_V2, broker)).toString(); - String heartbeatNamespaceBroker1V1Bundle = heartbeatNamespaceBroker1V1 + "/0x00000000_0xfffffff0"; - String heartbeatNamespaceBroker1V2Bundle = heartbeatNamespaceBroker1V2 + "/0x00000000_0xfffffff0"; - channel1.publishAssignEventAsync(heartbeatNamespaceBroker1V1Bundle, broker); - channel1.publishAssignEventAsync(heartbeatNamespaceBroker1V2Bundle, broker); - - // Register the broker-2 heartbeat namespace bundle. - String heartbeatNamespaceBroker2V1 = NamespaceName - .get(String.format(HEARTBEAT_NAMESPACE_FMT, conf.getClusterName(), lookupServiceAddress2)).toString(); - String heartbeatNamespaceBroker2V2 = NamespaceName - .get(String.format(HEARTBEAT_NAMESPACE_FMT_V2, lookupServiceAddress2)).toString(); - String heartbeatNamespaceBroker2V1Bundle = heartbeatNamespaceBroker2V1 + "/0x00000000_0xfffffff0"; - String heartbeatNamespaceBroker2V2Bundle = heartbeatNamespaceBroker2V2 + "/0x00000000_0xfffffff0"; - channel1.publishAssignEventAsync(heartbeatNamespaceBroker2V1Bundle, lookupServiceAddress2); - channel1.publishAssignEventAsync(heartbeatNamespaceBroker2V2Bundle, lookupServiceAddress2); - waitUntilNewOwner(channel1, heartbeatNamespaceBroker1V1Bundle, broker); - waitUntilNewOwner(channel1, heartbeatNamespaceBroker1V2Bundle, broker); - waitUntilNewOwner(channel2, heartbeatNamespaceBroker1V1Bundle, broker); - waitUntilNewOwner(channel2, heartbeatNamespaceBroker1V2Bundle, broker); - waitUntilNewOwner(channel1, heartbeatNamespaceBroker2V1Bundle, lookupServiceAddress2); - waitUntilNewOwner(channel1, heartbeatNamespaceBroker2V2Bundle, lookupServiceAddress2); - waitUntilNewOwner(channel2, heartbeatNamespaceBroker2V1Bundle, lookupServiceAddress2); - waitUntilNewOwner(channel2, heartbeatNamespaceBroker2V2Bundle, lookupServiceAddress2); - // Verify to transfer the ownership to the other broker. channel1.publishUnloadEventAsync(new Unload(broker, bundle1, Optional.of(lookupServiceAddress2))); waitUntilNewOwner(channel1, bundle1, lookupServiceAddress2); @@ -806,16 +775,6 @@ public void handleBrokerDeletionEventTest() waitUntilNewOwner(channel1, bundle2, lookupServiceAddress2); waitUntilNewOwner(channel2, bundle2, lookupServiceAddress2); - waitUntilNewOwner(channel1, heartbeatNamespaceBroker1V1Bundle, null); - waitUntilNewOwner(channel1, heartbeatNamespaceBroker1V2Bundle, null); - waitUntilNewOwner(channel2, heartbeatNamespaceBroker1V1Bundle, null); - waitUntilNewOwner(channel2, heartbeatNamespaceBroker1V2Bundle, null); - - waitUntilNewOwner(channel1, heartbeatNamespaceBroker2V1Bundle, null); - waitUntilNewOwner(channel1, heartbeatNamespaceBroker2V2Bundle, null); - waitUntilNewOwner(channel2, heartbeatNamespaceBroker2V1Bundle, null); - waitUntilNewOwner(channel2, heartbeatNamespaceBroker2V2Bundle, null); - verify(leaderCleanupJobs, times(1)).computeIfAbsent(eq(broker), any()); verify(followerCleanupJobs, times(0)).computeIfAbsent(eq(broker), any()); @@ -827,7 +786,7 @@ public void handleBrokerDeletionEventTest() validateMonitorCounters(leaderChannel, 2, 0, - 7, + 3, 0, 2, 0, @@ -858,7 +817,7 @@ public void handleBrokerDeletionEventTest() validateMonitorCounters(leaderChannel, 2, 0, - 7, + 3, 0, 3, 0, @@ -879,7 +838,7 @@ public void handleBrokerDeletionEventTest() validateMonitorCounters(leaderChannel, 2, 0, - 7, + 3, 0, 3, 0, @@ -901,7 +860,7 @@ public void handleBrokerDeletionEventTest() validateMonitorCounters(leaderChannel, 2, 0, - 7, + 3, 0, 4, 0, @@ -923,7 +882,7 @@ public void handleBrokerDeletionEventTest() validateMonitorCounters(leaderChannel, 3, 0, - 9, + 5, 0, 4, 0, @@ -952,7 +911,7 @@ public void handleBrokerDeletionEventTest() validateMonitorCounters(leaderChannel, 3, 0, - 9, + 5, 0, 4, 1, @@ -1447,7 +1406,7 @@ public void splitAndRetryFailureTest() throws Exception { validateMonitorCounters(leader, 0, - 1, + 3, 1, 0, 0,