listener) {
this.leaderElection = cs.getLeaderElection(LeaderBroker.class, electionRoot, listener);
- this.localValue = new LeaderBroker(localWebServiceAddress);
+ this.localValue = new LeaderBroker(brokerId, serviceUrl);
}
public void start() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtils.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtils.java
index 42ef264b6db04..61f34ef4901ba 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtils.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtils.java
@@ -18,7 +18,9 @@
*/
package org.apache.pulsar.broker.loadbalance;
+import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
+import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -45,6 +47,7 @@ public class LinuxInfoUtils {
private static final String CGROUPS_CPU_USAGE_PATH = "/sys/fs/cgroup/cpu/cpuacct.usage";
private static final String CGROUPS_CPU_LIMIT_QUOTA_PATH = "/sys/fs/cgroup/cpu/cpu.cfs_quota_us";
private static final String CGROUPS_CPU_LIMIT_PERIOD_PATH = "/sys/fs/cgroup/cpu/cpu.cfs_period_us";
+
// proc states
private static final String PROC_STAT_PATH = "/proc/stat";
private static final String NIC_PATH = "/sys/class/net/";
@@ -52,6 +55,30 @@ public class LinuxInfoUtils {
private static final int ARPHRD_ETHER = 1;
private static final String NIC_SPEED_TEMPLATE = "/sys/class/net/%s/speed";
+ private static Object /*jdk.internal.platform.Metrics*/ metrics;
+ private static Method getMetricsProviderMethod;
+ private static Method getCpuQuotaMethod;
+ private static Method getCpuPeriodMethod;
+ private static Method getCpuUsageMethod;
+
+ static {
+ try {
+ metrics = Class.forName("jdk.internal.platform.Container").getMethod("metrics")
+ .invoke(null);
+ if (metrics != null) {
+ getMetricsProviderMethod = metrics.getClass().getMethod("getProvider");
+ getMetricsProviderMethod.setAccessible(true);
+ getCpuQuotaMethod = metrics.getClass().getMethod("getCpuQuota");
+ getCpuQuotaMethod.setAccessible(true);
+ getCpuPeriodMethod = metrics.getClass().getMethod("getCpuPeriod");
+ getCpuPeriodMethod.setAccessible(true);
+ getCpuUsageMethod = metrics.getClass().getMethod("getCpuUsage");
+ getCpuUsageMethod.setAccessible(true);
+ }
+ } catch (Throwable e) {
+ log.warn("Failed to get runtime metrics", e);
+ }
+ }
/**
* Determine whether the OS is the linux kernel.
@@ -66,9 +93,14 @@ public static boolean isLinux() {
*/
public static boolean isCGroupEnabled() {
try {
- return Files.exists(Paths.get(CGROUPS_CPU_USAGE_PATH));
+ if (metrics == null || getMetricsProviderMethod == null) {
+ return Files.exists(Paths.get(CGROUPS_CPU_USAGE_PATH));
+ }
+ String provider = (String) getMetricsProviderMethod.invoke(metrics);
+ log.info("[LinuxInfo] The system metrics provider is: {}", provider);
+ return provider != null && provider.contains("cgroup");
} catch (Exception e) {
- log.warn("[LinuxInfo] Failed to check cgroup CPU usage file: {}", e.getMessage());
+ log.warn("[LinuxInfo] Failed to check cgroup CPU: {}", e.getMessage());
return false;
}
}
@@ -81,13 +113,21 @@ public static boolean isCGroupEnabled() {
public static double getTotalCpuLimit(boolean isCGroupsEnabled) {
if (isCGroupsEnabled) {
try {
- long quota = readLongFromFile(Paths.get(CGROUPS_CPU_LIMIT_QUOTA_PATH));
- long period = readLongFromFile(Paths.get(CGROUPS_CPU_LIMIT_PERIOD_PATH));
+ long quota;
+ long period;
+ if (metrics != null && getCpuQuotaMethod != null && getCpuPeriodMethod != null) {
+ quota = (long) getCpuQuotaMethod.invoke(metrics);
+ period = (long) getCpuPeriodMethod.invoke(metrics);
+ } else {
+ quota = readLongFromFile(Paths.get(CGROUPS_CPU_LIMIT_QUOTA_PATH));
+ period = readLongFromFile(Paths.get(CGROUPS_CPU_LIMIT_PERIOD_PATH));
+ }
+
if (quota > 0) {
return 100.0 * quota / period;
}
- } catch (IOException e) {
- log.warn("[LinuxInfo] Failed to read CPU quotas from cgroups", e);
+ } catch (Exception e) {
+ log.warn("[LinuxInfo] Failed to read CPU quotas from cgroup", e);
// Fallback to availableProcessors
}
}
@@ -99,11 +139,14 @@ public static double getTotalCpuLimit(boolean isCGroupsEnabled) {
* Get CGroup cpu usage.
* @return Cpu usage
*/
- public static double getCpuUsageForCGroup() {
+ public static long getCpuUsageForCGroup() {
try {
+ if (metrics != null && getCpuUsageMethod != null) {
+ return (long) getCpuUsageMethod.invoke(metrics);
+ }
return readLongFromFile(Paths.get(CGROUPS_CPU_USAGE_PATH));
- } catch (IOException e) {
- log.error("[LinuxInfo] Failed to read CPU usage from {}", CGROUPS_CPU_USAGE_PATH, e);
+ } catch (Exception e) {
+ log.error("[LinuxInfo] Failed to read CPU usage from cgroup", e);
return -1;
}
}
@@ -118,7 +161,8 @@ public static double getCpuUsageForCGroup() {
*
*
* Line is split in "words", filtering the first. The sum of all numbers give the amount of cpu cycles used this
- * far. Real CPU usage should equal the sum substracting the idle cycles, this would include iowait, irq and steal.
+ * far. Real CPU usage should equal the sum subtracting the idle cycles (that is idle+iowait), this would include
+ * cpu, user, nice, system, irq, softirq, steal, guest and guest_nice.
*/
public static ResourceUsage getCpuUsageForEntireHost() {
try (Stream stream = Files.lines(Paths.get(PROC_STAT_PATH))) {
@@ -132,7 +176,7 @@ public static ResourceUsage getCpuUsageForEntireHost() {
.filter(s -> !s.contains("cpu"))
.mapToLong(Long::parseLong)
.sum();
- long idle = Long.parseLong(words[4]);
+ long idle = Long.parseLong(words[4]) + Long.parseLong(words[5]);
return ResourceUsage.builder()
.usage(total - idle)
.idle(idle)
@@ -291,6 +335,11 @@ enum Operstate {
UP
}
+ @VisibleForTesting
+ public static Object getMetrics() {
+ return metrics;
+ }
+
@AllArgsConstructor
public enum NICUsageType {
// transport
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java
index 87f630f1a09fb..c1fe2a4930c34 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java
@@ -64,7 +64,7 @@ public Map getBundleData() {
public Map getBundleDataForLoadShedding() {
return bundleData.entrySet().stream()
- .filter(e -> !NamespaceService.isSystemServiceNamespace(
+ .filter(e -> !NamespaceService.isSLAOrHeartbeatNamespace(
NamespaceBundle.getBundleNamespace(e.getKey())))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
index 0de2ae92db61a..f9f36b705d4c4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
@@ -43,7 +43,7 @@
public class NoopLoadManager implements LoadManager {
private PulsarService pulsar;
- private String lookupServiceAddress;
+ private String brokerId;
private ResourceUnit localResourceUnit;
private LockManager lockManager;
private Map bundleBrokerAffinityMap;
@@ -57,16 +57,15 @@ public void initialize(PulsarService pulsar) {
@Override
public void start() throws PulsarServerException {
- lookupServiceAddress = pulsar.getLookupServiceAddress();
- localResourceUnit = new SimpleResourceUnit(String.format("http://%s", lookupServiceAddress),
- new PulsarResourceDescription());
+ brokerId = pulsar.getBrokerId();
+ localResourceUnit = new SimpleResourceUnit(brokerId, new PulsarResourceDescription());
- LocalBrokerData localData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(),
+ LocalBrokerData localData = new LocalBrokerData(pulsar.getWebServiceAddress(),
pulsar.getWebServiceAddressTls(),
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners());
localData.setProtocols(pulsar.getProtocolDataToAdvertise());
localData.setLoadManagerClassName(this.pulsar.getConfig().getLoadManagerClassName());
- String brokerReportPath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress;
+ String brokerReportPath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + brokerId;
try {
log.info("Acquiring broker resource lock on {}", brokerReportPath);
@@ -129,12 +128,12 @@ public void disableBroker() throws Exception {
@Override
public Set getAvailableBrokers() throws Exception {
- return Collections.singleton(lookupServiceAddress);
+ return Collections.singleton(brokerId);
}
@Override
public CompletableFuture> getAvailableBrokersAsync() {
- return CompletableFuture.completedFuture(Collections.singleton(lookupServiceAddress));
+ return CompletableFuture.completedFuture(Collections.singleton(brokerId));
}
@Override
@@ -153,7 +152,6 @@ public String setNamespaceBundleAffinity(String bundle, String broker) {
if (StringUtils.isBlank(broker)) {
return this.bundleBrokerAffinityMap.remove(bundle);
}
- broker = broker.replaceFirst("http[s]?://", "");
return this.bundleBrokerAffinityMap.put(bundle, broker);
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ResourceUnit.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ResourceUnit.java
index ef4dd2a97b280..c28a8be4c0d3a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ResourceUnit.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ResourceUnit.java
@@ -23,8 +23,6 @@
*/
public interface ResourceUnit extends Comparable {
- String PROPERTY_KEY_BROKER_ZNODE_NAME = "__advertised_addr";
-
String getResourceId();
ResourceDescription getAvailableResource();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
index 921ce35b5c65e..18e30ddf922d0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
@@ -82,9 +82,9 @@ public BrokerRegistryImpl(PulsarService pulsar) {
this.brokerLookupDataLockManager = pulsar.getCoordinationService().getLockManager(BrokerLookupData.class);
this.scheduler = pulsar.getLoadManagerExecutor();
this.listeners = new ArrayList<>();
- this.brokerId = pulsar.getLookupServiceAddress();
+ this.brokerId = pulsar.getBrokerId();
this.brokerLookupData = new BrokerLookupData(
- pulsar.getSafeWebServiceAddress(),
+ pulsar.getWebServiceAddress(),
pulsar.getWebServiceAddressTls(),
pulsar.getBrokerServiceUrl(),
pulsar.getBrokerServiceUrlTls(),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 4ebf537f7a8a8..6a0e677c66268 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -21,31 +21,40 @@
import static java.lang.String.format;
import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.Role.Follower;
import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.Role.Leader;
+import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.TOPIC;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Success;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Admin;
+import static org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.getNamespaceBundle;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
@@ -78,6 +87,9 @@
import org.apache.pulsar.broker.loadbalance.extensions.strategy.LeastResourceUsageWithWeight;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
+import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceName;
@@ -86,7 +98,6 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FutureUtil;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
import org.slf4j.Logger;
@@ -107,6 +118,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
private static final long MONITOR_INTERVAL_IN_MILLIS = 120_000;
+ public static final long COMPACTION_THRESHOLD = 5 * 1024 * 1024;
+
private static final String ELECTION_ROOT = "/loadbalance/extension/leader";
private PulsarService pulsar;
@@ -143,6 +156,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
@Getter
private final List brokerFilterPipeline;
+
/**
* The load data reporter.
*/
@@ -160,7 +174,9 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
private SplitManager splitManager;
- private boolean started = false;
+ private volatile boolean started = false;
+
+ private boolean configuredSystemTopics = false;
private final AssignCounter assignCounter = new AssignCounter();
@Getter
@@ -168,16 +184,66 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
private final SplitCounter splitCounter = new SplitCounter();
// record unload metrics
- private final AtomicReference> unloadMetrics = new AtomicReference();
+ private final AtomicReference> unloadMetrics = new AtomicReference<>();
// record split metrics
private final AtomicReference> splitMetrics = new AtomicReference<>();
- private final ConcurrentOpenHashMap>>
- lookupRequests = ConcurrentOpenHashMap.>>newBuilder()
- .build();
+ private final ConcurrentHashMap>>
+ lookupRequests = new ConcurrentHashMap<>();
private final CountDownLatch initWaiter = new CountDownLatch(1);
+ /**
+ * Get all the bundles that are owned by this broker.
+ */
+ public Set<NamespaceBundle> getOwnedServiceUnits() {
+ if (!started) {
+ log.warn("Failed to get owned service units, load manager is not started.");
+ return Collections.emptySet();
+ }
+ Set<Map.Entry<String, ServiceUnitStateData>> entrySet = serviceUnitStateChannel.getOwnershipEntrySet();
+ String brokerId = brokerRegistry.getBrokerId();
+ Set<NamespaceBundle> ownedServiceUnits = entrySet.stream()
+ .filter(entry -> {
+ var stateData = entry.getValue();
+ return stateData.state() == ServiceUnitState.Owned
+ && StringUtils.isNotBlank(stateData.dstBroker())
+ && stateData.dstBroker().equals(brokerId);
+ }).map(entry -> {
+ var bundle = entry.getKey();
+ return getNamespaceBundle(pulsar, bundle);
+ }).collect(Collectors.toCollection(java.util.HashSet::new));
+ // Add the heartbeat and SLA monitor namespace bundles (the set above must be mutable for this).
+ NamespaceName heartbeatNamespace = NamespaceService.getHeartbeatNamespace(brokerId, pulsar.getConfiguration());
+ try {
+ NamespaceBundle fullBundle = pulsar.getNamespaceService().getNamespaceBundleFactory()
+ .getFullBundle(heartbeatNamespace);
+ ownedServiceUnits.add(fullBundle);
+ } catch (Exception e) {
+ log.warn("Failed to get heartbeat namespace bundle.", e);
+ }
+ NamespaceName heartbeatNamespaceV2 = NamespaceService
+ .getHeartbeatNamespaceV2(brokerId, pulsar.getConfiguration());
+ try {
+ NamespaceBundle fullBundle = pulsar.getNamespaceService().getNamespaceBundleFactory()
+ .getFullBundle(heartbeatNamespaceV2);
+ ownedServiceUnits.add(fullBundle);
+ } catch (Exception e) {
+ log.warn("Failed to get heartbeat namespace V2 bundle.", e);
+ }
+
+ NamespaceName slaMonitorNamespace = NamespaceService
+ .getSLAMonitorNamespace(brokerId, pulsar.getConfiguration());
+ try {
+ NamespaceBundle fullBundle = pulsar.getNamespaceService().getNamespaceBundleFactory()
+ .getFullBundle(slaMonitorNamespace);
+ ownedServiceUnits.add(fullBundle);
+ } catch (Exception e) {
+ log.warn("Failed to get SLA Monitor namespace bundle.", e);
+ }
+
+ return ownedServiceUnits;
+ }
+
public enum Role {
Leader,
Follower
@@ -201,6 +267,10 @@ public static boolean isLoadManagerExtensionEnabled(ServiceConfiguration conf) {
return ExtensibleLoadManagerImpl.class.getName().equals(conf.getLoadManagerClassName());
}
+ public static boolean isLoadManagerExtensionEnabled(PulsarService pulsar) {
+ return pulsar.getLoadManager().get() instanceof ExtensibleLoadManagerWrapper;
+ }
+
public static ExtensibleLoadManagerImpl get(LoadManager loadManager) {
if (!(loadManager instanceof ExtensibleLoadManagerWrapper loadManagerWrapper)) {
throw new IllegalArgumentException("The load manager should be 'ExtensibleLoadManagerWrapper'.");
@@ -212,104 +282,151 @@ public static boolean debug(ServiceConfiguration config, Logger log) {
return config.isLoadBalancerDebugModeEnabled() || log.isDebugEnabled();
}
+ public static void createSystemTopic(PulsarService pulsar, String topic) throws PulsarServerException {
+ try {
+ pulsar.getAdminClient().topics().createNonPartitionedTopic(topic);
+ log.info("Created topic {}.", topic);
+ } catch (PulsarAdminException.ConflictException ex) {
+ if (debug(pulsar.getConfiguration(), log)) {
+ log.info("Topic {} already exists.", topic);
+ }
+ } catch (PulsarAdminException e) {
+ throw new PulsarServerException(e);
+ }
+ }
+
+ private static void createSystemTopics(PulsarService pulsar) throws PulsarServerException {
+ createSystemTopic(pulsar, BROKER_LOAD_DATA_STORE_TOPIC);
+ createSystemTopic(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
+ }
+
+ private static boolean configureSystemTopics(PulsarService pulsar) {
+ try {
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)
+ && (pulsar.getConfiguration().isSystemTopicEnabled()
+ && pulsar.getConfiguration().isTopicLevelPoliciesEnabled())) {
+ Long threshold = pulsar.getAdminClient().topicPolicies().getCompactionThreshold(TOPIC);
+ if (threshold == null || COMPACTION_THRESHOLD != threshold.longValue()) {
+ pulsar.getAdminClient().topicPolicies().setCompactionThreshold(TOPIC, COMPACTION_THRESHOLD);
+ log.info("Set compaction threshold: {} bytes for system topic {}.", COMPACTION_THRESHOLD, TOPIC);
+ }
+ } else {
+ log.warn("System topic or topic level policies is disabled. "
+ + "{} compaction threshold follows the broker or namespace policies.", TOPIC);
+ }
+ return true;
+ } catch (Exception e) {
+ log.error("Failed to set compaction threshold for system topic:{}", TOPIC, e);
+ }
+ return false;
+ }
+
@Override
public void start() throws PulsarServerException {
if (this.started) {
return;
}
- this.brokerRegistry = new BrokerRegistryImpl(pulsar);
- this.leaderElectionService = new LeaderElectionService(
- pulsar.getCoordinationService(), pulsar.getSafeWebServiceAddress(), ELECTION_ROOT,
- state -> {
- pulsar.getLoadManagerExecutor().execute(() -> {
- if (state == LeaderElectionState.Leading) {
- playLeader();
- } else {
- playFollower();
- }
+ try {
+ this.brokerRegistry = new BrokerRegistryImpl(pulsar);
+ this.leaderElectionService = new LeaderElectionService(
+ pulsar.getCoordinationService(), pulsar.getBrokerId(),
+ pulsar.getSafeWebServiceAddress(), ELECTION_ROOT,
+ state -> {
+ pulsar.getLoadManagerExecutor().execute(() -> {
+ if (state == LeaderElectionState.Leading) {
+ playLeader();
+ } else {
+ playFollower();
+ }
+ });
});
- });
- this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar);
- this.brokerRegistry.start();
- this.splitManager = new SplitManager(splitCounter);
- this.unloadManager = new UnloadManager(unloadCounter);
- this.serviceUnitStateChannel.listen(unloadManager);
- this.serviceUnitStateChannel.listen(splitManager);
- this.leaderElectionService.start();
- this.serviceUnitStateChannel.start();
- this.antiAffinityGroupPolicyHelper =
- new AntiAffinityGroupPolicyHelper(pulsar, serviceUnitStateChannel);
- antiAffinityGroupPolicyHelper.listenFailureDomainUpdate();
- this.antiAffinityGroupPolicyFilter = new AntiAffinityGroupPolicyFilter(antiAffinityGroupPolicyHelper);
- this.brokerFilterPipeline.add(antiAffinityGroupPolicyFilter);
- SimpleResourceAllocationPolicies policies = new SimpleResourceAllocationPolicies(pulsar);
- this.isolationPoliciesHelper = new IsolationPoliciesHelper(policies);
- this.brokerFilterPipeline.add(new BrokerIsolationPoliciesFilter(isolationPoliciesHelper));
+ this.serviceUnitStateChannel = ServiceUnitStateChannelImpl.newInstance(pulsar);
+ this.brokerRegistry.start();
+ this.splitManager = new SplitManager(splitCounter);
+ this.unloadManager = new UnloadManager(unloadCounter);
+ this.serviceUnitStateChannel.listen(unloadManager);
+ this.serviceUnitStateChannel.listen(splitManager);
+ this.leaderElectionService.start();
+ this.serviceUnitStateChannel.start();
+ this.antiAffinityGroupPolicyHelper =
+ new AntiAffinityGroupPolicyHelper(pulsar, serviceUnitStateChannel);
+ antiAffinityGroupPolicyHelper.listenFailureDomainUpdate();
+ this.antiAffinityGroupPolicyFilter = new AntiAffinityGroupPolicyFilter(antiAffinityGroupPolicyHelper);
+ this.brokerFilterPipeline.add(antiAffinityGroupPolicyFilter);
+ SimpleResourceAllocationPolicies policies = new SimpleResourceAllocationPolicies(pulsar);
+ this.isolationPoliciesHelper = new IsolationPoliciesHelper(policies);
+ this.brokerFilterPipeline.add(new BrokerIsolationPoliciesFilter(isolationPoliciesHelper));
- try {
- this.brokerLoadDataStore = LoadDataStoreFactory
- .create(pulsar.getClient(), BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class);
- this.brokerLoadDataStore.startTableView();
- this.topBundlesLoadDataStore = LoadDataStoreFactory
- .create(pulsar.getClient(), TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class);
- } catch (LoadDataStoreException e) {
- throw new PulsarServerException(e);
- }
+ try {
+ this.brokerLoadDataStore = LoadDataStoreFactory
+ .create(pulsar, BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class);
+ this.topBundlesLoadDataStore = LoadDataStoreFactory
+ .create(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class);
+ } catch (LoadDataStoreException e) {
+ throw new PulsarServerException(e);
+ }
- this.context = LoadManagerContextImpl.builder()
- .configuration(conf)
- .brokerRegistry(brokerRegistry)
- .brokerLoadDataStore(brokerLoadDataStore)
- .topBundleLoadDataStore(topBundlesLoadDataStore).build();
-
- this.brokerLoadDataReporter =
- new BrokerLoadDataReporter(pulsar, brokerRegistry.getBrokerId(), brokerLoadDataStore);
-
- this.topBundleLoadDataReporter =
- new TopBundleLoadDataReporter(pulsar, brokerRegistry.getBrokerId(), topBundlesLoadDataStore);
- this.serviceUnitStateChannel.listen(brokerLoadDataReporter);
- this.serviceUnitStateChannel.listen(topBundleLoadDataReporter);
- var interval = conf.getLoadBalancerReportUpdateMinIntervalMillis();
- this.brokerLoadDataReportTask = this.pulsar.getLoadManagerExecutor()
- .scheduleAtFixedRate(() -> {
- try {
- brokerLoadDataReporter.reportAsync(false);
- // TODO: update broker load metrics using getLocalData
- } catch (Throwable e) {
- log.error("Failed to run the broker load manager executor job.", e);
- }
- },
- interval,
- interval, TimeUnit.MILLISECONDS);
-
- this.topBundlesLoadDataReportTask = this.pulsar.getLoadManagerExecutor()
- .scheduleAtFixedRate(() -> {
- try {
- // TODO: consider excluding the bundles that are in the process of split.
- topBundleLoadDataReporter.reportAsync(false);
- } catch (Throwable e) {
- log.error("Failed to run the top bundles load manager executor job.", e);
- }
- },
- interval,
- interval, TimeUnit.MILLISECONDS);
-
- this.monitorTask = this.pulsar.getLoadManagerExecutor()
- .scheduleAtFixedRate(() -> {
- monitor();
- },
- MONITOR_INTERVAL_IN_MILLIS,
- MONITOR_INTERVAL_IN_MILLIS, TimeUnit.MILLISECONDS);
-
- this.unloadScheduler = new UnloadScheduler(
- pulsar, pulsar.getLoadManagerExecutor(), unloadManager, context,
- serviceUnitStateChannel, unloadCounter, unloadMetrics);
- this.unloadScheduler.start();
- this.splitScheduler = new SplitScheduler(
- pulsar, serviceUnitStateChannel, splitManager, splitCounter, splitMetrics, context);
- this.splitScheduler.start();
- this.initWaiter.countDown();
- this.started = true;
+ this.context = LoadManagerContextImpl.builder()
+ .configuration(conf)
+ .brokerRegistry(brokerRegistry)
+ .brokerLoadDataStore(brokerLoadDataStore)
+ .topBundleLoadDataStore(topBundlesLoadDataStore).build();
+
+ this.brokerLoadDataReporter =
+ new BrokerLoadDataReporter(pulsar, brokerRegistry.getBrokerId(), brokerLoadDataStore);
+
+ this.topBundleLoadDataReporter =
+ new TopBundleLoadDataReporter(pulsar, brokerRegistry.getBrokerId(), topBundlesLoadDataStore);
+ this.serviceUnitStateChannel.listen(brokerLoadDataReporter);
+ this.serviceUnitStateChannel.listen(topBundleLoadDataReporter);
+ var interval = conf.getLoadBalancerReportUpdateMinIntervalMillis();
+ this.brokerLoadDataReportTask = this.pulsar.getLoadManagerExecutor()
+ .scheduleAtFixedRate(() -> {
+ try {
+ brokerLoadDataReporter.reportAsync(false);
+ // TODO: update broker load metrics using getLocalData
+ } catch (Throwable e) {
+ log.error("Failed to run the broker load manager executor job.", e);
+ }
+ },
+ interval,
+ interval, TimeUnit.MILLISECONDS);
+
+ this.topBundlesLoadDataReportTask = this.pulsar.getLoadManagerExecutor()
+ .scheduleAtFixedRate(() -> {
+ try {
+ // TODO: consider excluding the bundles that are in the process of split.
+ topBundleLoadDataReporter.reportAsync(false);
+ } catch (Throwable e) {
+ log.error("Failed to run the top bundles load manager executor job.", e);
+ }
+ },
+ interval,
+ interval, TimeUnit.MILLISECONDS);
+
+ this.monitorTask = this.pulsar.getLoadManagerExecutor()
+ .scheduleAtFixedRate(() -> {
+ monitor();
+ },
+ MONITOR_INTERVAL_IN_MILLIS,
+ MONITOR_INTERVAL_IN_MILLIS, TimeUnit.MILLISECONDS);
+
+ this.unloadScheduler = new UnloadScheduler(
+ pulsar, pulsar.getLoadManagerExecutor(), unloadManager, context,
+ serviceUnitStateChannel, unloadCounter, unloadMetrics);
+ this.splitScheduler = new SplitScheduler(
+ pulsar, serviceUnitStateChannel, splitManager, splitCounter, splitMetrics, context);
+ this.splitScheduler.start();
+ this.initWaiter.countDown();
+ this.started = true;
+ log.info("Started load manager.");
+ } catch (Exception ex) {
+ log.error("Failed to start the extensible load balance and close broker registry {}.",
+ this.brokerRegistry, ex);
+ if (this.brokerRegistry != null) {
+ brokerRegistry.close();
+ }
+ }
}
@Override
@@ -324,89 +441,167 @@ public CompletableFuture> assign(Optional> future = lookupRequests.computeIfAbsent(bundle, k -> {
+ return dedupeLookupRequest(bundle, k -> {
final CompletableFuture> owner;
// Assign the bundle to channel owner if is internal topic, to avoid circular references.
if (topic.isPresent() && isInternalTopic(topic.get().toString())) {
owner = serviceUnitStateChannel.getChannelOwnerAsync();
} else {
- owner = serviceUnitStateChannel.getOwnerAsync(bundle).thenCompose(broker -> {
- // If the bundle not assign yet, select and publish assign event to channel.
- if (broker.isEmpty()) {
- return this.selectAsync(serviceUnit).thenCompose(brokerOpt -> {
- if (brokerOpt.isPresent()) {
- assignCounter.incrementSuccess();
- log.info("Selected new owner broker: {} for bundle: {}.", brokerOpt.get(), bundle);
- return serviceUnitStateChannel.publishAssignEventAsync(bundle, brokerOpt.get())
- .thenApply(Optional::of);
- } else {
- throw new IllegalStateException(
- "Failed to select the new owner broker for bundle: " + bundle);
- }
- });
+ String candidateBrokerId = getHeartbeatOrSLAMonitorBrokerId(serviceUnit);
+ if (candidateBrokerId != null) {
+ owner = CompletableFuture.completedFuture(Optional.of(candidateBrokerId));
+ } else {
+ owner = getOrSelectOwnerAsync(serviceUnit, bundle).thenApply(Optional::ofNullable);
+ }
+ }
+ return getBrokerLookupData(owner, bundle);
+ });
+ }
+
+ private String getHeartbeatOrSLAMonitorBrokerId(ServiceUnitId serviceUnit) {
+ // Check if this is Heartbeat or SLAMonitor namespace
+ String candidateBroker = NamespaceService.checkHeartbeatNamespace(serviceUnit);
+ if (candidateBroker == null) {
+ candidateBroker = NamespaceService.checkHeartbeatNamespaceV2(serviceUnit);
+ }
+ if (candidateBroker == null) {
+ candidateBroker = NamespaceService.getSLAMonitorBrokerName(serviceUnit);
+ }
+ if (candidateBroker != null) {
+ return candidateBroker.substring(candidateBroker.lastIndexOf('/') + 1);
+ }
+ return candidateBroker;
+ }
+
+ private CompletableFuture getOrSelectOwnerAsync(ServiceUnitId serviceUnit,
+ String bundle) {
+ return serviceUnitStateChannel.getOwnerAsync(bundle).thenCompose(broker -> {
+ // If the bundle is not assigned yet, select a broker and publish an assign event to the channel.
+ if (broker.isEmpty()) {
+ return this.selectAsync(serviceUnit).thenCompose(brokerOpt -> {
+ if (brokerOpt.isPresent()) {
+ assignCounter.incrementSuccess();
+ log.info("Selected new owner broker: {} for bundle: {}.", brokerOpt.get(), bundle);
+ return serviceUnitStateChannel.publishAssignEventAsync(bundle, brokerOpt.get());
}
- assignCounter.incrementSkip();
- // Already assigned, return it.
- return CompletableFuture.completedFuture(broker);
+ throw new IllegalStateException(
+ "Failed to select the new owner broker for bundle: " + bundle);
});
}
-
- return owner.thenCompose(broker -> {
- if (broker.isEmpty()) {
- String errorMsg = String.format(
- "Failed to get or assign the owner for bundle:%s", bundle);
- log.error(errorMsg);
- throw new IllegalStateException(errorMsg);
- }
- return CompletableFuture.completedFuture(broker.get());
- }).thenCompose(broker -> this.getBrokerRegistry().lookupAsync(broker).thenCompose(brokerLookupData -> {
- if (brokerLookupData.isEmpty()) {
- String errorMsg = String.format(
- "Failed to look up a broker registry:%s for bundle:%s", broker, bundle);
- log.error(errorMsg);
- throw new IllegalStateException(errorMsg);
- }
- return CompletableFuture.completedFuture(brokerLookupData);
- }));
+ assignCounter.incrementSkip();
+ // Already assigned, return it.
+ return CompletableFuture.completedFuture(broker.get());
});
- future.whenComplete((r, t) -> {
- if (t != null) {
+ }
+
+ private CompletableFuture> getBrokerLookupData(
+ CompletableFuture> owner,
+ String bundle) {
+ return owner.thenCompose(broker -> {
+ if (broker.isEmpty()) {
+ String errorMsg = String.format(
+ "Failed to get or assign the owner for bundle:%s", bundle);
+ log.error(errorMsg);
+ throw new IllegalStateException(errorMsg);
+ }
+ return CompletableFuture.completedFuture(broker.get());
+ }).thenCompose(broker -> this.getBrokerRegistry().lookupAsync(broker).thenCompose(brokerLookupData -> {
+ if (brokerLookupData.isEmpty()) {
+ String errorMsg = String.format(
+ "Failed to lookup broker:%s for bundle:%s, the broker has not been registered.",
+ broker, bundle);
+ log.error(errorMsg);
+ throw new IllegalStateException(errorMsg);
+ }
+ return CompletableFuture.completedFuture(brokerLookupData);
+ }));
+ }
+
+ /**
+ * Method to get the current owner of the NamespaceBundle
+ * or set the local broker as the owner if absent.
+ *
+ * @param namespaceBundle the NamespaceBundle
+ * @return The ephemeral node data showing the current ownership info in ServiceUnitStateChannel
+ */
+ public CompletableFuture tryAcquiringOwnership(NamespaceBundle namespaceBundle) {
+ log.info("Try acquiring ownership for bundle: {} - {}.", namespaceBundle, brokerRegistry.getBrokerId());
+ final String bundle = namespaceBundle.toString();
+ return assign(Optional.empty(), namespaceBundle)
+ .thenApply(brokerLookupData -> {
+ if (brokerLookupData.isEmpty()) {
+ String errorMsg = String.format(
+ "Failed to get the broker lookup data for bundle:%s", bundle);
+ log.error(errorMsg);
+ throw new IllegalStateException(errorMsg);
+ }
+ return brokerLookupData.get().toNamespaceEphemeralData();
+ });
+ }
+
+ private CompletableFuture> dedupeLookupRequest(
+ String key, Function>> provider) {
+ final MutableObject>> newFutureCreated = new MutableObject<>();
+ try {
+ return lookupRequests.computeIfAbsent(key, k -> {
+ CompletableFuture> future = provider.apply(k);
+ newFutureCreated.setValue(future);
+ return future;
+ });
+ } finally {
+ if (newFutureCreated.getValue() != null) {
+ newFutureCreated.getValue().whenComplete((v, ex) -> {
+ if (ex != null) {
assignCounter.incrementFailure();
}
- lookupRequests.remove(bundle);
- }
- );
- return future;
+ lookupRequests.remove(key);
+ });
+ }
+ }
}
public CompletableFuture> selectAsync(ServiceUnitId bundle) {
+ return selectAsync(bundle, Collections.emptySet());
+ }
+
+ public CompletableFuture> selectAsync(ServiceUnitId bundle,
+ Set excludeBrokerSet) {
BrokerRegistry brokerRegistry = getBrokerRegistry();
return brokerRegistry.getAvailableBrokerLookupDataAsync()
- .thenCompose(availableBrokers -> {
+ .thenComposeAsync(availableBrokers -> {
LoadManagerContext context = this.getContext();
- Map availableBrokerCandidates = new HashMap<>(availableBrokers);
+ Map availableBrokerCandidates = new ConcurrentHashMap<>(availableBrokers);
+ if (!excludeBrokerSet.isEmpty()) {
+ for (String exclude : excludeBrokerSet) {
+ availableBrokerCandidates.remove(exclude);
+ }
+ }
// Filter out brokers that do not meet the rules.
List filterPipeline = getBrokerFilterPipeline();
+ ArrayList>> futures =
+ new ArrayList<>(filterPipeline.size());
for (final BrokerFilter filter : filterPipeline) {
- try {
- filter.filter(availableBrokerCandidates, bundle, context);
- // Preserve the filter successes result.
- availableBrokers.keySet().retainAll(availableBrokerCandidates.keySet());
- } catch (BrokerFilterException e) {
+ CompletableFuture