diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java index 794b5399c99c9..1083fc8737d8f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java @@ -373,8 +373,8 @@ public static void createTenantIfAbsent(PulsarResources resources, String tenant } } - static void createNamespaceIfAbsent(PulsarResources resources, NamespaceName namespaceName, - String cluster, int bundleNumber) throws IOException { + public static void createNamespaceIfAbsent(PulsarResources resources, NamespaceName namespaceName, + String cluster, int bundleNumber) throws IOException { NamespaceResources namespaceResources = resources.getNamespaceResources(); if (!namespaceResources.namespaceExists(namespaceName)) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 1a45bedfce426..47842e4722e5a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -277,6 +277,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { private final ExecutorProvider transactionExecutorProvider; private final DefaultMonotonicSnapshotClock monotonicSnapshotClock; private String brokerId; + private final CompletableFuture readyForIncomingRequestsFuture = new CompletableFuture<>(); public enum State { Init, Started, Closing, Closed @@ -904,6 +905,9 @@ public void start() throws PulsarServerException { this.metricsGenerator = new MetricsGenerator(this); + // the broker is ready to accept incoming requests by Pulsar binary protocol and http/https + readyForIncomingRequestsFuture.complete(null); + // Initialize the message protocol handlers. // start the protocol handlers only after the broker is ready, // so that the protocol handlers can access broker service properly. @@ -952,12 +956,22 @@ public void start() throws PulsarServerException { state = State.Started; } catch (Exception e) { LOG.error("Failed to start Pulsar service: {}", e.getMessage(), e); - throw new PulsarServerException(e); + PulsarServerException startException = new PulsarServerException(e); + readyForIncomingRequestsFuture.completeExceptionally(startException); + throw startException; } finally { mutex.unlock(); } } + public void runWhenReadyForIncomingRequests(Runnable runnable) { + readyForIncomingRequestsFuture.thenRun(runnable); + } + + public void waitUntilReadyForIncomingRequests() throws ExecutionException, InterruptedException { + readyForIncomingRequestsFuture.get(); + } + protected BrokerInterceptor newBrokerInterceptor() throws IOException { return BrokerInterceptors.load(config); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java index 2cce68b60cb49..0dd5d948480ab 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java @@ -31,6 +31,7 @@ import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper; import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl; import org.apache.pulsar.broker.lookup.LookupResult; +import org.apache.pulsar.broker.namespace.LookupOptions; import org.apache.pulsar.common.naming.ServiceUnitId; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.common.util.Reflections; @@ -63,7 +64,7 @@ public interface LoadManager { Optional getLeastLoaded(ServiceUnitId su) throws Exception; default CompletableFuture> findBrokerServiceUrl( - Optional topic, ServiceUnitId bundle) { + Optional topic, ServiceUnitId bundle, LookupOptions options) { throw new UnsupportedOperationException(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManager.java index b7da70d1cf1de..eabf6005b439b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManager.java @@ -60,9 +60,12 @@ public interface ExtensibleLoadManager extends Closeable { * (e.g. {@link NamespaceService#internalGetWebServiceUrl(NamespaceBundle, LookupOptions)}), * So the topic is optional. * @param serviceUnit service unit (e.g. bundle). + * @param options The lookup options. * @return The broker lookup data. */ - CompletableFuture> assign(Optional topic, ServiceUnitId serviceUnit); + CompletableFuture> assign(Optional topic, + ServiceUnitId serviceUnit, + LookupOptions options); /** * Check the incoming service unit is owned by the current broker. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index b35143c5d81d1..2b657f8cb9cba 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -36,7 +36,6 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -88,6 +87,7 @@ import org.apache.pulsar.broker.loadbalance.extensions.strategy.LeastResourceUsageWithWeight; import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared; import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies; +import org.apache.pulsar.broker.namespace.LookupOptions; import org.apache.pulsar.broker.namespace.NamespaceEphemeralData; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -165,10 +165,10 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager { private TopBundleLoadDataReporter topBundleLoadDataReporter; - private ScheduledFuture brokerLoadDataReportTask; - private ScheduledFuture topBundlesLoadDataReportTask; + private volatile ScheduledFuture brokerLoadDataReportTask; + private volatile ScheduledFuture topBundlesLoadDataReportTask; - private ScheduledFuture monitorTask; + private volatile ScheduledFuture monitorTask; private SplitScheduler splitScheduler; private UnloadManager unloadManager; @@ -195,7 +195,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager { private final ConcurrentHashMap>> lookupRequests = new ConcurrentHashMap<>(); - private final CountDownLatch initWaiter = new CountDownLatch(1); + private final CompletableFuture initWaiter = new CompletableFuture<>(); /** * Get all the bundles that are owned by this broker. @@ -366,12 +366,14 @@ public void start() throws PulsarServerException { pulsar.getCoordinationService(), pulsar.getBrokerId(), pulsar.getSafeWebServiceAddress(), ELECTION_ROOT, state -> { - pulsar.getLoadManagerExecutor().execute(() -> { - if (state == LeaderElectionState.Leading) { - playLeader(); - } else { - playFollower(); - } + pulsar.runWhenReadyForIncomingRequests(() -> { + pulsar.getLoadManagerExecutor().execute(() -> { + if (state == LeaderElectionState.Leading) { + playLeader(); + } else { + playFollower(); + } + }); }); }); this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar); @@ -381,7 +383,13 @@ public void start() throws PulsarServerException { this.serviceUnitStateChannel.listen(unloadManager); this.serviceUnitStateChannel.listen(splitManager); this.leaderElectionService.start(); - this.serviceUnitStateChannel.start(); + pulsar.runWhenReadyForIncomingRequests(() -> { + try { + this.serviceUnitStateChannel.start(); + } catch (Exception e) { + failStarting(e); + } + }); this.antiAffinityGroupPolicyHelper = new AntiAffinityGroupPolicyHelper(pulsar, serviceUnitStateChannel); antiAffinityGroupPolicyHelper.listenFailureDomainUpdate(); @@ -413,54 +421,72 @@ public void start() throws PulsarServerException { new TopBundleLoadDataReporter(pulsar, brokerRegistry.getBrokerId(), topBundlesLoadDataStore); this.serviceUnitStateChannel.listen(brokerLoadDataReporter); this.serviceUnitStateChannel.listen(topBundleLoadDataReporter); - var interval = conf.getLoadBalancerReportUpdateMinIntervalMillis(); - this.brokerLoadDataReportTask = this.pulsar.getLoadManagerExecutor() - .scheduleAtFixedRate(() -> { - try { - brokerLoadDataReporter.reportAsync(false); - // TODO: update broker load metrics using getLocalData - } catch (Throwable e) { - log.error("Failed to run the broker load manager executor job.", e); - } - }, - interval, - interval, TimeUnit.MILLISECONDS); - - this.topBundlesLoadDataReportTask = this.pulsar.getLoadManagerExecutor() - .scheduleAtFixedRate(() -> { - try { - // TODO: consider excluding the bundles that are in the process of split. - topBundleLoadDataReporter.reportAsync(false); - } catch (Throwable e) { - log.error("Failed to run the top bundles load manager executor job.", e); - } - }, - interval, - interval, TimeUnit.MILLISECONDS); - - this.monitorTask = this.pulsar.getLoadManagerExecutor() - .scheduleAtFixedRate(() -> { - monitor(); - }, - MONITOR_INTERVAL_IN_MILLIS, - MONITOR_INTERVAL_IN_MILLIS, TimeUnit.MILLISECONDS); this.unloadScheduler = new UnloadScheduler( pulsar, pulsar.getLoadManagerExecutor(), unloadManager, context, serviceUnitStateChannel, unloadCounter, unloadMetrics); this.splitScheduler = new SplitScheduler( pulsar, serviceUnitStateChannel, splitManager, splitCounter, splitMetrics, context); - this.splitScheduler.start(); - this.initWaiter.countDown(); - this.started = true; - log.info("Started load manager."); + + pulsar.runWhenReadyForIncomingRequests(() -> { + try { + var interval = conf.getLoadBalancerReportUpdateMinIntervalMillis(); + + this.brokerLoadDataReportTask = this.pulsar.getLoadManagerExecutor() + .scheduleAtFixedRate(() -> { + try { + brokerLoadDataReporter.reportAsync(false); + // TODO: update broker load metrics using getLocalData + } catch (Throwable e) { + log.error("Failed to run the broker load manager executor job.", e); + } + }, + interval, + interval, TimeUnit.MILLISECONDS); + + this.topBundlesLoadDataReportTask = this.pulsar.getLoadManagerExecutor() + .scheduleAtFixedRate(() -> { + try { + // TODO: consider excluding the bundles that are in the process of split. + topBundleLoadDataReporter.reportAsync(false); + } catch (Throwable e) { + log.error("Failed to run the top bundles load manager executor job.", e); + } + }, + interval, + interval, TimeUnit.MILLISECONDS); + + this.monitorTask = this.pulsar.getLoadManagerExecutor() + .scheduleAtFixedRate(() -> { + monitor(); + }, + MONITOR_INTERVAL_IN_MILLIS, + MONITOR_INTERVAL_IN_MILLIS, TimeUnit.MILLISECONDS); + + this.splitScheduler.start(); + this.initWaiter.complete(null); + this.started = true; + log.info("Started load manager."); + } catch (Exception ex) { + failStarting(ex); + } + }); } catch (Exception ex) { - log.error("Failed to start the extensible load balance and close broker registry {}.", - this.brokerRegistry, ex); - if (this.brokerRegistry != null) { + failStarting(ex); + } + } + + private void failStarting(Exception ex) { + log.error("Failed to start the extensible load balance and close broker registry {}.", + this.brokerRegistry, ex); + if (this.brokerRegistry != null) { + try { brokerRegistry.close(); + } catch (PulsarServerException e) { + // ignore } } + initWaiter.completeExceptionally(ex); } @Override @@ -471,7 +497,8 @@ public void initialize(PulsarService pulsar) { @Override public CompletableFuture> assign(Optional topic, - ServiceUnitId serviceUnit) { + ServiceUnitId serviceUnit, + LookupOptions options) { final String bundle = serviceUnit.toString(); @@ -481,45 +508,35 @@ public CompletableFuture> assign(Optional { + if (candidateBrokerId != null) { + return CompletableFuture.completedFuture(Optional.of(candidateBrokerId)); + } + return getOrSelectOwnerAsync(serviceUnit, bundle, options).thenApply(Optional::ofNullable); + }); } return getBrokerLookupData(owner, bundle); }); } - private String getHeartbeatOrSLAMonitorBrokerId(ServiceUnitId serviceUnit) { - // Check if this is Heartbeat or SLAMonitor namespace - String candidateBroker = NamespaceService.checkHeartbeatNamespace(serviceUnit); - if (candidateBroker == null) { - candidateBroker = NamespaceService.checkHeartbeatNamespaceV2(serviceUnit); - } - if (candidateBroker == null) { - candidateBroker = NamespaceService.getSLAMonitorBrokerName(serviceUnit); - } - if (candidateBroker != null) { - return candidateBroker.substring(candidateBroker.lastIndexOf('/') + 1); - } - return candidateBroker; + private CompletableFuture getHeartbeatOrSLAMonitorBrokerId(ServiceUnitId serviceUnit) { + return pulsar.getNamespaceService().getHeartbeatOrSLAMonitorBrokerId(serviceUnit, + cb -> brokerRegistry.lookupAsync(cb).thenApply(Optional::isPresent)); } private CompletableFuture getOrSelectOwnerAsync(ServiceUnitId serviceUnit, - String bundle) { + String bundle, + LookupOptions options) { return serviceUnitStateChannel.getOwnerAsync(bundle).thenCompose(broker -> { // If the bundle not assign yet, select and publish assign event to channel. if (broker.isEmpty()) { - return this.selectAsync(serviceUnit).thenCompose(brokerOpt -> { + return this.selectAsync(serviceUnit, Collections.emptySet(), options).thenCompose(brokerOpt -> { if (brokerOpt.isPresent()) { assignCounter.incrementSuccess(); log.info("Selected new owner broker: {} for bundle: {}.", brokerOpt.get(), bundle); return serviceUnitStateChannel.publishAssignEventAsync(bundle, brokerOpt.get()); } - throw new IllegalStateException( - "Failed to select the new owner broker for bundle: " + bundle); + return CompletableFuture.completedFuture(null); }); } assignCounter.incrementSkip(); @@ -533,22 +550,19 @@ private CompletableFuture> getBrokerLookupData( String bundle) { return owner.thenCompose(broker -> { if (broker.isEmpty()) { - String errorMsg = String.format( - "Failed to get or assign the owner for bundle:%s", bundle); - log.error(errorMsg); - throw new IllegalStateException(errorMsg); - } - return CompletableFuture.completedFuture(broker.get()); - }).thenCompose(broker -> this.getBrokerRegistry().lookupAsync(broker).thenCompose(brokerLookupData -> { - if (brokerLookupData.isEmpty()) { - String errorMsg = String.format( - "Failed to lookup broker:%s for bundle:%s, the broker has not been registered.", - broker, bundle); - log.error(errorMsg); - throw new IllegalStateException(errorMsg); + return CompletableFuture.completedFuture(Optional.empty()); } - return CompletableFuture.completedFuture(brokerLookupData); - })); + return this.getBrokerRegistry().lookupAsync(broker.get()).thenCompose(brokerLookupData -> { + if (brokerLookupData.isEmpty()) { + String errorMsg = String.format( + "Failed to lookup broker:%s for bundle:%s, the broker has not been registered.", + broker, bundle); + log.error(errorMsg); + throw new IllegalStateException(errorMsg); + } + return CompletableFuture.completedFuture(brokerLookupData); + }); + }); } /** @@ -561,7 +575,7 @@ private CompletableFuture> getBrokerLookupData( public CompletableFuture tryAcquiringOwnership(NamespaceBundle namespaceBundle) { log.info("Try acquiring ownership for bundle: {} - {}.", namespaceBundle, brokerRegistry.getBrokerId()); final String bundle = namespaceBundle.toString(); - return assign(Optional.empty(), namespaceBundle) + return assign(Optional.empty(), namespaceBundle, LookupOptions.builder().readOnly(false).build()) .thenApply(brokerLookupData -> { if (brokerLookupData.isEmpty()) { String errorMsg = String.format( @@ -594,12 +608,12 @@ private CompletableFuture> dedupeLookupRequest( } } - public CompletableFuture> selectAsync(ServiceUnitId bundle) { - return selectAsync(bundle, Collections.emptySet()); - } - public CompletableFuture> selectAsync(ServiceUnitId bundle, - Set excludeBrokerSet) { + Set excludeBrokerSet, + LookupOptions options) { + if (options.isReadOnly()) { + return CompletableFuture.completedFuture(Optional.empty()); + } BrokerRegistry brokerRegistry = getBrokerRegistry(); return brokerRegistry.getAvailableBrokerLookupDataAsync() .thenComposeAsync(availableBrokers -> { @@ -651,11 +665,12 @@ public CompletableFuture> getOwnershipAsync(Optional { + if (candidateBroker != null) { + return CompletableFuture.completedFuture(Optional.of(candidateBroker)); + } + return serviceUnitStateChannel.getOwnerAsync(bundle); + }); } public CompletableFuture> getOwnershipWithLookupDataAsync(ServiceUnitId bundleUnit) { @@ -668,7 +683,10 @@ public CompletableFuture> getOwnershipWithLookupDataA } public CompletableFuture unloadNamespaceBundleAsync(ServiceUnitId bundle, - Optional destinationBroker) { + Optional destinationBroker, + boolean force, + long timeout, + TimeUnit timeoutUnit) { if (NamespaceService.isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString())) { log.info("Skip unloading namespace bundle: {}.", bundle); return CompletableFuture.completedFuture(null); @@ -687,11 +705,11 @@ public CompletableFuture unloadNamespaceBundleAsync(ServiceUnitId bundle, log.warn(msg); throw new IllegalArgumentException(msg); } - Unload unload = new Unload(sourceBroker, bundle.toString(), destinationBroker, true); + Unload unload = new Unload(sourceBroker, bundle.toString(), destinationBroker, force); UnloadDecision unloadDecision = new UnloadDecision(unload, UnloadDecision.Label.Success, UnloadDecision.Reason.Admin); return unloadAsync(unloadDecision, - conf.getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS); + timeout, timeoutUnit); }); } @@ -814,7 +832,7 @@ synchronized void playLeader() { boolean becameFollower = false; while (!Thread.currentThread().isInterrupted()) { try { - initWaiter.await(); + initWaiter.get(); if (!serviceUnitStateChannel.isChannelOwner()) { becameFollower = true; break; @@ -864,7 +882,7 @@ synchronized void playFollower() { boolean becameLeader = false; while (!Thread.currentThread().isInterrupted()) { try { - initWaiter.await(); + initWaiter.get(); if (serviceUnitStateChannel.isChannelOwner()) { becameLeader = true; break; @@ -926,7 +944,7 @@ public List getMetrics() { @VisibleForTesting protected void monitor() { try { - initWaiter.await(); + initWaiter.get(); // Monitor role // Periodically check the role in case ZK watcher fails. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java index cd1561cb70e2d..25eb27bc58d27 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java @@ -28,10 +28,11 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.loadbalance.LoadManager; import org.apache.pulsar.broker.loadbalance.ResourceUnit; -import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; import org.apache.pulsar.broker.lookup.LookupResult; +import org.apache.pulsar.broker.namespace.LookupOptions; import org.apache.pulsar.common.naming.ServiceUnitId; import org.apache.pulsar.common.stats.Metrics; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport; public class ExtensibleLoadManagerWrapper implements LoadManager { @@ -62,9 +63,15 @@ public boolean isCentralized() { @Override public CompletableFuture> findBrokerServiceUrl( - Optional topic, ServiceUnitId bundle) { - return loadManager.assign(topic, bundle) - .thenApply(lookupData -> lookupData.map(BrokerLookupData::toLookupResult)); + Optional topic, ServiceUnitId bundle, LookupOptions options) { + return loadManager.assign(topic, bundle, options) + .thenApply(lookupData -> lookupData.map(data -> { + try { + return data.toLookupResult(options); + } catch (PulsarServerException ex) { + throw FutureUtil.wrapToCompletionException(ex); + } + })); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 477a92395386c..4fa9c1db3c19e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -83,12 +83,12 @@ import org.apache.pulsar.broker.loadbalance.extensions.models.Split; import org.apache.pulsar.broker.loadbalance.extensions.models.Unload; import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared; +import org.apache.pulsar.broker.namespace.LookupOptions; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.TableView; import org.apache.pulsar.common.naming.NamespaceBundle; @@ -114,7 +114,6 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { public static final CompressionType MSG_COMPRESSION_TYPE = CompressionType.ZSTD; private static final int OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS = 5000; private static final int OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS = 100; - public static final int OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS = 3000; public static final long VERSION_ID_INIT = 1; // initial versionId public static final long MAX_CLEAN_UP_DELAY_TIME_IN_SECS = 3 * 60; // 3 mins private static final long MIN_CLEAN_UP_DELAY_TIME_IN_SECS = 0; // 0 secs to clean immediately @@ -299,7 +298,8 @@ public synchronized void start() throws PulsarServerException { (pulsar.getPulsarResources(), SYSTEM_NAMESPACE.getTenant(), config.getClusterName()); PulsarClusterMetadataSetup.createNamespaceIfAbsent - (pulsar.getPulsarResources(), SYSTEM_NAMESPACE, config.getClusterName()); + (pulsar.getPulsarResources(), SYSTEM_NAMESPACE, config.getClusterName(), + config.getDefaultNumberOfNamespaceBundles()); ExtensibleLoadManagerImpl.createSystemTopic(pulsar, TOPIC); @@ -485,7 +485,7 @@ private CompletableFuture> getActiveOwnerAsync( String serviceUnit, ServiceUnitState state, Optional owner) { - return deferGetOwnerRequest(serviceUnit) + return dedupeGetOwnerRequest(serviceUnit) .thenCompose(newOwner -> { if (newOwner == null) { return CompletableFuture.completedFuture(null); @@ -623,7 +623,7 @@ public CompletableFuture publishAssignEventAsync(String serviceUnit, Str } EventType eventType = Assign; eventCounters.get(eventType).getTotal().incrementAndGet(); - CompletableFuture getOwnerRequest = deferGetOwnerRequest(serviceUnit); + CompletableFuture getOwnerRequest = dedupeGetOwnerRequest(serviceUnit); pubAsync(serviceUnit, new ServiceUnitStateData(Assigning, broker, getNextVersionId(serviceUnit))) .whenComplete((__, ex) -> { @@ -842,6 +842,7 @@ private void handleReleaseEvent(String serviceUnit, ServiceUnitStateData data) { Free, null, data.sourceBroker(), getNextVersionId(data)); unloadFuture = closeServiceUnit(serviceUnit, true); } + // If the optimized bundle unload is disabled, disconnect the clients at time of RELEASE. stateChangeListeners.notifyOnCompletion(unloadFuture .thenCompose(__ -> pubAsync(serviceUnit, next)), serviceUnit, data) .whenComplete((__, e) -> log(e, serviceUnit, data, next)); @@ -862,9 +863,12 @@ private void handleFreeEvent(String serviceUnit, ServiceUnitStateData data) { } if (isTargetBroker(data.sourceBroker())) { - stateChangeListeners.notifyOnCompletion( - data.force() ? closeServiceUnit(serviceUnit, true) - : CompletableFuture.completedFuture(0), serviceUnit, data) + // If data.force(), try closeServiceUnit and tombstone the bundle. + CompletableFuture future = + (data.force() ? closeServiceUnit(serviceUnit, true) + .thenCompose(__ -> tombstoneAsync(serviceUnit)) + : CompletableFuture.completedFuture(0)).thenApply(__ -> null); + stateChangeListeners.notifyOnCompletion(future, serviceUnit, data) .whenComplete((__, e) -> log(e, serviceUnit, data, null)); } else { stateChangeListeners.notify(serviceUnit, data, null); @@ -876,9 +880,13 @@ private void handleDeleteEvent(String serviceUnit, ServiceUnitStateData data) { if (getOwnerRequest != null) { getOwnerRequest.completeExceptionally(new IllegalStateException(serviceUnit + "has been deleted.")); } - stateChangeListeners.notify(serviceUnit, data, null); + if (isTargetBroker(data.sourceBroker())) { - log(null, serviceUnit, data, null); + stateChangeListeners.notifyOnCompletion( + tombstoneAsync(serviceUnit), serviceUnit, data) + .whenComplete((__, e) -> log(e, serviceUnit, data, null)); + } else { + stateChangeListeners.notify(serviceUnit, data, null); } } @@ -920,44 +928,54 @@ private boolean isTargetBroker(String broker) { return broker.equals(brokerId); } - private CompletableFuture deferGetOwnerRequest(String serviceUnit) { + private CompletableFuture deferGetOwner(String serviceUnit) { + var future = new CompletableFuture().orTimeout(inFlightStateWaitingTimeInMillis, + TimeUnit.MILLISECONDS) + .exceptionally(e -> { + var ownerAfter = getOwner(serviceUnit); + log.warn("{} failed to wait for owner for serviceUnit:{}; Trying to " + + "return the current owner:{}", + brokerId, serviceUnit, ownerAfter, e); + if (ownerAfter == null) { + throw new IllegalStateException(e); + } + return ownerAfter.orElse(null); + }); + if (debug()) { + log.info("{} is waiting for owner for serviceUnit:{}", brokerId, serviceUnit); + } + return future; + } + + private CompletableFuture dedupeGetOwnerRequest(String serviceUnit) { var requested = new MutableObject>(); try { - return getOwnerRequests - .computeIfAbsent(serviceUnit, k -> { - var ownerBefore = getOwner(serviceUnit); - if (ownerBefore != null && ownerBefore.isPresent()) { - // Here, we do a quick active check first with the computeIfAbsent lock - brokerRegistry.lookupAsync(ownerBefore.get()).getNow(Optional.empty()) - .ifPresent(__ -> requested.setValue( - CompletableFuture.completedFuture(ownerBefore.get()))); - - if (requested.getValue() != null) { - return requested.getValue(); - } - } - - - CompletableFuture future = - new CompletableFuture().orTimeout(inFlightStateWaitingTimeInMillis, - TimeUnit.MILLISECONDS) - .exceptionally(e -> { - var ownerAfter = getOwner(serviceUnit); - log.warn("{} failed to wait for owner for serviceUnit:{}; Trying to " - + "return the current owner:{}", - brokerId, serviceUnit, ownerAfter, e); - if (ownerAfter == null) { - throw new IllegalStateException(e); - } - return ownerAfter.orElse(null); - }); - if (debug()) { - log.info("{} is waiting for owner for serviceUnit:{}", brokerId, serviceUnit); - } - requested.setValue(future); - return future; - }); + return getOwnerRequests.computeIfAbsent(serviceUnit, k -> { + var ownerBefore = getOwner(serviceUnit); + if (ownerBefore != null && ownerBefore.isPresent()) { + // Here, we do the broker active check first with the computeIfAbsent lock + requested.setValue(brokerRegistry.lookupAsync(ownerBefore.get()) + .thenCompose(brokerLookupData -> { + if (brokerLookupData.isPresent()) { + // The owner broker is active. + // Immediately return the request. + return CompletableFuture.completedFuture(ownerBefore.get()); + } else { + // The owner broker is inactive. + // The leader broker should be cleaning up the orphan service units. + // Defer this request til the leader notifies the new ownerships. + return deferGetOwner(serviceUnit); + } + })); + } else { + // The owner broker has not been declared yet. + // The ownership should be in the middle of transferring or assigning. + // Defer this request til the inflight ownership change is complete. + requested.setValue(deferGetOwner(serviceUnit)); + } + return requested.getValue(); + }); } finally { var future = requested.getValue(); if (future != null) { @@ -996,6 +1014,9 @@ private CompletableFuture closeServiceUnit(String serviceUnit, boolean if (ex != null) { log.error("Failed to close topics under bundle:{} in {} ms", bundle.toString(), unloadBundleTime, ex); + if (!disconnectClients) { + pulsar.getBrokerService().cleanUnloadedTopicFromCache(bundle); + } } else { log.info("Unloading bundle:{} with {} topics completed in {} ms", bundle, unloadedTopics, unloadBundleTime); @@ -1338,11 +1359,6 @@ private void waitForCleanups(String broker, boolean excludeSystemTopics, int max } } if (cleaned) { - try { - MILLISECONDS.sleep(OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS); - } catch (InterruptedException e) { - log.warn("Interrupted while gracefully waiting for the cleanup convergence."); - } break; } else { try { @@ -1353,9 +1369,23 @@ private void waitForCleanups(String broker, boolean excludeSystemTopics, int max } } } + log.info("Finished cleanup waiting for orphan broker:{}. Elapsed {} ms", brokerId, + System.currentTimeMillis() - started); } private synchronized void doCleanup(String broker) { + try { + if (getChannelOwnerAsync().get(MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS, TimeUnit.SECONDS) + .isEmpty()) { + log.error("Found the channel owner is empty. Skip the inactive broker:{}'s orphan bundle cleanup", + broker); + return; + } + } catch (Exception e) { + log.error("Failed to find the channel owner. Skip the inactive broker:{}'s orphan bundle cleanup", broker); + return; + } + long startTime = System.nanoTime(); log.info("Started ownership cleanup for the inactive broker:{}", broker); int orphanServiceUnitCleanupCnt = 0; @@ -1377,8 +1407,8 @@ private synchronized void doCleanup(String broker) { } try { - producer.flush(); - } catch (PulsarClientException e) { + producer.flushAsync().get(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS, MILLISECONDS); + } catch (Exception e) { log.error("Failed to flush the in-flight non-system bundle override messages.", e); } @@ -1401,8 +1431,8 @@ private synchronized void doCleanup(String broker) { } try { - producer.flush(); - } catch (PulsarClientException e) { + producer.flushAsync().get(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS, MILLISECONDS); + } catch (Exception e) { log.error("Failed to flush the in-flight system bundle override messages.", e); } @@ -1427,7 +1457,8 @@ private synchronized void doCleanup(String broker) { private Optional selectBroker(String serviceUnit, String inactiveBroker) { try { return loadManager.selectAsync( - LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit), Set.of(inactiveBroker)) + LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit), + Set.of(inactiveBroker), LookupOptions.builder().build()) .get(inFlightStateWaitingTimeInMillis, MILLISECONDS); } catch (Throwable e) { log.error("Failed to select a broker for serviceUnit:{}", serviceUnit); @@ -1580,8 +1611,8 @@ protected void monitorOwnerships(List brokers) { } try { - producer.flush(); - } catch (PulsarClientException e) { + producer.flushAsync().get(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS, MILLISECONDS); + } catch (Exception e) { log.error("Failed to flush the in-flight messages.", e); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java index 41f5b18e321e8..50a2b70404039 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java @@ -18,9 +18,12 @@ */ package org.apache.pulsar.broker.loadbalance.extensions.data; +import java.net.URI; import java.util.Map; import java.util.Optional; +import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.lookup.LookupResult; +import org.apache.pulsar.broker.namespace.LookupOptions; import org.apache.pulsar.broker.namespace.NamespaceEphemeralData; import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener; import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData; @@ -79,7 +82,19 @@ public long getStartTimestamp() { return this.startTimestamp; } - public LookupResult toLookupResult() { + public LookupResult toLookupResult(LookupOptions options) throws PulsarServerException { + if (options.hasAdvertisedListenerName()) { + AdvertisedListener listener = advertisedListeners.get(options.getAdvertisedListenerName()); + if (listener == null) { + throw new PulsarServerException("the broker do not have " + + options.getAdvertisedListenerName() + " listener"); + } + URI url = listener.getBrokerServiceUrl(); + URI urlTls = listener.getBrokerServiceUrlTls(); + return new LookupResult(webServiceUrl, webServiceUrlTls, + url == null ? null : url.toString(), + urlTls == null ? null : urlTls.toString(), LookupResult.Type.BrokerUrl, false); + } return new LookupResult(webServiceUrl, webServiceUrlTls, pulsarServiceUrl, pulsarServiceUrlTls, LookupResult.Type.BrokerUrl, false); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java index 71ebbc92a87db..ac21e4c624163 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java @@ -97,7 +97,7 @@ public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable return; } switch (state) { - case Deleted, Owned, Init -> this.complete(serviceUnit, t); + case Init -> this.complete(serviceUnit, t); default -> { if (log.isDebugEnabled()) { log.debug("Handling {} for service unit {}", data, serviceUnit); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java index bf9885b2a252e..a905803c95ddd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.loadbalance.extensions.manager; +import static com.google.common.base.Preconditions.checkArgument; import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Assigning; import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned; import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Failure; @@ -93,7 +94,7 @@ public CompletableFuture waitAsync(CompletableFuture eventPubFuture, public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable t) { ServiceUnitState state = ServiceUnitStateData.state(data); - if (StringUtils.isBlank(data.sourceBroker()) && (state == Owned || state == Assigning)) { + if ((state == Owned || state == Assigning) && StringUtils.isBlank(data.sourceBroker())) { if (log.isDebugEnabled()) { log.debug("Skipping {} for service unit {} from the assignment command.", data, serviceUnit); } @@ -113,7 +114,16 @@ public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable } switch (state) { - case Free, Owned -> this.complete(serviceUnit, t); + case Free -> { + if (!data.force()) { + complete(serviceUnit, t); + } + } + case Init -> { + checkArgument(data == null, "Init state must be associated with null data"); + complete(serviceUnit, t); + } + case Owned -> complete(serviceUnit, t); default -> { if (log.isDebugEnabled()) { log.debug("Handling {} for service unit {}", data, serviceUnit); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java index 81cf33b4a55d2..e9289d3ccdac2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java @@ -31,7 +31,6 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.TableView; @@ -44,6 +43,7 @@ public class TableViewLoadDataStoreImpl implements LoadDataStore { private static final long LOAD_DATA_REPORT_UPDATE_MAX_INTERVAL_MULTIPLIER_BEFORE_RESTART = 2; + private static final long INIT_TIMEOUT_IN_SECS = 5; private volatile TableView tableView; private volatile long tableViewLastUpdateTimestamp; @@ -123,10 +123,11 @@ public synchronized void start() throws LoadDataStoreException { public synchronized void startTableView() throws LoadDataStoreException { if (tableView == null) { try { - tableView = client.newTableViewBuilder(Schema.JSON(clazz)).topic(topic).create(); + tableView = client.newTableViewBuilder(Schema.JSON(clazz)).topic(topic).createAsync() + .get(INIT_TIMEOUT_IN_SECS, TimeUnit.SECONDS); tableView.forEachAndListen((k, v) -> tableViewLastUpdateTimestamp = System.currentTimeMillis()); - } catch (PulsarClientException e) { + } catch (Exception e) { tableView = null; throw new LoadDataStoreException(e); } @@ -137,8 +138,9 @@ public synchronized void startTableView() throws LoadDataStoreException { public synchronized void startProducer() throws LoadDataStoreException { if (producer == null) { try { - producer = client.newProducer(Schema.JSON(clazz)).topic(topic).create(); - } catch (PulsarClientException e) { + producer = client.newProducer(Schema.JSON(clazz)).topic(topic).createAsync() + .get(INIT_TIMEOUT_IN_SECS, TimeUnit.SECONDS); + } catch (Exception e) { producer = null; throw new LoadDataStoreException(e); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 6844b44419ddd..539f899393a4a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -43,6 +43,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -196,7 +197,7 @@ public CompletableFuture> getBrokerServiceUrlAsync(TopicN return CompletableFuture.completedFuture(optResult); } if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) { - return loadManager.get().findBrokerServiceUrl(Optional.of(topic), bundle); + return loadManager.get().findBrokerServiceUrl(Optional.of(topic), bundle, options); } else { // TODO: Add unit tests cover it. return findBrokerServiceUrl(bundle, options); @@ -312,7 +313,7 @@ private CompletableFuture> internalGetWebServiceUrl(@Nullable Serv } CompletableFuture> future = ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar) - ? loadManager.get().findBrokerServiceUrl(Optional.ofNullable(topic), bundle) : + ? loadManager.get().findBrokerServiceUrl(Optional.ofNullable(topic), bundle, options) : findBrokerServiceUrl(bundle, options); return future.thenApply(lookupResult -> { @@ -496,6 +497,38 @@ private CompletableFuture> findBrokerServiceUrl( }); } + /** + * Check if this is Heartbeat or SLAMonitor namespace and return the broker id. + * + * @param serviceUnit the service unit + * @param isBrokerActive the function to check if the broker is active + * @return the broker id + */ + public CompletableFuture getHeartbeatOrSLAMonitorBrokerId( + ServiceUnitId serviceUnit, Function> isBrokerActive) { + String candidateBroker = NamespaceService.checkHeartbeatNamespace(serviceUnit); + if (candidateBroker != null) { + return CompletableFuture.completedFuture(candidateBroker); + } + candidateBroker = NamespaceService.checkHeartbeatNamespaceV2(serviceUnit); + if (candidateBroker != null) { + return CompletableFuture.completedFuture(candidateBroker); + } + candidateBroker = NamespaceService.getSLAMonitorBrokerName(serviceUnit); + if (candidateBroker != null) { + // Check if the broker is available + final String finalCandidateBroker = candidateBroker; + return isBrokerActive.apply(candidateBroker).thenApply(isActive -> { + if (isActive) { + return finalCandidateBroker; + } else { + return null; + } + }); + } + return CompletableFuture.completedFuture(null); + } + private void searchForCandidateBroker(NamespaceBundle bundle, CompletableFuture> lookupFuture, LookupOptions options) { @@ -523,17 +556,9 @@ private void searchForCandidateBroker(NamespaceBundle bundle, try { // check if this is Heartbeat or SLAMonitor namespace - candidateBroker = checkHeartbeatNamespace(bundle); - if (candidateBroker == null) { - candidateBroker = checkHeartbeatNamespaceV2(bundle); - } - if (candidateBroker == null) { - String broker = getSLAMonitorBrokerName(bundle); - // checking if the broker is up and running - if (broker != null && isBrokerActive(broker)) { - candidateBroker = broker; - } - } + candidateBroker = getHeartbeatOrSLAMonitorBrokerId(bundle, cb -> + CompletableFuture.completedFuture(isBrokerActive(cb))) + .get(config.getMetadataStoreOperationTimeoutSeconds(), SECONDS); if (candidateBroker == null) { Optional currentLeader = pulsar.getLeaderElectionService().getCurrentLeader(); @@ -783,7 +808,7 @@ public CompletableFuture unloadNamespaceBundle(NamespaceBundle bundle, boolean closeWithoutWaitingClientDisconnect) { if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) { return ExtensibleLoadManagerImpl.get(loadManager.get()) - .unloadNamespaceBundleAsync(bundle, destinationBroker); + .unloadNamespaceBundleAsync(bundle, destinationBroker, false, timeout, timeoutUnit); } // unload namespace bundle OwnedBundle ob = ownershipCache.getOwnedBundle(bundle); @@ -795,6 +820,11 @@ public CompletableFuture unloadNamespaceBundle(NamespaceBundle bundle, } public CompletableFuture isNamespaceBundleOwned(NamespaceBundle bundle) { + if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) { + ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get()); + return extensibleLoadManager.getOwnershipAsync(Optional.empty(), bundle) + .thenApply(Optional::isPresent); + } return pulsar.getLocalMetadataStore().exists(ServiceUnitUtils.path(bundle)); } @@ -1227,7 +1257,9 @@ public CompletableFuture removeOwnedServiceUnitAsync(NamespaceBundle nsBun CompletableFuture future; if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) { ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get()); - future = extensibleLoadManager.unloadNamespaceBundleAsync(nsBundle, Optional.empty()); + future = extensibleLoadManager.unloadNamespaceBundleAsync( + nsBundle, Optional.empty(), true, + pulsar.getConfig().getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS); } else { future = ownershipCache.removeOwnership(nsBundle); } @@ -1271,7 +1303,9 @@ public void addNamespaceBundleOwnershipListener(NamespaceBundleOwnershipListener bundleOwnershipListeners.add(listener); } } - getOwnedServiceUnits().forEach(bundle -> notifyNamespaceBundleOwnershipListener(bundle, listeners)); + pulsar.runWhenReadyForIncomingRequests(() -> { + getOwnedServiceUnits().forEach(bundle -> notifyNamespaceBundleOwnershipListener(bundle, listeners)); + }); } public void addNamespaceBundleSplitListener(NamespaceBundleSplitListener... listeners) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java index 5308b3c981eb4..e276ea24fed18 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service; import com.google.common.annotations.VisibleForTesting; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; @@ -104,6 +105,9 @@ public PulsarChannelInitializer(PulsarService pulsar, PulsarChannelOptions opts) @Override protected void initChannel(SocketChannel ch) throws Exception { + // disable auto read explicitly so that requests aren't served until auto read is enabled + // ServerCnx must enable auto read in channelActive after PulsarService is ready to accept incoming requests + ch.config().setAutoRead(false); ch.pipeline().addLast("consolidation", new FlushConsolidationHandler(1024, true)); if (this.enableTls) { if (this.tlsEnabledWithKeyStore) { @@ -128,7 +132,8 @@ protected void initChannel(SocketChannel ch) throws Exception { // ServerCnx ends up reading higher number of messages and broker can not throttle the messages by disabling // auto-read. ch.pipeline().addLast("flowController", new FlowControlHandler()); - ServerCnx cnx = newServerCnx(pulsar, listenerName); + // using "ChannelHandler" type to workaround an IntelliJ bug that shows a false positive error + ChannelHandler cnx = newServerCnx(pulsar, listenerName); ch.pipeline().addLast("handler", cnx); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index c7d68a30e7241..59a0b3deeffc7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -28,7 +28,6 @@ import static org.apache.pulsar.broker.service.persistent.PersistentTopic.getMigratedClusterUrl; import static org.apache.pulsar.common.api.proto.ProtocolVersion.v5; import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH; -import static org.apache.pulsar.common.protocol.Commands.newCloseConsumer; import static org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; @@ -70,6 +69,7 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.TransactionMetadataStoreService; @@ -81,6 +81,7 @@ import org.apache.pulsar.broker.limiter.ConnectionController; import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; +import org.apache.pulsar.broker.namespace.LookupOptions; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; @@ -146,6 +147,7 @@ import org.apache.pulsar.common.compression.CompressionCodec; import org.apache.pulsar.common.compression.CompressionCodecProvider; import org.apache.pulsar.common.intercept.InterceptException; +import org.apache.pulsar.common.lookup.data.LookupData; import org.apache.pulsar.common.naming.Metadata; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.SystemTopicNames; @@ -363,6 +365,10 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { this.commandSender = new PulsarCommandSenderImpl(brokerInterceptor, this); this.service.getPulsarStats().recordConnectionCreate(); cnxsPerThread.get().add(this); + service.getPulsar().runWhenReadyForIncomingRequests(() -> { + // enable auto read after PulsarService is ready to accept incoming requests + ctx.channel().config().setAutoRead(true); + }); } @Override @@ -3116,15 +3122,28 @@ public void closeProducer(Producer producer, Optional assigned closeProducer(producer.getProducerId(), producer.getEpoch(), assignedBrokerLookupData); } + private LookupData getLookupData(BrokerLookupData lookupData) { + LookupOptions.LookupOptionsBuilder builder = LookupOptions.builder(); + if (StringUtils.isNotBlank((listenerName))) { + builder.advertisedListenerName(listenerName); + } + try { + return lookupData.toLookupResult(builder.build()).getLookupData(); + } catch (PulsarServerException e) { + log.error("Failed to get lookup data", e); + throw new RuntimeException(e); + } + } + private void closeProducer(long producerId, long epoch, Optional assignedBrokerLookupData) { if (getRemoteEndpointProtocolVersion() >= v5.getValue()) { - if (assignedBrokerLookupData.isPresent()) { - writeAndFlush(Commands.newCloseProducer(producerId, -1L, - assignedBrokerLookupData.get().pulsarServiceUrl(), - assignedBrokerLookupData.get().pulsarServiceUrlTls())); - } else { - writeAndFlush(Commands.newCloseProducer(producerId, -1L)); - } + assignedBrokerLookupData.ifPresentOrElse(lookup -> { + LookupData lookupData = getLookupData(lookup); + writeAndFlush(Commands.newCloseProducer(producerId, -1L, + lookupData.getBrokerUrl(), + lookupData.getBrokerUrlTls())); + }, + () -> writeAndFlush(Commands.newCloseProducer(producerId, -1L))); // The client does not necessarily know that the producer is closed, but the connection is still // active, and there could be messages in flight already. We want to ignore these messages for a time @@ -3150,9 +3169,13 @@ public void closeConsumer(Consumer consumer, Optional assigned private void closeConsumer(long consumerId, Optional assignedBrokerLookupData) { if (getRemoteEndpointProtocolVersion() >= v5.getValue()) { - writeAndFlush(newCloseConsumer(consumerId, -1L, - assignedBrokerLookupData.map(BrokerLookupData::pulsarServiceUrl).orElse(null), - assignedBrokerLookupData.map(BrokerLookupData::pulsarServiceUrlTls).orElse(null))); + assignedBrokerLookupData.ifPresentOrElse(lookup -> { + LookupData lookupData = getLookupData(lookup); + writeAndFlush(Commands.newCloseConsumer(consumerId, -1L, + lookupData.getBrokerUrl(), + lookupData.getBrokerUrlTls())); + }, + () -> writeAndFlush(Commands.newCloseConsumer(consumerId, -1L, null, null))); } else { close(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 8af654633b415..22ee11d8eaf32 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -580,7 +580,8 @@ public CompletableFuture stopReplProducers() { @Override public CompletableFuture checkReplication() { TopicName name = TopicName.get(topic); - if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)) { + if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name) + || ExtensibleLoadManagerImpl.isInternalTopic(topic)) { return CompletableFuture.completedFuture(null); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index f363132f94496..bd53191a261a9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1817,7 +1817,8 @@ CompletableFuture checkPersistencePolicies() { @Override public CompletableFuture checkReplication() { TopicName name = TopicName.get(topic); - if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)) { + if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name) + || ExtensibleLoadManagerImpl.isInternalTopic(topic)) { return CompletableFuture.completedFuture(null); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java index 9a439268a8b4f..89c12c6771ece 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java @@ -20,12 +20,21 @@ import io.prometheus.client.CollectorRegistry; import io.prometheus.client.jetty.JettyStatisticsCollector; +import java.io.IOException; import java.util.ArrayList; import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ExecutionException; import javax.servlet.DispatcherType; +import javax.servlet.Filter; +import javax.servlet.FilterChain; +import javax.servlet.FilterConfig; +import javax.servlet.ServletException; +import javax.servlet.ServletRequest; +import javax.servlet.ServletResponse; +import javax.servlet.http.HttpServletResponse; import lombok.Getter; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; @@ -228,6 +237,7 @@ private static class FilterInitializer { private final FilterHolder authenticationFilterHolder; FilterInitializer(PulsarService pulsarService) { ServiceConfiguration config = pulsarService.getConfiguration(); + if (config.getMaxConcurrentHttpRequests() > 0) { FilterHolder filterHolder = new FilterHolder(QoSFilter.class); filterHolder.setInitParameter("maxRequests", String.valueOf(config.getMaxConcurrentHttpRequests())); @@ -239,6 +249,10 @@ private static class FilterInitializer { new RateLimitingFilter(config.getHttpRequestsMaxPerSecond()))); } + // wait until the PulsarService is ready to serve incoming requests + filterHolders.add( + new FilterHolder(new WaitUntilPulsarServiceIsReadyForIncomingRequestsFilter(pulsarService))); + boolean brokerInterceptorEnabled = pulsarService.getBrokerInterceptor() != null; if (brokerInterceptorEnabled) { ExceptionHandler handler = new ExceptionHandler(); @@ -280,6 +294,42 @@ public void addFilters(ServletContextHandler context, boolean requiresAuthentica } } + // Filter that waits until the PulsarService is ready to serve incoming requests + private static class WaitUntilPulsarServiceIsReadyForIncomingRequestsFilter implements Filter { + private final PulsarService pulsarService; + + public WaitUntilPulsarServiceIsReadyForIncomingRequestsFilter(PulsarService pulsarService) { + this.pulsarService = pulsarService; + } + + @Override + public void init(FilterConfig filterConfig) throws ServletException { + + } + + @Override + public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) + throws IOException, ServletException { + try { + // Wait until the PulsarService is ready to serve incoming requests + pulsarService.waitUntilReadyForIncomingRequests(); + } catch (ExecutionException e) { + ((HttpServletResponse) response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, + "PulsarService failed to start."); + return; + } catch (InterruptedException e) { + ((HttpServletResponse) response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, + "PulsarService is not ready."); + return; + } + chain.doFilter(request, response); + } + + @Override + public void destroy() { + + } + } } public void addServlet(String path, ServletHolder servletHolder, boolean requiresAuthentication, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/AntiAffinityNamespaceGroupExtensionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/AntiAffinityNamespaceGroupExtensionTest.java index d77490e1b8210..cd653a964be36 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/AntiAffinityNamespaceGroupExtensionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/AntiAffinityNamespaceGroupExtensionTest.java @@ -38,6 +38,7 @@ import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData; import org.apache.pulsar.broker.loadbalance.extensions.filter.AntiAffinityGroupPolicyFilter; import org.apache.pulsar.broker.loadbalance.extensions.policies.AntiAffinityGroupPolicyHelper; +import org.apache.pulsar.broker.namespace.LookupOptions; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; @@ -61,7 +62,8 @@ protected String getLoadManagerClassName() { protected String selectBroker(ServiceUnitId serviceUnit, Object loadManager) { try { - return ((ExtensibleLoadManagerImpl) loadManager).assign(Optional.empty(), serviceUnit).get() + return ((ExtensibleLoadManagerImpl) loadManager) + .assign(Optional.empty(), serviceUnit, LookupOptions.builder().build()).get() .get().getPulsarServiceUrl(); } catch (Throwable e) { throw new RuntimeException(e); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java index a77af54daa31d..ea3eff702b4e0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java @@ -21,6 +21,8 @@ import static org.mockito.Mockito.reset; import static org.mockito.Mockito.spy; import com.google.common.collect.Sets; + +import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.commons.lang3.tuple.Pair; @@ -63,22 +65,30 @@ protected ExtensibleLoadManagerImplBaseTest(String defaultTestNamespace) { this.defaultTestNamespace = defaultTestNamespace; } - protected ServiceConfiguration initConfig(ServiceConfiguration conf) { + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + updateConfig(conf); + } + + + protected ServiceConfiguration updateConfig(ServiceConfiguration conf) { conf.setForceDeleteNamespaceAllowed(true); conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName()); conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName()); conf.setLoadBalancerSheddingEnabled(false); conf.setLoadBalancerDebugModeEnabled(true); + conf.setWebServicePortTls(Optional.of(0)); + conf.setBrokerServicePortTls(Optional.of(0)); return conf; } @Override @BeforeClass(alwaysRun = true) protected void setup() throws Exception { - initConfig(conf); super.internalSetup(conf); pulsar1 = pulsar; - var conf2 = initConfig(getDefaultConf()); + var conf2 = updateConfig(getDefaultConf()); additionalPulsarTestContext = createAdditionalPulsarTestContext(conf2); pulsar2 = additionalPulsarTestContext.getPulsarService(); @@ -141,7 +151,7 @@ private void setSecondaryLoadManager() throws IllegalAccessException { FieldUtils.readField(secondaryLoadManager, "serviceUnitStateChannel", true); } - protected CompletableFuture getBundleAsync(PulsarService pulsar, TopicName topic) { + protected static CompletableFuture getBundleAsync(PulsarService pulsar, TopicName topic) { return pulsar.getNamespaceService().getBundleAsync(topic); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index f9c1bf97a2f3d..16d7d97de72c0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -50,6 +50,7 @@ import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; @@ -109,9 +110,11 @@ import org.apache.pulsar.broker.namespace.NamespaceEphemeralData; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerServiceException; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; @@ -153,10 +156,12 @@ public ExtensibleLoadManagerImplTest() { public void testAssignInternalTopic() throws Exception { Optional brokerLookupData1 = primaryLoadManager.assign( Optional.of(TopicName.get(ServiceUnitStateChannelImpl.TOPIC)), - getBundleAsync(pulsar1, TopicName.get(ServiceUnitStateChannelImpl.TOPIC)).get()).get(); + getBundleAsync(pulsar1, TopicName.get(ServiceUnitStateChannelImpl.TOPIC)).get(), + LookupOptions.builder().build()).get(); Optional brokerLookupData2 = secondaryLoadManager.assign( Optional.of(TopicName.get(ServiceUnitStateChannelImpl.TOPIC)), - getBundleAsync(pulsar1, TopicName.get(ServiceUnitStateChannelImpl.TOPIC)).get()).get(); + getBundleAsync(pulsar1, TopicName.get(ServiceUnitStateChannelImpl.TOPIC)).get(), + LookupOptions.builder().build()).get(); assertEquals(brokerLookupData1, brokerLookupData2); assertTrue(brokerLookupData1.isPresent()); @@ -164,7 +169,7 @@ public void testAssignInternalTopic() throws Exception { FieldUtils.readField(channel1, "leaderElectionService", true); Optional currentLeader = leaderElectionService.getCurrentLeader(); assertTrue(currentLeader.isPresent()); - assertEquals(brokerLookupData1.get().getWebServiceUrl(), currentLeader.get().getServiceUrl()); + assertEquals(brokerLookupData1.get().getWebServiceUrlTls(), currentLeader.get().getServiceUrl()); } @Test @@ -172,15 +177,17 @@ public void testAssign() throws Exception { Pair topicAndBundle = getBundleIsNotOwnByChangeEventTopic("test-assign"); TopicName topicName = topicAndBundle.getLeft(); NamespaceBundle bundle = topicAndBundle.getRight(); - Optional brokerLookupData = primaryLoadManager.assign(Optional.empty(), bundle).get(); + Optional brokerLookupData = primaryLoadManager.assign(Optional.empty(), bundle, + LookupOptions.builder().build()).get(); assertTrue(brokerLookupData.isPresent()); log.info("Assign the bundle {} to {}", bundle, brokerLookupData); // Should get owner info from channel. - Optional brokerLookupData1 = secondaryLoadManager.assign(Optional.empty(), bundle).get(); + Optional brokerLookupData1 = secondaryLoadManager.assign(Optional.empty(), bundle, + LookupOptions.builder().build()).get(); assertEquals(brokerLookupData, brokerLookupData1); Optional lookupResult = pulsar2.getNamespaceService() - .getBrokerServiceUrlAsync(topicName, null).get(); + .getBrokerServiceUrlAsync(topicName, LookupOptions.builder().build()).get(); assertTrue(lookupResult.isPresent()); assertEquals(lookupResult.get().getLookupData().getHttpUrl(), brokerLookupData.get().getWebServiceUrl()); @@ -190,6 +197,43 @@ public void testAssign() throws Exception { assertEquals(webServiceUrl.get().toString(), brokerLookupData.get().getWebServiceUrl()); } + @Test + public void testLookupOptions() throws Exception { + Pair topicAndBundle = + getBundleIsNotOwnByChangeEventTopic("test-lookup-options"); + TopicName topicName = topicAndBundle.getLeft(); + NamespaceBundle bundle = topicAndBundle.getRight(); + + admin.topics().createPartitionedTopic(topicName.toString(), 1); + + // Test LookupOptions.readOnly = true when the bundle is not owned by any broker. + Optional webServiceUrlReadOnlyTrue = pulsar1.getNamespaceService() + .getWebServiceUrl(bundle, LookupOptions.builder().readOnly(true).requestHttps(false).build()); + assertTrue(webServiceUrlReadOnlyTrue.isEmpty()); + + // Test LookupOptions.readOnly = false and the bundle assign to some broker. + Optional webServiceUrlReadOnlyFalse = pulsar1.getNamespaceService() + .getWebServiceUrl(bundle, LookupOptions.builder().readOnly(false).requestHttps(false).build()); + assertTrue(webServiceUrlReadOnlyFalse.isPresent()); + + // Test LookupOptions.requestHttps = true + Optional webServiceUrlHttps = pulsar2.getNamespaceService() + .getWebServiceUrl(bundle, LookupOptions.builder().requestHttps(true).build()); + assertTrue(webServiceUrlHttps.isPresent()); + assertTrue(webServiceUrlHttps.get().toString().startsWith("https")); + + // TODO: Support LookupOptions.loadTopicsInBundle = true + + // Test LookupOptions.advertisedListenerName = internal but the broker do not have internal listener. + try { + pulsar2.getNamespaceService() + .getWebServiceUrl(bundle, LookupOptions.builder().advertisedListenerName("internal").build()); + fail(); + } catch (Exception e) { + assertTrue(e.getMessage().contains("the broker do not have internal listener")); + } + } + @Test public void testCheckOwnershipAsync() throws Exception { Pair topicAndBundle = getBundleIsNotOwnByChangeEventTopic("test-check-ownership"); @@ -207,7 +251,7 @@ public void testCheckOwnershipAsync() throws Exception { assertFalse(secondaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()); // 2. Assign the bundle to a broker. - Optional lookupData = primaryLoadManager.assign(Optional.empty(), bundle).get(); + Optional lookupData = primaryLoadManager.assign(Optional.empty(), bundle, LookupOptions.builder().build()).get(); assertTrue(lookupData.isPresent()); if (lookupData.get().getPulsarServiceUrl().equals(pulsar1.getBrokerServiceUrl())) { assertTrue(primaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()); @@ -240,7 +284,7 @@ public CompletableFuture> filterAsync(Map brokerLookupData = primaryLoadManager.assign(Optional.empty(), bundle).get(); + Optional brokerLookupData = primaryLoadManager.assign(Optional.empty(), bundle, LookupOptions.builder().build()).get(); assertTrue(brokerLookupData.isPresent()); assertEquals(brokerLookupData.get().getWebServiceUrl(), pulsar2.getWebServiceAddress()); } @@ -260,7 +304,7 @@ public CompletableFuture> filterAsync(Map brokerLookupData = primaryLoadManager.assign(Optional.empty(), bundle).get(); + Optional brokerLookupData = primaryLoadManager.assign(Optional.empty(), bundle, LookupOptions.builder().build()).get(); assertTrue(brokerLookupData.isPresent()); } @@ -269,7 +313,7 @@ public void testUnloadUponTopicLookupFailure() throws Exception { TopicName topicName = TopicName.get("public/test/testUnloadUponTopicLookupFailure"); NamespaceBundle bundle = pulsar1.getNamespaceService().getBundle(topicName); - primaryLoadManager.assign(Optional.empty(), bundle).get(); + primaryLoadManager.assign(Optional.empty(), bundle, LookupOptions.builder().build()).get(); CompletableFuture future1 = new CompletableFuture(); CompletableFuture future2 = new CompletableFuture(); @@ -390,6 +434,19 @@ public Object[][] isPersistentTopicSubscriptionTypeTest() { @Test(timeOut = 30_000, dataProvider = "isPersistentTopicSubscriptionTypeTest") public void testTransferClientReconnectionWithoutLookup(TopicDomain topicDomain, SubscriptionType subscriptionType) throws Exception { + testTransferClientReconnectionWithoutLookup(topicDomain, subscriptionType, defaultTestNamespace, admin, + lookupUrl.toString(), pulsar1, pulsar2, primaryLoadManager, secondaryLoadManager); + } + + @Test(enabled = false) + public static void testTransferClientReconnectionWithoutLookup(TopicDomain topicDomain, + SubscriptionType subscriptionType, + String defaultTestNamespace, + PulsarAdmin admin, String brokerServiceUrl, + PulsarService pulsar1, PulsarService pulsar2, + ExtensibleLoadManager primaryLoadManager, + ExtensibleLoadManager secondaryLoadManager) + throws Exception { var id = String.format("test-tx-client-reconnect-%s-%s", subscriptionType, UUID.randomUUID()); var topic = String.format("%s://%s/%s", topicDomain.toString(), defaultTestNamespace, id); var topicName = TopicName.get(topic); @@ -399,7 +456,8 @@ public void testTransferClientReconnectionWithoutLookup(TopicDomain topicDomain, var consumers = new ArrayList>(); try { var lookups = new ArrayList(); - + var pulsarClient = pulsarClient(brokerServiceUrl, 0); + clients.add(pulsarClient); @Cleanup var producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); lookups.add(spyLookupService(pulsarClient)); @@ -407,7 +465,7 @@ public void testTransferClientReconnectionWithoutLookup(TopicDomain topicDomain, var consumerCount = subscriptionType == SubscriptionType.Exclusive ? 1 : 3; for (int i = 0; i < consumerCount; i++) { - var client = newPulsarClient(lookupUrl.toString(), 0); + var client = pulsarClient(brokerServiceUrl, 0); clients.add(client); var consumer = client.newConsumer(Schema.STRING). subscriptionName(id). @@ -434,7 +492,7 @@ public void testTransferClientReconnectionWithoutLookup(TopicDomain topicDomain, dstBrokerUrl = pulsar1.getBrokerId(); dstBrokerServiceUrl = pulsar1.getBrokerServiceUrl(); } - checkOwnershipState(broker, bundle); + checkOwnershipState(broker, bundle, primaryLoadManager, secondaryLoadManager, pulsar1); var messageCountBeforeUnloading = 100; var messageCountAfterUnloading = 100; @@ -528,6 +586,17 @@ public void testTransferClientReconnectionWithoutLookup(TopicDomain topicDomain, @Test(timeOut = 30 * 1000, dataProvider = "isPersistentTopicSubscriptionTypeTest") public void testUnloadClientReconnectionWithLookup(TopicDomain topicDomain, SubscriptionType subscriptionType) throws Exception { + testUnloadClientReconnectionWithLookup(topicDomain, subscriptionType, defaultTestNamespace, admin, + lookupUrl.toString(), pulsar1); + } + + @Test(enabled = false) + public static void testUnloadClientReconnectionWithLookup(TopicDomain topicDomain, + SubscriptionType subscriptionType, + String defaultTestNamespace, + PulsarAdmin admin, + String brokerServiceUrl, + PulsarService pulsar1) throws Exception { var id = String.format("test-unload-%s-client-reconnect-%s-%s", topicDomain, subscriptionType, UUID.randomUUID()); var topic = String.format("%s://%s/%s", topicDomain, defaultTestNamespace, id); @@ -536,6 +605,7 @@ public void testUnloadClientReconnectionWithLookup(TopicDomain topicDomain, var consumers = new ArrayList>(); try { @Cleanup + var pulsarClient = pulsarClient(brokerServiceUrl, 0); var producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); var consumerCount = subscriptionType == SubscriptionType.Exclusive ? 1 : 3; @@ -600,13 +670,16 @@ public void testUnloadClientReconnectionWithLookup(TopicDomain topicDomain, } } - private LookupService spyLookupService(PulsarClient client) throws IllegalAccessException { + private static LookupService spyLookupService(PulsarClient client) throws IllegalAccessException { LookupService svc = (LookupService) FieldUtils.readDeclaredField(client, "lookup", true); var lookup = spy(svc); FieldUtils.writeDeclaredField(client, "lookup", lookup, true); return lookup; } - private void checkOwnershipState(String broker, NamespaceBundle bundle) + + protected static void checkOwnershipState(String broker, NamespaceBundle bundle, + ExtensibleLoadManager primaryLoadManager, + ExtensibleLoadManager secondaryLoadManager, PulsarService pulsar1) throws ExecutionException, InterruptedException { var targetLoadManager = secondaryLoadManager; var otherLoadManager = primaryLoadManager; @@ -618,9 +691,15 @@ private void checkOwnershipState(String broker, NamespaceBundle bundle) assertFalse(otherLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()); } + protected void checkOwnershipState(String broker, NamespaceBundle bundle) + throws ExecutionException, InterruptedException { + checkOwnershipState(broker, bundle, primaryLoadManager, secondaryLoadManager, pulsar1); + } + @Test(timeOut = 30 * 1000) public void testSplitBundleAdminAPI() throws Exception { - String namespace = defaultTestNamespace; + final String namespace = "public/testSplitBundleAdminAPI"; + admin.namespaces().createNamespace(namespace, 1); Pair topicAndBundle = getBundleIsNotOwnByChangeEventTopic("test-split"); TopicName topicName = topicAndBundle.getLeft(); admin.topics().createPartitionedTopic(topicName.toString(), 10); @@ -674,6 +753,30 @@ public boolean test(NamespaceBundle namespaceBundle) { } catch (PulsarAdminException ex) { assertTrue(ex.getMessage().contains("Invalid bundle range")); } + + + // delete and retry + Awaitility.await() + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + admin.namespaces().deleteNamespace(namespace); + }); + admin.namespaces().createNamespace(namespace, 1); + admin.namespaces().splitNamespaceBundle(namespace, firstBundle, true, null); + + Awaitility.await() + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + BundlesData bundlesData = admin.namespaces().getBundles(namespace); + assertEquals(bundlesData.getNumBundles(), numBundles + 1); + String lowBundle = String.format("0x%08x", bundleRanges.get(0)); + String midBundle = String.format("0x%08x", mid); + String highBundle = String.format("0x%08x", bundleRanges.get(1)); + assertTrue(bundlesData.getBoundaries().contains(lowBundle)); + assertTrue(bundlesData.getBoundaries().contains(midBundle)); + assertTrue(bundlesData.getBoundaries().contains(highBundle)); + assertEquals(splitCount.get(), 2); + }); } @Test(timeOut = 30 * 1000) @@ -791,7 +894,7 @@ public CompletableFuture> filterAsync(Map brokerLookupData = primaryLoadManager.assign(Optional.empty(), bundle).get(); + Optional brokerLookupData = primaryLoadManager.assign(Optional.empty(), bundle, LookupOptions.builder().build()).get(); Awaitility.waitAtMost(5, TimeUnit.SECONDS).untilAsserted(() -> { assertTrue(brokerLookupData.isPresent()); assertEquals(brokerLookupData.get().getWebServiceUrl(), pulsar2.getWebServiceAddress()); @@ -922,6 +1025,47 @@ public void testDeployAndRollbackLoadManager() throws Exception { pulsar.getBrokerId(), pulsar.getBrokerServiceUrl()); } } + // Check if the broker is available + var wrapper = (ExtensibleLoadManagerWrapper) pulsar4.getLoadManager().get(); + var loadManager4 = spy((ExtensibleLoadManagerImpl) + FieldUtils.readField(wrapper, "loadManager", true)); + loadManager4.getBrokerRegistry().unregister(); + + NamespaceName slaMonitorNamespace = + getSLAMonitorNamespace(pulsar4.getBrokerId(), pulsar.getConfiguration()); + String slaMonitorTopic = slaMonitorNamespace.getPersistentTopicName("test"); + String result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic); + assertNotNull(result); + log.info("{} Namespace is re-owned by {}", slaMonitorTopic, result); + assertNotEquals(result, pulsar4.getBrokerServiceUrl()); + + Producer producer = pulsar.getClient().newProducer(Schema.STRING).topic(slaMonitorTopic).create(); + producer.send("t1"); + + // Test re-register broker and check the lookup result + loadManager4.getBrokerRegistry().register(); + + result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic); + assertNotNull(result); + log.info("{} Namespace is re-owned by {}", slaMonitorTopic, result); + assertEquals(result, pulsar4.getBrokerServiceUrl()); + + producer.send("t2"); + Producer producer1 = pulsar.getClient().newProducer(Schema.STRING).topic(slaMonitorTopic).create(); + producer1.send("t3"); + + producer.close(); + producer1.close(); + @Cleanup + Consumer consumer = pulsar.getClient().newConsumer(Schema.STRING) + .topic(slaMonitorTopic) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscriptionName("test") + .subscribe(); + // receive message t1 t2 t3 + assertEquals(consumer.receive().getValue(), "t1"); + assertEquals(consumer.receive().getValue(), "t2"); + assertEquals(consumer.receive().getValue(), "t3"); } } } @@ -1440,7 +1584,7 @@ public void testGetOwnedServiceUnitsAndGetOwnedNamespaceStatus() throws Exceptio String topic = "persistent://" + defaultTestNamespace + "/test-get-owned-service-units"; admin.topics().createPartitionedTopic(topic, 1); NamespaceBundle bundle = getBundleAsync(pulsar1, TopicName.get(topic)).join(); - CompletableFuture> owner = primaryLoadManager.assign(Optional.empty(), bundle); + CompletableFuture> owner = primaryLoadManager.assign(Optional.empty(), bundle, LookupOptions.builder().build()); assertFalse(owner.join().isEmpty()); BrokerLookupData brokerLookupData = owner.join().get(); @@ -1489,7 +1633,11 @@ public void testTryAcquiringOwnership() NamespaceEphemeralData namespaceEphemeralData = primaryLoadManager.tryAcquiringOwnership(bundle).get(); assertTrue(Set.of(pulsar1.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrl()) .contains(namespaceEphemeralData.getNativeUrl())); - admin.namespaces().deleteNamespace(namespace); + Awaitility.await() + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + admin.namespaces().deleteNamespace(namespace, true); + }); } @Test(timeOut = 30 * 1000) @@ -1547,4 +1695,11 @@ public String name() { } + protected static PulsarClient pulsarClient(String url, int intervalInSecs) throws PulsarClientException { + return + PulsarClient.builder() + .serviceUrl(url) + .statsInterval(intervalInSecs, TimeUnit.SECONDS).build(); + } + } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java new file mode 100644 index 0000000000000..087aefa1cfcd6 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions; + +import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; +import java.util.Optional; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.naming.TopicDomain; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +/** + * Unit test for {@link ExtensibleLoadManagerImpl with AdvertisedListeners broker configs}. + */ +@Slf4j +@Test(groups = "flaky") +@SuppressWarnings("unchecked") +public class ExtensibleLoadManagerImplWithAdvertisedListenersTest extends ExtensibleLoadManagerImplBaseTest { + + public String brokerServiceUrl; + public ExtensibleLoadManagerImplWithAdvertisedListenersTest() { + super("public/test"); + } + + @Override + protected ServiceConfiguration updateConfig(ServiceConfiguration conf) { + super.updateConfig(conf); + int privatePulsarPort = nextLockedFreePort(); + int publicPulsarPort = nextLockedFreePort(); + conf.setInternalListenerName("internal"); + conf.setBindAddresses("external:pulsar://localhost:" + publicPulsarPort); + conf.setAdvertisedListeners( + "external:pulsar://localhost:" + publicPulsarPort + + ",internal:pulsar://localhost:" + privatePulsarPort); + conf.setWebServicePortTls(Optional.empty()); + conf.setBrokerServicePortTls(Optional.empty()); + conf.setBrokerServicePort(Optional.of(privatePulsarPort)); + conf.setWebServicePort(Optional.of(0)); + brokerServiceUrl = conf.getBindAddresses().replaceAll("external:", ""); + return conf; + } + + @DataProvider(name = "isPersistentTopicSubscriptionTypeTest") + public Object[][] isPersistentTopicSubscriptionTypeTest() { + return new Object[][]{ + {TopicDomain.non_persistent, SubscriptionType.Exclusive}, + {TopicDomain.persistent, SubscriptionType.Key_Shared} + }; + } + + @Test(timeOut = 30_000, dataProvider = "isPersistentTopicSubscriptionTypeTest") + public void testTransferClientReconnectionWithoutLookup(TopicDomain topicDomain, SubscriptionType subscriptionType) + throws Exception { + ExtensibleLoadManagerImplTest.testTransferClientReconnectionWithoutLookup(topicDomain, subscriptionType, + defaultTestNamespace, admin, + brokerServiceUrl, + pulsar1, pulsar2, primaryLoadManager, secondaryLoadManager); + } + + @Test(timeOut = 30 * 1000, dataProvider = "isPersistentTopicSubscriptionTypeTest") + public void testUnloadClientReconnectionWithLookup(TopicDomain topicDomain, + SubscriptionType subscriptionType) throws Exception { + ExtensibleLoadManagerImplTest.testUnloadClientReconnectionWithLookup(topicDomain, subscriptionType, + defaultTestNamespace, admin, + brokerServiceUrl, + pulsar1); + } + + @DataProvider(name = "isPersistentTopicTest") + public Object[][] isPersistentTopicTest() { + return new Object[][]{{TopicDomain.persistent}, {TopicDomain.non_persistent}}; + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithTransactionCoordinatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithTransactionCoordinatorTest.java index 0c95dd85f28e0..ed99b502b7e29 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithTransactionCoordinatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithTransactionCoordinatorTest.java @@ -31,8 +31,8 @@ public ExtensibleLoadManagerImplWithTransactionCoordinatorTest() { } @Override - protected ServiceConfiguration initConfig(ServiceConfiguration conf) { - conf = super.initConfig(conf); + protected ServiceConfiguration updateConfig(ServiceConfiguration conf) { + conf = super.updateConfig(conf); conf.setTransactionCoordinatorEnabled(true); return conf; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index fe8387710eeae..837aceca1416f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -502,7 +502,7 @@ public void transferTestWhenDestBrokerFails() // recovered, check the monitor update state : Assigned -> Owned doReturn(CompletableFuture.completedFuture(Optional.of(brokerId1))) - .when(loadManager).selectAsync(any(), any()); + .when(loadManager).selectAsync(any(), any(), any()); FieldUtils.writeDeclaredField(channel2, "producer", producer, true); FieldUtils.writeDeclaredField(channel1, "inFlightStateWaitingTimeInMillis", 1 , true); @@ -576,11 +576,11 @@ public void splitAndRetryTest() throws Exception { childBundle1Range, Optional.empty(), childBundle2Range, Optional.empty())); channel1.publishSplitEventAsync(split); - waitUntilState(channel1, bundle, Deleted); - waitUntilState(channel2, bundle, Deleted); + waitUntilState(channel1, bundle, Init); + waitUntilState(channel2, bundle, Init); - validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0); - validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0); + validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 1, 0, 1, 0); + validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 1, 0, 1, 0); validateEventCounters(channel1, 1, 0, 1, 0, 0, 0); validateEventCounters(channel2, 0, 0, 0, 0, 0, 0); // Verify the retry count @@ -620,7 +620,7 @@ public void splitAndRetryTest() throws Exception { var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2; validateMonitorCounters(leader, 0, - 1, + 0, 0, 0, 0, @@ -724,7 +724,7 @@ public void handleBrokerDeletionEventTest() var owner1 = channel1.getOwnerAsync(bundle1); var owner2 = channel2.getOwnerAsync(bundle2); doReturn(CompletableFuture.completedFuture(Optional.of(brokerId2))) - .when(loadManager).selectAsync(any(), any()); + .when(loadManager).selectAsync(any(), any(), any()); assertTrue(owner1.get().isEmpty()); assertTrue(owner2.get().isEmpty()); @@ -1126,7 +1126,7 @@ public void assignTestWhenDestBrokerProducerFails() FieldUtils.writeDeclaredField(channel2, "inFlightStateWaitingTimeInMillis", 3 * 1000, true); doReturn(CompletableFuture.completedFuture(Optional.of(brokerId2))) - .when(loadManager).selectAsync(any(), any()); + .when(loadManager).selectAsync(any(), any(), any()); channel1.publishAssignEventAsync(bundle, brokerId2); // channel1 is broken. the assign won't be complete. waitUntilState(channel1, bundle); @@ -1236,15 +1236,15 @@ public void splitTestWhenProducerFails() var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2; - waitUntilStateWithMonitor(leader, bundle, Deleted); - waitUntilStateWithMonitor(channel1, bundle, Deleted); - waitUntilStateWithMonitor(channel2, bundle, Deleted); + waitUntilStateWithMonitor(leader, bundle, Init); + waitUntilStateWithMonitor(channel1, bundle, Init); + waitUntilStateWithMonitor(channel2, bundle, Init); var ownerAddr1 = channel1.getOwnerAsync(bundle); var ownerAddr2 = channel2.getOwnerAsync(bundle); - assertTrue(ownerAddr1.isCompletedExceptionally()); - assertTrue(ownerAddr2.isCompletedExceptionally()); + assertTrue(ownerAddr1.get().isEmpty()); + assertTrue(ownerAddr2.get().isEmpty()); FieldUtils.writeDeclaredField(channel1, @@ -1428,13 +1428,15 @@ public void splitAndRetryFailureTest() throws Exception { var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2; ((ServiceUnitStateChannelImpl) leader) .monitorOwnerships(List.of(brokerId1, brokerId2)); - waitUntilState(leader, bundle3, Deleted); - waitUntilState(channel1, bundle3, Deleted); - waitUntilState(channel2, bundle3, Deleted); + + waitUntilState(leader, bundle3, Init); + waitUntilState(channel1, bundle3, Init); + waitUntilState(channel2, bundle3, Init); - validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 2, 1, 0, 0, 0, 0, 1, 0); - validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 2, 0, 0, 0, 0, 0, 1, 0); + + validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 2, 1, 0, 0, 1, 0, 1, 0); + validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 2, 0, 0, 0, 1, 0, 1, 0); validateEventCounters(channel1, 1, 0, 1, 0, 0, 0); validateEventCounters(channel2, 0, 0, 0, 0, 0, 0); @@ -1464,7 +1466,7 @@ public void splitAndRetryFailureTest() throws Exception { validateMonitorCounters(leader, 0, - 1, + 0, 1, 0, 0, @@ -1525,7 +1527,7 @@ public void testOverrideInactiveBrokerStateData() // test stable metadata state doReturn(CompletableFuture.completedFuture(Optional.of(brokerId2))) - .when(loadManager).selectAsync(any(), any()); + .when(loadManager).selectAsync(any(), any(), any()); leaderChannel.handleMetadataSessionEvent(SessionReestablished); followerChannel.handleMetadataSessionEvent(SessionReestablished); FieldUtils.writeDeclaredField(leaderChannel, "lastMetadataSessionEventTimestamp", @@ -1542,7 +1544,7 @@ public void testOverrideInactiveBrokerStateData() waitUntilNewOwner(channel2, ownedBundle, brokerId2); assertEquals(Optional.empty(), channel2.getOwnerAsync(freeBundle).get()); assertTrue(channel2.getOwnerAsync(deletedBundle).isCompletedExceptionally()); - assertTrue(channel2.getOwnerAsync(splittingBundle).isCompletedExceptionally()); + assertTrue(channel2.getOwnerAsync(splittingBundle).get().isEmpty()); // clean-up FieldUtils.writeDeclaredField(leaderChannel, "maxCleanupDelayTimeInSecs", 3 * 60, true); @@ -1590,7 +1592,7 @@ public void testOverrideOrphanStateData() // test stable metadata state doReturn(CompletableFuture.completedFuture(Optional.of(brokerId2))) - .when(loadManager).selectAsync(any(), any()); + .when(loadManager).selectAsync(any(), any(), any()); FieldUtils.writeDeclaredField(leaderChannel, "inFlightStateWaitingTimeInMillis", -1, true); FieldUtils.writeDeclaredField(followerChannel, "inFlightStateWaitingTimeInMillis", @@ -1605,7 +1607,7 @@ public void testOverrideOrphanStateData() waitUntilNewOwner(channel2, ownedBundle, broker); assertEquals(Optional.empty(), channel2.getOwnerAsync(freeBundle).get()); assertTrue(channel2.getOwnerAsync(deletedBundle).isCompletedExceptionally()); - assertTrue(channel2.getOwnerAsync(splittingBundle).isCompletedExceptionally()); + assertTrue(channel2.getOwnerAsync(splittingBundle).get().isEmpty()); // clean-up FieldUtils.writeDeclaredField(channel1, @@ -1618,34 +1620,65 @@ public void testOverrideOrphanStateData() @Test(priority = 19) public void testActiveGetOwner() throws Exception { - - // set the bundle owner is the broker + // case 1: the bundle owner is empty String broker = brokerId2; String bundle = "public/owned/0xfffffff0_0xffffffff"; + overrideTableViews(bundle, null); + assertEquals(Optional.empty(), channel1.getOwnerAsync(bundle).get()); + + // case 2: the bundle ownership is transferring, and the dst broker is not the channel owner + overrideTableViews(bundle, + new ServiceUnitStateData(Releasing, broker, brokerId1, 1)); + assertEquals(Optional.of(broker), channel1.getOwnerAsync(bundle).get()); + + + // case 3: the bundle ownership is transferring, and the dst broker is the channel owner + overrideTableViews(bundle, + new ServiceUnitStateData(Assigning, brokerId1, brokerId2, 1)); + assertTrue(!channel1.getOwnerAsync(bundle).isDone()); + + // case 4: the bundle ownership is found overrideTableViews(bundle, new ServiceUnitStateData(Owned, broker, null, 1)); var owner = channel1.getOwnerAsync(bundle).get(5, TimeUnit.SECONDS).get(); assertEquals(owner, broker); - // simulate the owner is inactive + // case 5: the owner lookup gets delayed var spyRegistry = spy(new BrokerRegistryImpl(pulsar)); - doReturn(CompletableFuture.completedFuture(Optional.empty())) - .when(spyRegistry).lookupAsync(eq(broker)); FieldUtils.writeDeclaredField(channel1, "brokerRegistry", spyRegistry , true); FieldUtils.writeDeclaredField(channel1, "inFlightStateWaitingTimeInMillis", 1000, true); + var delayedFuture = new CompletableFuture(); + doReturn(delayedFuture).when(spyRegistry).lookupAsync(eq(broker)); + CompletableFuture.runAsync(() -> { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt();; + } + delayedFuture.complete(Optional.of(broker)); + }); - - // verify getOwnerAsync times out because the owner is inactive now. + // verify the owner eventually returns in inFlightStateWaitingTimeInMillis. long start = System.currentTimeMillis(); + assertEquals(broker, channel1.getOwnerAsync(bundle).get().get()); + long elapsed = System.currentTimeMillis() - start; + assertTrue(elapsed < 1000); + + // case 6: the owner is inactive + doReturn(CompletableFuture.completedFuture(Optional.empty())) + .when(spyRegistry).lookupAsync(eq(broker)); + + // verify getOwnerAsync times out + start = System.currentTimeMillis(); var ex = expectThrows(ExecutionException.class, () -> channel1.getOwnerAsync(bundle).get()); assertTrue(ex.getCause() instanceof IllegalStateException); assertTrue(System.currentTimeMillis() - start >= 1000); - // simulate ownership cleanup(no selected owner) by the leader channel + // case 7: the ownership cleanup(no new owner) by the leader channel doReturn(CompletableFuture.completedFuture(Optional.empty())) - .when(loadManager).selectAsync(any(), any()); + .when(loadManager).selectAsync(any(), any(), any()); var leaderChannel = channel1; String leader1 = channel1.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get(); String leader2 = channel2.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get(); @@ -1663,13 +1696,16 @@ public void testActiveGetOwner() throws Exception { "inFlightStateWaitingTimeInMillis", 20 * 1000, true); start = System.currentTimeMillis(); assertTrue(channel1.getOwnerAsync(bundle).get().isEmpty()); + waitUntilState(channel1, bundle, Init); + waitUntilState(channel2, bundle, Init); + assertTrue(System.currentTimeMillis() - start < 20_000); - // simulate ownership cleanup(brokerId1 selected owner) by the leader channel + // case 8: simulate ownership cleanup(brokerId1 as the new owner) by the leader channel overrideTableViews(bundle, new ServiceUnitStateData(Owned, broker, null, 1)); doReturn(CompletableFuture.completedFuture(Optional.of(brokerId1))) - .when(loadManager).selectAsync(any(), any()); + .when(loadManager).selectAsync(any(), any(), any()); leaderChannel.handleMetadataSessionEvent(SessionReestablished); FieldUtils.writeDeclaredField(leaderChannel, "lastMetadataSessionEventTimestamp", System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true); @@ -1690,6 +1726,7 @@ public void testActiveGetOwner() throws Exception { } + private static ConcurrentHashMap>> getOwnerRequests( ServiceUnitStateChannel channel) throws IllegalAccessException { return (ConcurrentHashMap>>) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupDataTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupDataTest.java index 0d874e0f77117..66e8c917d1fc5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupDataTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupDataTest.java @@ -18,13 +18,19 @@ */ package org.apache.pulsar.broker.loadbalance.extensions.data; +import static org.testng.Assert.fail; import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertTrue; + +import java.net.URI; +import java.net.URISyntaxException; import java.util.HashMap; import java.util.Map; import java.util.Optional; +import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; import org.apache.pulsar.broker.lookup.LookupResult; +import org.apache.pulsar.broker.namespace.LookupOptions; import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener; import org.testng.annotations.Test; @@ -32,12 +38,20 @@ public class BrokerLookupDataTest { @Test - public void testConstructors() { + public void testConstructors() throws PulsarServerException, URISyntaxException { String webServiceUrl = "http://localhost:8080"; String webServiceUrlTls = "https://localhoss:8081"; String pulsarServiceUrl = "pulsar://localhost:6650"; String pulsarServiceUrlTls = "pulsar+ssl://localhost:6651"; - Map advertisedListeners = new HashMap<>(); + final String listenerUrl = "pulsar://gateway:7000"; + final String listenerUrlTls = "pulsar://gateway:8000"; + final String listener = "internal"; + Map advertisedListeners = new HashMap<>(){{ + put(listener, AdvertisedListener.builder() + .brokerServiceUrl(new URI(listenerUrl)) + .brokerServiceUrlTls(new URI(listenerUrlTls)) + .build()); + }}; Map protocols = new HashMap<>(){{ put("kafka", "9092"); }}; @@ -56,10 +70,22 @@ public void testConstructors() { assertEquals("3.0", lookupData.brokerVersion()); - LookupResult lookupResult = lookupData.toLookupResult(); + LookupResult lookupResult = lookupData.toLookupResult(LookupOptions.builder().build()); assertEquals(webServiceUrl, lookupResult.getLookupData().getHttpUrl()); assertEquals(webServiceUrlTls, lookupResult.getLookupData().getHttpUrlTls()); assertEquals(pulsarServiceUrl, lookupResult.getLookupData().getBrokerUrl()); assertEquals(pulsarServiceUrlTls, lookupResult.getLookupData().getBrokerUrlTls()); + + try { + lookupData.toLookupResult(LookupOptions.builder().advertisedListenerName("others").build()); + fail(); + } catch (PulsarServerException ex) { + assertTrue(ex.getMessage().contains("the broker do not have others listener")); + } + lookupResult = lookupData.toLookupResult(LookupOptions.builder().advertisedListenerName(listener).build()); + assertEquals(listenerUrl, lookupResult.getLookupData().getBrokerUrl()); + assertEquals(listenerUrlTls, lookupResult.getLookupData().getBrokerUrlTls()); + assertEquals(webServiceUrl, lookupResult.getLookupData().getHttpUrl()); + assertEquals(webServiceUrlTls, lookupResult.getLookupData().getHttpUrlTls()); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java index 3287306ab48ba..57b7830214b92 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java @@ -123,40 +123,23 @@ public void testSuccess() throws IllegalAccessException, ExecutionException, Int manager.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Free, dstBroker, VERSION_ID_INIT), null); assertEquals(inFlightUnloadRequests.size(), 1); - assertEquals(counter.toMetrics(null).toString(), - counterExpected.toMetrics(null).toString()); manager.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Deleted, dstBroker, VERSION_ID_INIT), null); - counterExpected.update(SplitDecision.Label.Success, Sessions); - assertEquals(inFlightUnloadRequests.size(), 0); - assertEquals(counter.toMetrics(null).toString(), - counterExpected.toMetrics(null).toString()); + assertEquals(inFlightUnloadRequests.size(), 1); - // Success with Init state. - future = manager.waitAsync(CompletableFuture.completedFuture(null), - bundle, decision, 5, TimeUnit.SECONDS); - inFlightUnloadRequests = getinFlightUnloadRequests(manager); + manager.handleEvent(bundle, + new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, VERSION_ID_INIT), null); assertEquals(inFlightUnloadRequests.size(), 1); + + // Success with Init state. manager.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Init, dstBroker, VERSION_ID_INIT), null); assertEquals(inFlightUnloadRequests.size(), 0); counterExpected.update(SplitDecision.Label.Success, Sessions); assertEquals(counter.toMetrics(null).toString(), counterExpected.toMetrics(null).toString()); - future.get(); - // Success with Owned state. - future = manager.waitAsync(CompletableFuture.completedFuture(null), - bundle, decision, 5, TimeUnit.SECONDS); - inFlightUnloadRequests = getinFlightUnloadRequests(manager); - assertEquals(inFlightUnloadRequests.size(), 1); - manager.handleEvent(bundle, - new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, VERSION_ID_INIT), null); - assertEquals(inFlightUnloadRequests.size(), 0); - counterExpected.update(SplitDecision.Label.Success, Sessions); - assertEquals(counter.toMetrics(null).toString(), - counterExpected.toMetrics(null).toString()); future.get(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java index 45b1cd9544f91..06cfb0d970549 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java @@ -123,11 +123,11 @@ public void testSuccess() throws IllegalAccessException, ExecutionException, Int assertEquals(inFlightUnloadRequestMap.size(), 1); manager.handleEvent(bundle, - new ServiceUnitStateData(ServiceUnitState.Init, null, srcBroker, VERSION_ID_INIT), null); + new ServiceUnitStateData(ServiceUnitState.Free, null, srcBroker, true, VERSION_ID_INIT), null); assertEquals(inFlightUnloadRequestMap.size(), 1); - manager.handleEvent(bundle, - new ServiceUnitStateData(ServiceUnitState.Free, null, srcBroker, VERSION_ID_INIT), null); + // Success with Init state. + manager.handleEvent(bundle, null, null); assertEquals(inFlightUnloadRequestMap.size(), 0); future.get(); assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 1); @@ -137,17 +137,30 @@ public void testSuccess() throws IllegalAccessException, ExecutionException, Int bundle, unloadDecision, 5, TimeUnit.SECONDS); inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager); assertEquals(inFlightUnloadRequestMap.size(), 1); - manager.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, null, VERSION_ID_INIT), null); assertEquals(inFlightUnloadRequestMap.size(), 1); - manager.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, srcBroker, VERSION_ID_INIT), null); assertEquals(inFlightUnloadRequestMap.size(), 0); - future.get(); assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 2); + + // Success with Free state. + future = manager.waitAsync(CompletableFuture.completedFuture(null), + bundle, unloadDecision, 5, TimeUnit.SECONDS); + inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager); + assertEquals(inFlightUnloadRequestMap.size(), 1); + manager.handleEvent(bundle, + new ServiceUnitStateData(ServiceUnitState.Free, dstBroker, srcBroker, true, VERSION_ID_INIT), null); + assertEquals(inFlightUnloadRequestMap.size(), 1); + manager.handleEvent(bundle, + new ServiceUnitStateData(ServiceUnitState.Free, dstBroker, srcBroker, false, VERSION_ID_INIT), null); + assertEquals(inFlightUnloadRequestMap.size(), 0); + future.get(); + assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 3); + + } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java index 9cc20cf7b9def..bdaddf9afb1da 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java @@ -27,7 +27,6 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; -import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl; import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; import org.testng.Assert; import org.testng.annotations.AfterClass; @@ -73,7 +72,7 @@ public void testStopBroker() throws PulsarServerException { pulsar.close(); final var elapsedMs = System.currentTimeMillis() - beforeStop; log.info("It spends {} ms to stop the broker ({} for protocol handler)", elapsedMs, handler.closeTimeMs); - Assert.assertTrue(elapsedMs < ServiceUnitStateChannelImpl.OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS + Assert.assertTrue(elapsedMs < + handler.closeTimeMs + shutdownTimeoutMs + 1000); // tolerate 1 more second for other processes } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java index 479c1e616e340..514e0207fbfb1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java @@ -27,6 +27,8 @@ import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; +import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.impl.ConsumerImpl; @@ -38,6 +40,8 @@ import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Factory; import org.testng.annotations.Test; import java.lang.reflect.Method; @@ -52,6 +56,18 @@ public class ReplicatorGlobalNSTest extends ReplicatorTestBase { protected String methodName; + @DataProvider(name = "loadManagerClassName") + public static Object[][] loadManagerClassName() { + return new Object[][]{ + {ModularLoadManagerImpl.class.getName()}, + {ExtensibleLoadManagerImpl.class.getName()} + }; + } + + @Factory(dataProvider = "loadManagerClassName") + public ReplicatorGlobalNSTest(String loadManagerClassName) { + this.loadManagerClassName = loadManagerClassName; + } @BeforeMethod public void beforeMethod(Method m) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java index 39cd13fbba5f5..d87f896e31a1c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java @@ -119,6 +119,11 @@ public abstract class ReplicatorTestBase extends TestRetrySupport { protected final String cluster2 = "r2"; protected final String cluster3 = "r3"; protected final String cluster4 = "r4"; + protected String loadManagerClassName; + + protected String getLoadManagerClassName() { + return loadManagerClassName; + } // Default frequency public int getBrokerServicePurgeInactiveFrequency() { @@ -271,8 +276,9 @@ protected void setup() throws Exception { .brokerClientTlsTrustStoreType(keyStoreType) .build()); - admin1.tenants().createTenant("pulsar", - new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"), Sets.newHashSet("r1", "r2", "r3"))); + updateTenantInfo("pulsar", + new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"), + Sets.newHashSet("r1", "r2", "r3"))); admin1.namespaces().createNamespace("pulsar/ns", Sets.newHashSet("r1", "r2", "r3")); admin1.namespaces().createNamespace("pulsar/ns1", Sets.newHashSet("r1", "r2")); @@ -344,6 +350,7 @@ private void setConfigDefaults(ServiceConfiguration config, String clusterName, config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED); config.setEnableReplicatedSubscriptions(true); config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000); + config.setLoadManagerClassName(getLoadManagerClassName()); } public void resetConfig1() { @@ -436,6 +443,14 @@ protected void cleanup() throws Exception { resetConfig4(); } + protected void updateTenantInfo(String tenant, TenantInfoImpl tenantInfo) throws Exception { + if (!admin1.tenants().getTenants().contains(tenant)) { + admin1.tenants().createTenant(tenant, tenantInfo); + } else { + admin1.tenants().updateTenant(tenant, tenantInfo); + } + } + static class MessageProducer implements AutoCloseable { URL url; String namespace; diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java index 1f29e19f01873..ee7497010adfc 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java @@ -403,9 +403,10 @@ public void testIsolationPolicy() throws Exception { () -> { try { admin.lookups().lookupTopicAsync(topic).get(5, TimeUnit.SECONDS); + fail(); } catch (Exception ex) { log.error("Failed to lookup topic: ", ex); - assertThat(ex.getMessage()).contains("Failed to select the new owner broker for bundle"); + assertThat(ex.getMessage()).contains("Service Unavailable"); } } );