Skip to content

Commit

Permalink
[fix][broker] Avoid infinite bundle unloading (#20822)
Browse files: browse the repository at this point in the history
(cherry picked from commit 3f63768)
  • Loading branch information
Demogorgon314 committed Jul 26, 2023
1 parent fea2f9b commit a0499bd
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -554,4 +554,10 @@ public static void filterBrokersWithLargeTopicCount(Set<String> brokerCandidateC
brokerCandidateCache.addAll(filteredBrokerCandidates);
}
}

/**
 * Resolves a bundle name to its {@link NamespaceBundle}.
 *
 * <p>The name is split into a namespace name and a bundle range with the
 * {@code LoadManagerShared} parsing helpers, and both parts are handed to the
 * bundle factory obtained from the namespace service of {@code pulsar}.
 *
 * @param pulsar the service whose namespace service supplies the bundle factory
 * @param bundle the bundle name to resolve (presumably "namespace/range" form; confirm
 *               against the parsing helpers)
 * @return whatever the bundle factory returns for the parsed namespace name and range
 */
public static NamespaceBundle getNamespaceBundle(PulsarService pulsar, String bundle) {
    // Parse both parts up front, before touching the service, as the original did.
    final String namespace = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
    final String range = LoadManagerShared.getBundleRangeFromBundleName(bundle);
    return pulsar.getNamespaceService()
            .getNamespaceBundleFactory()
            .getBundle(namespace, range);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.loadbalance.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
Expand Down Expand Up @@ -642,9 +643,21 @@ public synchronized void doLoadShedding() {
if (!shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, broker)) {
return;
}
NamespaceBundle bundleToUnload = LoadManagerShared.getNamespaceBundle(pulsar, bundle);
Optional<String> destBroker = this.selectBroker(bundleToUnload);
if (!destBroker.isPresent()) {
log.info("[{}] No broker available to unload bundle {} from broker {}",
strategy.getClass().getSimpleName(), bundle, broker);
return;
}
if (destBroker.get().equals(broker)) {
log.warn("[{}] The destination broker {} is the same as the current owner broker for Bundle {}",
strategy.getClass().getSimpleName(), destBroker.get(), bundle);
return;
}

log.info("[{}] Unloading bundle: {} from broker {}",
strategy.getClass().getSimpleName(), bundle, broker);
log.info("[{}] Unloading bundle: {} from broker {} to dest broker {}",
strategy.getClass().getSimpleName(), bundle, broker, destBroker.get());
try {
pulsar.getAdminClient().namespaces().unloadNamespaceBundle(namespaceName, bundleRange);
loadData.getRecentlyUnloadedBundles().put(bundle, System.currentTimeMillis());
Expand Down Expand Up @@ -799,16 +812,56 @@ public Optional<String> selectBrokerForAssignment(final ServiceUnitId serviceUni
// If the given bundle is already in preallocated, return the selected broker.
return Optional.of(preallocatedBundleToBroker.get(bundle));
}
final BundleData data = loadData.getBundleData().computeIfAbsent(bundle,
key -> getBundleDataOrDefault(bundle));
brokerCandidateCache.clear();
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers(),
brokerTopicLoadingPredicate);

// filter brokers which owns topic higher than threshold
LoadManagerShared.filterBrokersWithLargeTopicCount(brokerCandidateCache, loadData,
conf.getLoadBalancerBrokerMaxTopics());
Optional<String> broker = selectBroker(serviceUnit);
if (!broker.isPresent()) {
// If no broker is selected, return empty.
return broker;
}
// Add new bundle to preallocated.
preallocateBundle(bundle, broker.get());
return broker;
}
} finally {
selectBrokerForAssignment.observe(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
}
}

/**
 * Registers {@code broker} as the preallocated owner of {@code bundle}.
 *
 * <p>Three pieces of state are updated, in this order:
 * <ol>
 *   <li>the bundle's data (looked up, or created via {@code getBundleDataOrDefault})
 *       is stored in the broker's preallocated bundle data;</li>
 *   <li>the bundle-to-broker mapping in {@code preallocatedBundleToBroker};</li>
 *   <li>the bundle range is added under its namespace in
 *       {@code brokerToNamespaceToBundleRange} for that broker.</li>
 * </ol>
 *
 * @param bundle the bundle name being assigned
 * @param broker the broker chosen for the bundle; NOTE(review): an entry for it is
 *               expected in {@code loadData.getBrokerData()} - the lookup result is
 *               dereferenced without a null check
 */
