Skip to content

Commit

Permalink
[fix][broker] Support lookup options for extensible load manager (apa…
Browse files Browse the repository at this point in the history
  • Loading branch information
Demogorgon314 authored and heesung-sn committed Jun 26, 2024
1 parent f0afe34 commit cf5915e
Show file tree
Hide file tree
Showing 13 changed files with 162 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.Reflections;
Expand Down Expand Up @@ -63,7 +64,7 @@ public interface LoadManager {
Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws Exception;

default CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
Optional<ServiceUnitId> topic, ServiceUnitId bundle, LookupOptions options) {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,12 @@ public interface ExtensibleLoadManager extends Closeable {
* (e.g. {@link NamespaceService#internalGetWebServiceUrl(NamespaceBundle, LookupOptions)}),
* So the topic is optional.
* @param serviceUnit service unit (e.g. bundle).
* @param options The lookup options.
* @return The broker lookup data.
*/
CompletableFuture<Optional<BrokerLookupData>> assign(Optional<ServiceUnitId> topic, ServiceUnitId serviceUnit);
CompletableFuture<Optional<BrokerLookupData>> assign(Optional<ServiceUnitId> topic,
ServiceUnitId serviceUnit,
LookupOptions options);

/**
* Check the incoming service unit is owned by the current broker.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import org.apache.pulsar.broker.loadbalance.extensions.strategy.LeastResourceUsageWithWeight;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.client.admin.PulsarAdminException;
Expand Down Expand Up @@ -496,7 +497,8 @@ public void initialize(PulsarService pulsar) {

@Override
public CompletableFuture<Optional<BrokerLookupData>> assign(Optional<ServiceUnitId> topic,
ServiceUnitId serviceUnit) {
ServiceUnitId serviceUnit,
LookupOptions options) {

final String bundle = serviceUnit.toString();

Expand All @@ -510,7 +512,7 @@ public CompletableFuture<Optional<BrokerLookupData>> assign(Optional<ServiceUnit
if (candidateBrokerId != null) {
return CompletableFuture.completedFuture(Optional.of(candidateBrokerId));
}
return getOrSelectOwnerAsync(serviceUnit, bundle).thenApply(Optional::ofNullable);
return getOrSelectOwnerAsync(serviceUnit, bundle, options).thenApply(Optional::ofNullable);
});
}
return getBrokerLookupData(owner, bundle);
Expand All @@ -523,18 +525,18 @@ private CompletableFuture<String> getHeartbeatOrSLAMonitorBrokerId(ServiceUnitId
}

private CompletableFuture<String> getOrSelectOwnerAsync(ServiceUnitId serviceUnit,
String bundle) {
String bundle,
LookupOptions options) {
return serviceUnitStateChannel.getOwnerAsync(bundle).thenCompose(broker -> {
// If the bundle not assign yet, select and publish assign event to channel.
if (broker.isEmpty()) {
return this.selectAsync(serviceUnit).thenCompose(brokerOpt -> {
return this.selectAsync(serviceUnit, Collections.emptySet(), options).thenCompose(brokerOpt -> {
if (brokerOpt.isPresent()) {
assignCounter.incrementSuccess();
log.info("Selected new owner broker: {} for bundle: {}.", brokerOpt.get(), bundle);
return serviceUnitStateChannel.publishAssignEventAsync(bundle, brokerOpt.get());
}
throw new IllegalStateException(
"Failed to select the new owner broker for bundle: " + bundle);
return CompletableFuture.completedFuture(null);
});
}
assignCounter.incrementSkip();
Expand All @@ -548,22 +550,19 @@ private CompletableFuture<Optional<BrokerLookupData>> getBrokerLookupData(
String bundle) {
return owner.thenCompose(broker -> {
if (broker.isEmpty()) {
String errorMsg = String.format(
"Failed to get or assign the owner for bundle:%s", bundle);
log.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
return CompletableFuture.completedFuture(broker.get());
}).thenCompose(broker -> this.getBrokerRegistry().lookupAsync(broker).thenCompose(brokerLookupData -> {
if (brokerLookupData.isEmpty()) {
String errorMsg = String.format(
"Failed to lookup broker:%s for bundle:%s, the broker has not been registered.",
broker, bundle);
log.error(errorMsg);
throw new IllegalStateException(errorMsg);
return CompletableFuture.completedFuture(Optional.empty());
}
return CompletableFuture.completedFuture(brokerLookupData);
}));
return this.getBrokerRegistry().lookupAsync(broker.get()).thenCompose(brokerLookupData -> {
if (brokerLookupData.isEmpty()) {
String errorMsg = String.format(
"Failed to lookup broker:%s for bundle:%s, the broker has not been registered.",
broker, bundle);
log.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
return CompletableFuture.completedFuture(brokerLookupData);
});
});
}

/**
Expand All @@ -576,7 +575,7 @@ private CompletableFuture<Optional<BrokerLookupData>> getBrokerLookupData(
public CompletableFuture<NamespaceEphemeralData> tryAcquiringOwnership(NamespaceBundle namespaceBundle) {
log.info("Try acquiring ownership for bundle: {} - {}.", namespaceBundle, brokerRegistry.getBrokerId());
final String bundle = namespaceBundle.toString();
return assign(Optional.empty(), namespaceBundle)
return assign(Optional.empty(), namespaceBundle, LookupOptions.builder().readOnly(false).build())
.thenApply(brokerLookupData -> {
if (brokerLookupData.isEmpty()) {
String errorMsg = String.format(
Expand Down Expand Up @@ -609,12 +608,12 @@ private CompletableFuture<Optional<BrokerLookupData>> dedupeLookupRequest(
}
}

public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId bundle) {
return selectAsync(bundle, Collections.emptySet());
}

public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId bundle,
Set<String> excludeBrokerSet) {
Set<String> excludeBrokerSet,
LookupOptions options) {
if (options.isReadOnly()) {
return CompletableFuture.completedFuture(Optional.empty());
}
BrokerRegistry brokerRegistry = getBrokerRegistry();
return brokerRegistry.getAvailableBrokerLookupDataAsync()
.thenComposeAsync(availableBrokers -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.ResourceUnit;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;

public class ExtensibleLoadManagerWrapper implements LoadManager {
Expand Down Expand Up @@ -62,9 +63,15 @@ public boolean isCentralized() {

@Override
public CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
return loadManager.assign(topic, bundle)
.thenApply(lookupData -> lookupData.map(BrokerLookupData::toLookupResult));
Optional<ServiceUnitId> topic, ServiceUnitId bundle, LookupOptions options) {
return loadManager.assign(topic, bundle, options)
.thenApply(lookupData -> lookupData.map(data -> {
try {
return data.toLookupResult(options);
} catch (PulsarServerException ex) {
throw FutureUtil.wrapToCompletionException(ex);
}
}));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.client.api.CompressionType;
Expand Down Expand Up @@ -1456,7 +1457,8 @@ private synchronized void doCleanup(String broker) {
private Optional<String> selectBroker(String serviceUnit, String inactiveBroker) {
try {
return loadManager.selectAsync(
LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit), Set.of(inactiveBroker))
LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit),
Set.of(inactiveBroker), LookupOptions.builder().build())
.get(inFlightStateWaitingTimeInMillis, MILLISECONDS);
} catch (Throwable e) {
log.error("Failed to select a broker for serviceUnit:{}", serviceUnit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
*/
package org.apache.pulsar.broker.loadbalance.extensions.data;

import java.net.URI;
import java.util.Map;
import java.util.Optional;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
Expand Down Expand Up @@ -79,7 +82,19 @@ public long getStartTimestamp() {
return this.startTimestamp;
}

public LookupResult toLookupResult() {
public LookupResult toLookupResult(LookupOptions options) throws PulsarServerException {
if (options.hasAdvertisedListenerName()) {
AdvertisedListener listener = advertisedListeners.get(options.getAdvertisedListenerName());
if (listener == null) {
throw new PulsarServerException("the broker do not have "
+ options.getAdvertisedListenerName() + " listener");
}
URI url = listener.getBrokerServiceUrl();
URI urlTls = listener.getBrokerServiceUrlTls();
return new LookupResult(webServiceUrl, webServiceUrlTls,
url == null ? null : url.toString(),
urlTls == null ? null : urlTls.toString(), LookupResult.Type.BrokerUrl, false);
}
return new LookupResult(webServiceUrl, webServiceUrlTls, pulsarServiceUrl, pulsarServiceUrlTls,
LookupResult.Type.BrokerUrl, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicN
return CompletableFuture.completedFuture(optResult);
}
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return loadManager.get().findBrokerServiceUrl(Optional.of(topic), bundle);
return loadManager.get().findBrokerServiceUrl(Optional.of(topic), bundle, options);
} else {
// TODO: Add unit tests cover it.
return findBrokerServiceUrl(bundle, options);
Expand Down Expand Up @@ -313,7 +313,7 @@ private CompletableFuture<Optional<URL>> internalGetWebServiceUrl(@Nullable Serv
}
CompletableFuture<Optional<LookupResult>> future =
ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)
? loadManager.get().findBrokerServiceUrl(Optional.ofNullable(topic), bundle) :
? loadManager.get().findBrokerServiceUrl(Optional.ofNullable(topic), bundle, options) :
findBrokerServiceUrl(bundle, options);

return future.thenApply(lookupResult -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
import org.apache.pulsar.broker.loadbalance.extensions.filter.AntiAffinityGroupPolicyFilter;
import org.apache.pulsar.broker.loadbalance.extensions.policies.AntiAffinityGroupPolicyHelper;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
Expand All @@ -61,7 +62,8 @@ protected String getLoadManagerClassName() {

protected String selectBroker(ServiceUnitId serviceUnit, Object loadManager) {
try {
return ((ExtensibleLoadManagerImpl) loadManager).assign(Optional.empty(), serviceUnit).get()
return ((ExtensibleLoadManagerImpl) loadManager)
.assign(Optional.empty(), serviceUnit, LookupOptions.builder().build()).get()
.get().getPulsarServiceUrl();
} catch (Throwable e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import com.google.common.collect.Sets;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.commons.lang3.tuple.Pair;
Expand Down Expand Up @@ -76,6 +78,8 @@ protected ServiceConfiguration updateConfig(ServiceConfiguration conf) {
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
conf.setLoadBalancerSheddingEnabled(false);
conf.setLoadBalancerDebugModeEnabled(true);
conf.setWebServicePortTls(Optional.of(0));
conf.setBrokerServicePortTls(Optional.of(0));
return conf;
}

Expand Down
Loading

0 comments on commit cf5915e

Please sign in to comment.