Skip to content

Commit

Permalink
[fix][broker] Fix lookup heartbeat and sla namespace bundle when usin…
Browse files Browse the repository at this point in the history
…g extensible load manager (apache#21213)
  • Loading branch information
Demogorgon314 authored Oct 8, 2023
1 parent d9ebaf5 commit f85e0dc
Show file tree
Hide file tree
Showing 8 changed files with 320 additions and 263 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1176,7 +1176,7 @@ protected void startLeaderElectionService() {
protected void acquireSLANamespace() {
try {
// Namespace not created hence no need to unload it
NamespaceName nsName = NamespaceService.getSLAMonitorNamespace(getAdvertisedAddress(), config);
NamespaceName nsName = NamespaceService.getSLAMonitorNamespace(getLookupServiceAddress(), config);
if (!this.pulsarResources.getNamespaceResources().namespaceExists(nsName)) {
LOG.info("SLA Namespace = {} doesn't exist.", nsName);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public Map<String, BundleData> getBundleData() {

public Map<String, BundleData> getBundleDataForLoadShedding() {
return bundleData.entrySet().stream()
.filter(e -> !NamespaceService.filterNamespaceForShedding(
.filter(e -> !NamespaceService.isSLAOrHeartbeatNamespace(
NamespaceBundle.getBundleNamespace(e.getKey())))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Stable;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Unstable;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.state;
import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_FMT;
import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_FMT_V2;
import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
import static org.apache.pulsar.common.topics.TopicCompactionStrategy.TABLE_VIEW_TAG;
import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionLost;
Expand Down Expand Up @@ -94,7 +92,6 @@
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.stats.Metrics;
Expand Down Expand Up @@ -1216,48 +1213,19 @@ private synchronized void doCleanup(String broker) {
log.info("Started ownership cleanup for the inactive broker:{}", broker);
int orphanServiceUnitCleanupCnt = 0;
long totalCleanupErrorCntStart = totalCleanupErrorCnt.get();
String heartbeatNamespace =
NamespaceName.get(String.format(HEARTBEAT_NAMESPACE_FMT, config.getClusterName(), broker)).toString();
String heartbeatNamespaceV2 =
NamespaceName.get(String.format(HEARTBEAT_NAMESPACE_FMT_V2, broker)).toString();

Map<String, ServiceUnitStateData> orphanSystemServiceUnits = new HashMap<>();
for (var etr : tableview.entrySet()) {
var stateData = etr.getValue();
var serviceUnit = etr.getKey();
var state = state(stateData);
if (StringUtils.equals(broker, stateData.dstBroker())) {
if (isActiveState(state)) {
if (serviceUnit.startsWith(SYSTEM_NAMESPACE.toString())) {
orphanSystemServiceUnits.put(serviceUnit, stateData);
} else if (serviceUnit.startsWith(heartbeatNamespace)
|| serviceUnit.startsWith(heartbeatNamespaceV2)) {
// Skip the heartbeat namespace
log.info("Skip override heartbeat namespace bundle"
+ " serviceUnit:{}, stateData:{}", serviceUnit, stateData);
tombstoneAsync(serviceUnit).whenComplete((__, e) -> {
if (e != null) {
log.error("Failed cleaning the heartbeat namespace ownership serviceUnit:{}, "
+ "stateData:{}, cleanupErrorCnt:{}.",
serviceUnit, stateData,
totalCleanupErrorCnt.incrementAndGet() - totalCleanupErrorCntStart, e);
}
});
} else {
overrideOwnership(serviceUnit, stateData, broker);
}
orphanServiceUnitCleanupCnt++;
}

} else if (StringUtils.equals(broker, stateData.sourceBroker())) {
if (isInFlightState(state)) {
if (serviceUnit.startsWith(SYSTEM_NAMESPACE.toString())) {
orphanSystemServiceUnits.put(serviceUnit, stateData);
} else {
overrideOwnership(serviceUnit, stateData, broker);
}
orphanServiceUnitCleanupCnt++;
if (StringUtils.equals(broker, stateData.dstBroker()) && isActiveState(state)
|| StringUtils.equals(broker, stateData.sourceBroker()) && isInFlightState(state)) {
if (serviceUnit.startsWith(SYSTEM_NAMESPACE.toString())) {
orphanSystemServiceUnits.put(serviceUnit, stateData);
} else {
overrideOwnership(serviceUnit, stateData, broker);
}
orphanServiceUnitCleanupCnt++;
}
}

Expand Down Expand Up @@ -1401,16 +1369,21 @@ protected void monitorOwnerships(List<String> brokers) {
String srcBroker = stateData.sourceBroker();
var state = stateData.state();

if (isActiveState(state)) {
if (StringUtils.isNotBlank(srcBroker) && !activeBrokers.contains(srcBroker)) {
inactiveBrokers.add(srcBroker);
} else if (StringUtils.isNotBlank(dstBroker) && !activeBrokers.contains(dstBroker)) {
inactiveBrokers.add(dstBroker);
} else if (isInFlightState(state)
&& now - stateData.timestamp() > inFlightStateWaitingTimeInMillis) {
orphanServiceUnits.put(serviceUnit, stateData);
}
} else if (now - stateData.timestamp() > semiTerminalStateWaitingTimeInMillis) {
if (isActiveState(state) && StringUtils.isNotBlank(srcBroker) && !activeBrokers.contains(srcBroker)) {
inactiveBrokers.add(srcBroker);
continue;
}
if (isActiveState(state) && StringUtils.isNotBlank(dstBroker) && !activeBrokers.contains(dstBroker)) {
inactiveBrokers.add(dstBroker);
continue;
}
if (isActiveState(state) && isInFlightState(state)
&& now - stateData.timestamp() > inFlightStateWaitingTimeInMillis) {
orphanServiceUnits.put(serviceUnit, stateData);
continue;
}

if (now - stateData.timestamp() > semiTerminalStateWaitingTimeInMillis) {
log.info("Found semi-terminal states to tombstone"
+ " serviceUnit:{}, stateData:{}", serviceUnit, stateData);
tombstoneAsync(serviceUnit).whenComplete((__, e) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
Expand Down Expand Up @@ -70,7 +72,8 @@ public void update(Map<String, NamespaceBundleStats> bundleStats, int topk) {
pulsar.getConfiguration().isLoadBalancerSheddingBundlesWithPoliciesEnabled();
for (var etr : bundleStats.entrySet()) {
String bundle = etr.getKey();
if (bundle.startsWith(NamespaceName.SYSTEM_NAMESPACE.toString())) {
// TODO: do not filter system topic while shedding
if (NamespaceService.isSystemServiceNamespace(NamespaceBundle.getBundleNamespace(bundle))) {
continue;
}
if (!isLoadBalancerSheddingBundlesWithPoliciesEnabled && hasPolicies(bundle)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public class NamespaceService implements AutoCloseable {
public static final Pattern SLA_NAMESPACE_PATTERN = Pattern.compile(SLA_NAMESPACE_PROPERTY + "/[^/]+/([^:]+:\\d+)");
public static final String HEARTBEAT_NAMESPACE_FMT = "pulsar/%s/%s";
public static final String HEARTBEAT_NAMESPACE_FMT_V2 = "pulsar/%s";
public static final String SLA_NAMESPACE_FMT = SLA_NAMESPACE_PROPERTY + "/%s/%s:%s";
public static final String SLA_NAMESPACE_FMT = SLA_NAMESPACE_PROPERTY + "/%s/%s";

private final ConcurrentOpenHashMap<ClusterDataImpl, PulsarClientImpl> namespaceClients;

Expand Down Expand Up @@ -189,7 +189,7 @@ public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicN
CompletableFuture<Optional<LookupResult>> future = getBundleAsync(topic)
.thenCompose(bundle -> {
// Do redirection if the cluster is in rollback or deploying.
return redirectManager.findRedirectLookupResultAsync().thenCompose(optResult -> {
return findRedirectLookupResultAsync(bundle).thenCompose(optResult -> {
if (optResult.isPresent()) {
LOG.info("[{}] Redirect lookup request to {} for topic {}",
pulsar.getSafeWebServiceAddress(), optResult.get(), topic);
Expand Down Expand Up @@ -221,6 +221,13 @@ public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicN
return future;
}

private CompletableFuture<Optional<LookupResult>> findRedirectLookupResultAsync(ServiceUnitId bundle) {
if (isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString())) {
return CompletableFuture.completedFuture(Optional.empty());
}
return redirectManager.findRedirectLookupResultAsync();
}

public CompletableFuture<NamespaceBundle> getBundleAsync(TopicName topic) {
return bundleFactory.getBundlesAsync(topic.getNamespaceObject())
.thenApply(bundles -> bundles.findBundle(topic));
Expand Down Expand Up @@ -288,8 +295,7 @@ public Optional<URL> getWebServiceUrl(ServiceUnitId suName, LookupOptions option
private CompletableFuture<Optional<URL>> internalGetWebServiceUrl(@Nullable ServiceUnitId topic,
NamespaceBundle bundle,
LookupOptions options) {

return redirectManager.findRedirectLookupResultAsync().thenCompose(optResult -> {
return findRedirectLookupResultAsync(bundle).thenCompose(optResult -> {
if (optResult.isPresent()) {
LOG.info("[{}] Redirect lookup request to {} for topic {}",
pulsar.getSafeWebServiceAddress(), optResult.get(), topic);
Expand Down Expand Up @@ -695,7 +701,7 @@ public CompletableFuture<LookupResult> createLookupResult(String candidateBroker
return lookupFuture;
}

private boolean isBrokerActive(String candidateBroker) {
public boolean isBrokerActive(String candidateBroker) {
String candidateBrokerHostAndPort = parseHostAndPort(candidateBroker);
Set<String> availableBrokers = getAvailableBrokers();
if (availableBrokers.contains(candidateBrokerHostAndPort)) {
Expand Down Expand Up @@ -1564,7 +1570,7 @@ public CompletableFuture<Boolean> checkOwnershipPresentAsync(NamespaceBundle bun
}

public void unloadSLANamespace() throws Exception {
NamespaceName namespaceName = getSLAMonitorNamespace(host, config);
NamespaceName namespaceName = getSLAMonitorNamespace(pulsar.getLookupServiceAddress(), config);

LOG.info("Checking owner for SLA namespace {}", namespaceName);

Expand All @@ -1589,14 +1595,8 @@ public static NamespaceName getHeartbeatNamespaceV2(String lookupBroker, Service
return NamespaceName.get(String.format(HEARTBEAT_NAMESPACE_FMT_V2, lookupBroker));
}

public static NamespaceName getSLAMonitorNamespace(String host, ServiceConfiguration config) {
Integer port = null;
if (config.getWebServicePortTls().isPresent()) {
port = config.getWebServicePortTls().get();
} else if (config.getWebServicePort().isPresent()) {
port = config.getWebServicePort().get();
}
return NamespaceName.get(String.format(SLA_NAMESPACE_FMT, config.getClusterName(), host, port));
public static NamespaceName getSLAMonitorNamespace(String lookupBroker, ServiceConfiguration config) {
return NamespaceName.get(String.format(SLA_NAMESPACE_FMT, config.getClusterName(), lookupBroker));
}

public static String checkHeartbeatNamespace(ServiceUnitId ns) {
Expand Down Expand Up @@ -1640,7 +1640,7 @@ public static boolean isSystemServiceNamespace(String namespace) {
* @param namespace the namespace name
* @return True if namespace is HEARTBEAT_NAMESPACE or SLA_NAMESPACE
*/
public static boolean filterNamespaceForShedding(String namespace) {
public static boolean isSLAOrHeartbeatNamespace(String namespace) {
return SLA_NAMESPACE_PATTERN.matcher(namespace).matches()
|| HEARTBEAT_NAMESPACE_PATTERN.matcher(namespace).matches()
|| HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(namespace).matches();
Expand All @@ -1653,14 +1653,16 @@ public static boolean isHeartbeatNamespace(ServiceUnitId ns) {
}

public boolean registerSLANamespace() throws PulsarServerException {
boolean isNameSpaceRegistered = registerNamespace(getSLAMonitorNamespace(host, config), false);
String lookupServiceAddress = pulsar.getLookupServiceAddress();
boolean isNameSpaceRegistered = registerNamespace(getSLAMonitorNamespace(lookupServiceAddress, config), false);
if (isNameSpaceRegistered) {
if (LOG.isDebugEnabled()) {
LOG.debug("Added SLA Monitoring namespace name in local cache: ns={}",
getSLAMonitorNamespace(host, config));
getSLAMonitorNamespace(lookupServiceAddress, config));
}
} else if (LOG.isDebugEnabled()) {
LOG.debug("SLA Monitoring not owned by the broker: ns={}", getSLAMonitorNamespace(host, config));
LOG.debug("SLA Monitoring not owned by the broker: ns={}",
getSLAMonitorNamespace(lookupServiceAddress, config));
}
return isNameSpaceRegistered;
}
Expand Down
Loading

0 comments on commit f85e0dc

Please sign in to comment.