private void preallocateBundle(String bundle, String broker) {
    final BundleData bundleData = loadData.getBundleData()
            .computeIfAbsent(bundle, key -> getBundleDataOrDefault(key));
    loadData.getBrokerData().get(broker).getPreallocatedBundleData().put(bundle, bundleData);
    preallocatedBundleToBroker.put(bundle, broker);

    final String namespace = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
    final String range = LoadManagerShared.getBundleRangeFromBundleName(bundle);
    final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> rangesByNamespace =
            brokerToNamespaceToBundleRange.computeIfAbsent(broker,
                    ignored -> ConcurrentOpenHashMap.<String, ConcurrentOpenHashSet<String>>newBuilder()
                            .build());
    // The per-broker map is used as the monitor while its namespace entry is created and filled.
    synchronized (rangesByNamespace) {
        final ConcurrentOpenHashSet<String> ranges = rangesByNamespace.computeIfAbsent(namespace,
                ignored -> ConcurrentOpenHashSet.<String>newBuilder().build());
        ranges.add(range);
    }
}

@VisibleForTesting
Optional<String> selectBroker(final ServiceUnitId serviceUnit) {
synchronized (brokerCandidateCache) {
final String bundle = serviceUnit.toString();
final BundleData data = loadData.getBundleData().computeIfAbsent(bundle,
key -> getBundleDataOrDefault(bundle));
brokerCandidateCache.clear();
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers(),
brokerTopicLoadingPredicate);

// filter brokers which owns topic higher than threshold
LoadManagerShared.filterBrokersWithLargeTopicCount(brokerCandidateCache, loadData,
conf.getLoadBalancerBrokerMaxTopics());

// distribute namespaces to domain and brokers according to anti-affinity-group
LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar, serviceUnit.toString(),
Expand All @@ -820,71 +873,50 @@ public Optional<String> selectBrokerForAssignment(final ServiceUnitId serviceUni
brokerToNamespaceToBundleRange);
log.info("{} brokers being considered for assignment of {}", brokerCandidateCache.size(), bundle);

// Use the filter pipeline to finalize broker candidates.
try {
for (BrokerFilter filter : filterPipeline) {
filter.filter(brokerCandidateCache, data, loadData, conf);
}
} catch (BrokerFilterException x) {
// restore the list of brokers to the full set
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers(),
brokerTopicLoadingPredicate);
}

if (brokerCandidateCache.isEmpty()) {
// restore the list of brokers to the full set
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers(),
brokerTopicLoadingPredicate);
// Use the filter pipeline to finalize broker candidates.
try {
for (BrokerFilter filter : filterPipeline) {
filter.filter(brokerCandidateCache, data, loadData, conf);
}
} catch (BrokerFilterException x) {
// restore the list of brokers to the full set
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers(),
brokerTopicLoadingPredicate);
}

// Choose a broker among the potentially smaller filtered list, when possible
Optional<String> broker = placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
if (log.isDebugEnabled()) {
log.debug("Selected broker {} from candidate brokers {}", broker, brokerCandidateCache);
}
if (brokerCandidateCache.isEmpty()) {
// restore the list of brokers to the full set
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers(),
brokerTopicLoadingPredicate);
}

if (!broker.isPresent()) {
// No brokers available
return broker;
}
// Choose a broker among the potentially smaller filtered list, when possible
Optional<String> broker = placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
if (log.isDebugEnabled()) {
log.debug("Selected broker {} from candidate brokers {}", broker, brokerCandidateCache);
}

final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
final double maxUsage = loadData.getBrokerData().get(broker.get()).getLocalData().getMaxResourceUsage();
if (maxUsage > overloadThreshold) {
// All brokers that were in the filtered list were overloaded, so check if there is a better broker
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers(),
brokerTopicLoadingPredicate);
Optional<String> brokerTmp =
placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
if (brokerTmp.isPresent()) {
broker = brokerTmp;
}
}
if (!broker.isPresent()) {
// No brokers available
return broker;
}

// Add new bundle to preallocated.
loadData.getBrokerData().get(broker.get()).getPreallocatedBundleData().put(bundle, data);
preallocatedBundleToBroker.put(bundle, broker.get());

final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange =
brokerToNamespaceToBundleRange
.computeIfAbsent(broker.get(),
k -> ConcurrentOpenHashMap.<String,
ConcurrentOpenHashSet<String>>newBuilder()
.build());
synchronized (namespaceToBundleRange) {
namespaceToBundleRange.computeIfAbsent(namespaceName,
k -> ConcurrentOpenHashSet.<String>newBuilder().build())
.add(bundleRange);
final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
final double maxUsage = loadData.getBrokerData().get(broker.get()).getLocalData().getMaxResourceUsage();
if (maxUsage > overloadThreshold) {
// All brokers that were in the filtered list were overloaded, so check if there is a better broker
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers(),
brokerTopicLoadingPredicate);
Optional<String> brokerTmp =
placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
if (brokerTmp.isPresent()) {
broker = brokerTmp;
}
return broker;
}
} finally {
selectBrokerForAssignment.observe(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
return broker;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance;
package org.apache.pulsar.broker.loadbalance.impl;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -51,12 +53,12 @@
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LoadBalancerTestingUtils;
import org.apache.pulsar.broker.loadbalance.LoadData;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.TimeAverageMessageData;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.naming.NamespaceBundle;
Expand Down Expand Up @@ -329,38 +331,70 @@ public void testLoadShedding() throws Exception {
bundleReference.set(invocation.getArguments()[0].toString() + '/' + invocation.getArguments()[1]);
return null;
}).when(namespacesSpy1).unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString());

AtomicReference<Optional<String>> selectedBrokerRef = new AtomicReference<>();
ModularLoadManagerImpl primaryLoadManagerSpy = spy(primaryLoadManager);
doAnswer(invocation -> {
ServiceUnitId serviceUnitId = (ServiceUnitId) invocation.getArguments()[0];
Optional<String> broker = primaryLoadManager.selectBroker(serviceUnitId);
selectedBrokerRef.set(broker);
return broker;
}).when(primaryLoadManagerSpy).selectBroker(any());

setField(pulsar1.getAdminClient(), "namespaces", namespacesSpy1);
pulsar1.getConfiguration().setLoadBalancerEnabled(true);
final LoadData loadData = (LoadData) getField(primaryLoadManager, "loadData");
final LoadData loadData = (LoadData) getField(primaryLoadManagerSpy, "loadData");
final Map<String, BrokerData> brokerDataMap = loadData.getBrokerData();
final BrokerData brokerDataSpy1 = spy(brokerDataMap.get(primaryHost));
when(brokerDataSpy1.getLocalData()).thenReturn(localBrokerData);
brokerDataMap.put(primaryHost, brokerDataSpy1);
// Need to update all the bundle data for the shedder to see the spy.
primaryLoadManager.handleDataNotification(new Notification(NotificationType.Created, LoadManager.LOADBALANCE_BROKERS_ROOT + "/broker:8080"));
primaryLoadManagerSpy.handleDataNotification(new Notification(NotificationType.Created, LoadManager.LOADBALANCE_BROKERS_ROOT + "/broker:8080"));

Thread.sleep(100);
localBrokerData.setCpu(new ResourceUsage(80, 100));
primaryLoadManager.doLoadShedding();
primaryLoadManagerSpy.doLoadShedding();

// 80% is below overload threshold: verify nothing is unloaded.
verify(namespacesSpy1, Mockito.times(0)).unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString());
verify(namespacesSpy1, Mockito.times(0))
.unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString());

localBrokerData.setCpu(new ResourceUsage(90, 100));
primaryLoadManager.doLoadShedding();
primaryLoadManagerSpy.doLoadShedding();
// Most expensive bundle will be unloaded.
verify(namespacesSpy1, Mockito.times(1)).unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString());
verify(namespacesSpy1, Mockito.times(1))
.unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString());
assertEquals(bundleReference.get(), mockBundleName(2));
assertEquals(selectedBrokerRef.get().get(), secondaryHost);

primaryLoadManager.doLoadShedding();
primaryLoadManagerSpy.doLoadShedding();
// Now less expensive bundle will be unloaded (normally other bundle would move off and nothing would be
// unloaded, but this is not the case due to the spy's behavior).
verify(namespacesSpy1, Mockito.times(2)).unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString());
verify(namespacesSpy1, Mockito.times(2))
.unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString());
assertEquals(bundleReference.get(), mockBundleName(1));
assertEquals(selectedBrokerRef.get().get(), secondaryHost);

primaryLoadManager.doLoadShedding();
primaryLoadManagerSpy.doLoadShedding();
// Now both are in grace period: neither should be unloaded.
verify(namespacesSpy1, Mockito.times(2)).unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString());
verify(namespacesSpy1, Mockito.times(2))
.unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString());
assertEquals(selectedBrokerRef.get().get(), secondaryHost);

// Test bundle transfer to same broker

loadData.getRecentlyUnloadedBundles().clear();
primaryLoadManagerSpy.doLoadShedding();
verify(namespacesSpy1, Mockito.times(3))
.unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString());

doReturn(Optional.of(primaryHost)).when(primaryLoadManagerSpy).selectBroker(any());
loadData.getRecentlyUnloadedBundles().clear();
primaryLoadManagerSpy.doLoadShedding();
// The bundle shouldn't be unloaded because the broker is the same.
verify(namespacesSpy1, Mockito.times(3))
.unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString());

}

// Test that ModularLoadManagerImpl will determine that writing local data to ZooKeeper is necessary if certain
Expand Down

0 comments on commit a0499bd

Please sign in to comment.