diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 559db301158a4..7698f3d119c23 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -463,6 +463,9 @@ public CompletableFuture closeAsync() { return closeFuture; } LOG.info("Closing PulsarService"); + if (topicPoliciesService != null) { + topicPoliciesService.close(); + } if (brokerService != null) { brokerService.unloadNamespaceBundlesGracefully(); } @@ -578,10 +581,6 @@ public CompletableFuture closeAsync() { transactionBufferClient.close(); } - if (topicPoliciesService != null) { - topicPoliciesService.close(); - topicPoliciesService = null; - } if (client != null) { client.close(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 8e34f2f697fb1..bf98df2c5ac8f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -182,7 +182,14 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS private SplitManager splitManager; - private volatile boolean started = false; + enum State { + INIT, + RUNNING, + // It's removing visibility of the current broker from other brokers. In this state, it cannot play as a leader + // or follower. + DISABLED, + } + private final AtomicReference state = new AtomicReference<>(State.INIT); private boolean configuredSystemTopics = false; @@ -210,7 +217,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS * Get all the bundles that are owned by this broker. */ public CompletableFuture> getOwnedServiceUnitsAsync() { - if (!started) { + if (state.get() == State.INIT) { log.warn("Failed to get owned service units, load manager is not started."); return CompletableFuture.completedFuture(Collections.emptySet()); } @@ -373,7 +380,7 @@ public static CompletableFuture> getAssignedBrokerLoo @Override public void start() throws PulsarServerException { - if (this.started) { + if (state.get() != State.INIT) { return; } try { @@ -471,7 +478,9 @@ public void start() throws PulsarServerException { this.splitScheduler.start(); this.initWaiter.complete(true); - this.started = true; + if (!state.compareAndSet(State.INIT, State.RUNNING)) { + failForUnexpectedState("start"); + } log.info("Started load manager."); } catch (Throwable e) { failStarting(e); @@ -643,21 +652,17 @@ public CompletableFuture> selectAsync(ServiceUnitId bundle, filter.filterAsync(availableBrokerCandidates, bundle, context); futures.add(future); } - CompletableFuture> result = new CompletableFuture<>(); - FutureUtil.waitForAll(futures).whenComplete((__, ex) -> { - if (ex != null) { - // TODO: We may need to revisit this error case. - log.error("Failed to filter out brokers when select bundle: {}", bundle, ex); - } + return FutureUtil.waitForAll(futures).exceptionally(e -> { + // TODO: We may need to revisit this error case. + log.error("Failed to filter out brokers when select bundle: {}", bundle, e); + return null; + }).thenApply(__ -> { if (availableBrokerCandidates.isEmpty()) { - result.complete(Optional.empty()); - return; + return Optional.empty(); } Set candidateBrokers = availableBrokerCandidates.keySet(); - - result.complete(getBrokerSelectionStrategy().select(candidateBrokers, bundle, context)); + return getBrokerSelectionStrategy().select(candidateBrokers, bundle, context); }); - return result; }); } @@ -695,6 +700,9 @@ public CompletableFuture unloadNamespaceBundleAsync(ServiceUnitId bundle, boolean force, long timeout, TimeUnit timeoutUnit) { + if (state.get() == State.INIT) { + return CompletableFuture.completedFuture(null); + } if (NamespaceService.isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString())) { log.info("Skip unloading namespace bundle: {}.", bundle); return CompletableFuture.completedFuture(null); @@ -783,28 +791,13 @@ private CompletableFuture splitAsync(SplitDecision decision, @Override public void close() throws PulsarServerException { - if (!this.started) { + if (state.get() == State.INIT) { return; } try { - if (brokerLoadDataReportTask != null) { - brokerLoadDataReportTask.cancel(true); - } - - if (topBundlesLoadDataReportTask != null) { - topBundlesLoadDataReportTask.cancel(true); - } - - if (monitorTask != null) { - monitorTask.cancel(true); - } - - this.brokerLoadDataStore.close(); - this.topBundlesLoadDataStore.close(); + stopLoadDataReportTasks(); this.unloadScheduler.close(); this.splitScheduler.close(); - } catch (IOException ex) { - throw new PulsarServerException(ex); } finally { try { this.brokerRegistry.close(); @@ -818,7 +811,7 @@ public void close() throws PulsarServerException { } catch (Exception e) { throw new PulsarServerException(e); } finally { - this.started = false; + state.set(State.INIT); } } @@ -826,6 +819,28 @@ public void close() throws PulsarServerException { } } + private void stopLoadDataReportTasks() { + if (brokerLoadDataReportTask != null) { + brokerLoadDataReportTask.cancel(true); + } + if (topBundlesLoadDataReportTask != null) { + topBundlesLoadDataReportTask.cancel(true); + } + if (monitorTask != null) { + monitorTask.cancel(true); + } + try { + brokerLoadDataStore.shutdown(); + } catch (IOException e) { + log.warn("Failed to shutdown brokerLoadDataStore", e); + } + try { + topBundlesLoadDataStore.shutdown(); + } catch (IOException e) { + log.warn("Failed to shutdown topBundlesLoadDataStore", e); + } + } + public static boolean isInternalTopic(String topic) { return INTERNAL_TOPICS.contains(topic) || topic.startsWith(TOPIC) @@ -841,13 +856,16 @@ synchronized void playLeader() { boolean becameFollower = false; while (!Thread.currentThread().isInterrupted()) { try { - if (!initWaiter.get()) { + if (!initWaiter.get() || disabled()) { return; } if (!serviceUnitStateChannel.isChannelOwner()) { becameFollower = true; break; } + if (disabled()) { + return; + } // Confirm the system topics have been created or create them if they do not exist. // If the leader has changed, the new leader need to reset // the local brokerService.topics (by this topic creations). @@ -859,6 +877,11 @@ synchronized void playLeader() { serviceUnitStateChannel.scheduleOwnershipMonitor(); break; } catch (Throwable e) { + if (disabled()) { + log.warn("The broker:{} failed to set the role but exit because it's disabled", + pulsar.getBrokerId(), e); + return; + } log.warn("The broker:{} failed to set the role. Retrying {} th ...", pulsar.getBrokerId(), ++retry, e); try { @@ -870,6 +893,9 @@ synchronized void playLeader() { } } } + if (disabled()) { + return; + } if (becameFollower) { log.warn("The broker:{} became follower while initializing leader role.", pulsar.getBrokerId()); @@ -893,13 +919,16 @@ synchronized void playFollower() { boolean becameLeader = false; while (!Thread.currentThread().isInterrupted()) { try { - if (!initWaiter.get()) { + if (!initWaiter.get() || disabled()) { return; } if (serviceUnitStateChannel.isChannelOwner()) { becameLeader = true; break; } + if (disabled()) { + return; + } unloadScheduler.close(); serviceUnitStateChannel.cancelOwnershipMonitor(); closeInternalTopics(); @@ -908,6 +937,11 @@ synchronized void playFollower() { topBundlesLoadDataStore.startProducer(); break; } catch (Throwable e) { + if (disabled()) { + log.warn("The broker:{} failed to set the role but exit because it's disabled", + pulsar.getBrokerId(), e); + return; + } log.warn("The broker:{} failed to set the role. Retrying {} th ...", pulsar.getBrokerId(), ++retry, e); try { @@ -919,6 +953,9 @@ synchronized void playFollower() { } } } + if (disabled()) { + return; + } if (becameLeader) { log.warn("This broker:{} became leader while initializing follower role.", pulsar.getBrokerId()); @@ -997,9 +1034,20 @@ protected void monitor() { } public void disableBroker() throws Exception { + // TopicDoesNotExistException might be thrown and it's not recoverable. Enable this flag to exit playFollower() + // or playLeader() quickly. + if (!state.compareAndSet(State.RUNNING, State.DISABLED)) { + failForUnexpectedState("disableBroker"); + } + stopLoadDataReportTasks(); serviceUnitStateChannel.cleanOwnerships(); - leaderElectionService.close(); brokerRegistry.unregister(); + leaderElectionService.close(); + final var availableBrokers = brokerRegistry.getAvailableBrokersAsync() + .get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS); + if (availableBrokers.isEmpty()) { + close(); + } // Close the internal topics (if owned any) after giving up the possible leader role, // so that the subsequent lookups could hit the next leader. closeInternalTopics(); @@ -1033,4 +1081,16 @@ protected BrokerRegistry createBrokerRegistry(PulsarService pulsar) { protected ServiceUnitStateChannel createServiceUnitStateChannel(PulsarService pulsar) { return new ServiceUnitStateChannelImpl(pulsar); } + + private void failForUnexpectedState(String msg) { + throw new IllegalStateException("Failed to " + msg + ", state: " + state.get()); + } + + boolean running() { + return state.get() == State.RUNNING; + } + + private boolean disabled() { + return state.get() == State.DISABLED; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java index 25eb27bc58d27..35f6cfcbcf549 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java @@ -50,6 +50,10 @@ public void start() throws PulsarServerException { loadManager.start(); } + public boolean started() { + return loadManager.running() && loadManager.getServiceUnitStateChannel().started(); + } + @Override public void initialize(PulsarService pulsar) { loadManager.initialize(pulsar); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java index 9be76e1b0f44d..6319fc332a678 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java @@ -43,6 +43,11 @@ public interface ServiceUnitStateChannel extends Closeable { */ void start() throws PulsarServerException; + /** + * Whether the channel started. + */ + boolean started(); + /** * Closes the ServiceUnitStateChannel. * @throws PulsarServerException if it fails to close the channel. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 1a4b6eb3dfbe2..1f2715a00acfd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -32,6 +32,7 @@ import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.isInFlightState; import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.ChannelState.Closed; import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.ChannelState.Constructed; +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.ChannelState.Disabled; import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.ChannelState.LeaderElectionServiceStarted; import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.ChannelState.Started; import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.EventType.Assign; @@ -181,7 +182,8 @@ enum ChannelState { Closed(0), Constructed(1), LeaderElectionServiceStarted(2), - Started(3); + Started(3), + Disabled(4); ChannelState(int id) { this.id = id; @@ -260,11 +262,19 @@ public void cancelOwnershipMonitor() { } } + + @Override public void cleanOwnerships() { + disable(); doCleanup(brokerId); } + @Override + public synchronized boolean started() { + return validateChannelState(Started, true); + } + public synchronized void start() throws PulsarServerException { if (!validateChannelState(LeaderElectionServiceStarted, false)) { throw new IllegalStateException("Invalid channel state:" + channelState.name()); @@ -442,9 +452,7 @@ public CompletableFuture isChannelOwnerAsync() { if (owner.isPresent()) { return isTargetBroker(owner.get()); } else { - String msg = "There is no channel owner now."; - log.error(msg); - throw new IllegalStateException(msg); + throw new IllegalStateException("There is no channel owner now."); } }); } @@ -618,7 +626,7 @@ private long getNextVersionId(ServiceUnitStateData data) { } public CompletableFuture publishAssignEventAsync(String serviceUnit, String broker) { - if (!validateChannelState(Started, true)) { + if (!validateChannelState(Started, true) || channelState == Disabled) { return CompletableFuture.failedFuture( new IllegalStateException("Invalid channel state:" + channelState.name())); } @@ -699,6 +707,14 @@ private void handleEvent(String serviceUnit, ServiceUnitStateData data) { } ServiceUnitState state = state(data); + if (channelState == Disabled && (data == null || !data.force())) { + final var request = getOwnerRequests.remove(serviceUnit); + if (request != null) { + request.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException( + "cancel the lookup request for " + serviceUnit + " when receiving " + state)); + } + return; + } try { switch (state) { case Owned -> handleOwnEvent(serviceUnit, data); @@ -866,7 +882,7 @@ private void handleSplitEvent(String serviceUnit, ServiceUnitStateData data) { } } - private void handleFreeEvent(String serviceUnit, ServiceUnitStateData data) { + private CompletableFuture handleFreeEvent(String serviceUnit, ServiceUnitStateData data) { var getOwnerRequest = getOwnerRequests.remove(serviceUnit); if (getOwnerRequest != null) { getOwnerRequest.complete(null); @@ -880,8 +896,10 @@ private void handleFreeEvent(String serviceUnit, ServiceUnitStateData data) { : CompletableFuture.completedFuture(0)).thenApply(__ -> null); stateChangeListeners.notifyOnCompletion(future, serviceUnit, data) .whenComplete((__, e) -> log(e, serviceUnit, data, null)); + return future; } else { stateChangeListeners.notify(serviceUnit, data, null); + return CompletableFuture.completedFuture(null); } } @@ -1258,8 +1276,17 @@ private void handleBrokerCreationEvent(String broker) { } private void handleBrokerDeletionEvent(String broker) { - if (!isChannelOwner()) { - log.warn("This broker is not the leader now. Ignoring BrokerDeletionEvent for broker {}.", broker); + try { + if (!isChannelOwner()) { + log.warn("This broker is not the leader now. Ignoring BrokerDeletionEvent for broker {}.", broker); + return; + } + } catch (Exception e) { + if (e instanceof ExecutionException && e.getCause() instanceof IllegalStateException) { + log.warn("Failed to handle broker deletion event due to {}", e.getMessage()); + } else { + log.error("Failed to handle broker deletion event.", e); + } return; } MetadataState state = getMetadataState(); @@ -1279,6 +1306,11 @@ private void handleBrokerDeletionEvent(String broker) { private void scheduleCleanup(String broker, long delayInSecs) { var scheduled = new MutableObject>(); try { + final var channelState = this.channelState; + if (channelState == Disabled || channelState == Closed) { + log.warn("[{}] Skip scheduleCleanup because the state is {} now", brokerId, channelState); + return; + } cleanupJobs.computeIfAbsent(broker, k -> { Executor delayed = CompletableFuture .delayedExecutor(delayInSecs, TimeUnit.SECONDS, pulsar.getLoadManagerExecutor()); @@ -1371,6 +1403,7 @@ private void waitForCleanups(String broker, boolean excludeSystemTopics, int max if (data.state() == Owned && broker.equals(data.dstBroker())) { cleaned = false; + log.info("[{}] bundle {} is still owned by this, data: {}", broker, serviceUnit, data); break; } } @@ -1784,4 +1817,9 @@ public Set> getOwnershipEntrySet() { public static ServiceUnitStateChannel get(PulsarService pulsar) { return ExtensibleLoadManagerImpl.get(pulsar.getLoadManager().get()).getServiceUnitStateChannel(); } + + @VisibleForTesting + protected void disable() { + channelState = Disabled; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java index 6a98b79be81d0..7dd1035b22046 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java @@ -131,4 +131,4 @@ private boolean invalidUnload(ServiceUnitStateData from, ServiceUnitStateData to || !from.dstBroker().equals(to.sourceBroker()) || from.dstBroker().equals(to.dstBroker()); } -} \ No newline at end of file +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java index 307d3a4acb175..61a1606ba6e56 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java @@ -34,7 +34,7 @@ public record ServiceUnitStateData( public ServiceUnitStateData { Objects.requireNonNull(state); - if (StringUtils.isBlank(dstBroker) && StringUtils.isBlank(sourceBroker)) { + if (state != ServiceUnitState.Free && StringUtils.isBlank(dstBroker) && StringUtils.isBlank(sourceBroker)) { throw new IllegalArgumentException("Empty broker"); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java index 48213c18e6376..9863d05ee751e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java @@ -41,7 +41,12 @@ public CompletableFuture> filterAsync(Map { - Optional brokerLoadDataOpt = context.brokerLoadDataStore().get(broker); + final Optional brokerLoadDataOpt; + try { + brokerLoadDataOpt = context.brokerLoadDataStore().get(broker); + } catch (IllegalStateException ignored) { + return false; + } long topics = brokerLoadDataOpt.map(BrokerLoadData::getTopics).orElse(0L); // TODO: The broker load data might be delayed, so the max topic check might not accurate. return topics >= loadBalancerBrokerMaxTopics; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java index a7deeeaad8a5c..157d6a107c511 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java @@ -103,4 +103,10 @@ public interface LoadDataStore extends Closeable { */ void startProducer() throws LoadDataStoreException; -} + /** + * Shutdowns the data store. + */ + default void shutdown() throws IOException { + close(); + } +} \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java index e9289d3ccdac2..672a9a66af07c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java @@ -43,20 +43,17 @@ public class TableViewLoadDataStoreImpl implements LoadDataStore { private static final long LOAD_DATA_REPORT_UPDATE_MAX_INTERVAL_MULTIPLIER_BEFORE_RESTART = 2; + private static final String SHUTDOWN_ERR_MSG = "This load store tableview has been shutdown"; private static final long INIT_TIMEOUT_IN_SECS = 5; - private volatile TableView tableView; private volatile long tableViewLastUpdateTimestamp; - + private volatile long producerLastPublishTimestamp; private volatile Producer producer; - private final ServiceConfiguration conf; - private final PulsarClient client; - private final String topic; - private final Class clazz; + private volatile boolean isShutdown; public TableViewLoadDataStoreImpl(PulsarService pulsar, String topic, Class clazz) throws LoadDataStoreException { @@ -65,6 +62,7 @@ public TableViewLoadDataStoreImpl(PulsarService pulsar, String topic, Class c this.client = pulsar.getClient(); this.topic = topic; this.clazz = clazz; + this.isShutdown = false; } catch (Exception e) { throw new LoadDataStoreException(e); } @@ -72,41 +70,80 @@ public TableViewLoadDataStoreImpl(PulsarService pulsar, String topic, Class c @Override public synchronized CompletableFuture pushAsync(String key, T loadData) { - validateProducer(); - return producer.newMessage().key(key).value(loadData).sendAsync().thenAccept(__ -> {}); + String msg = validateProducer(); + if (StringUtils.isNotBlank(msg)) { + return CompletableFuture.failedFuture(new IllegalStateException(msg)); + } + return producer.newMessage().key(key).value(loadData).sendAsync() + .thenAccept(__ -> producerLastPublishTimestamp = System.currentTimeMillis()); } @Override public synchronized CompletableFuture removeAsync(String key) { - validateProducer(); - return producer.newMessage().key(key).value(null).sendAsync().thenAccept(__ -> {}); + String msg = validateProducer(); + if (StringUtils.isNotBlank(msg)) { + return CompletableFuture.failedFuture(new IllegalStateException(msg)); + } + return producer.newMessage().key(key).value(null).sendAsync() + .thenAccept(__ -> producerLastPublishTimestamp = System.currentTimeMillis()); } @Override public synchronized Optional get(String key) { - validateTableView(); + String msg = validateTableView(); + if (StringUtils.isNotBlank(msg)) { + if (msg.equals(SHUTDOWN_ERR_MSG)) { + return Optional.empty(); + } else { + throw new IllegalStateException(msg); + } + } return Optional.ofNullable(tableView.get(key)); } @Override public synchronized void forEach(BiConsumer action) { - validateTableView(); + String msg = validateTableView(); + if (StringUtils.isNotBlank(msg)) { + throw new IllegalStateException(msg); + } tableView.forEach(action); } public synchronized Set> entrySet() { - validateTableView(); + String msg = validateTableView(); + if (StringUtils.isNotBlank(msg)) { + throw new IllegalStateException(msg); + } return tableView.entrySet(); } @Override public synchronized int size() { - validateTableView(); + String msg = validateTableView(); + if (StringUtils.isNotBlank(msg)) { + throw new IllegalStateException(msg); + } return tableView.size(); } + private void validateState() { + if (isShutdown) { + throw new IllegalStateException(SHUTDOWN_ERR_MSG); + } + } + + + @Override + public synchronized void init() throws IOException { + validateState(); + close(); + start(); + } + @Override public synchronized void closeTableView() throws IOException { + validateState(); if (tableView != null) { tableView.close(); tableView = null; @@ -115,16 +152,26 @@ public synchronized void closeTableView() throws IOException { @Override public synchronized void start() throws LoadDataStoreException { + validateState(); startProducer(); startTableView(); } + private synchronized void closeProducer() throws IOException { + validateState(); + if (producer != null) { + producer.close(); + producer = null; + } + } @Override public synchronized void startTableView() throws LoadDataStoreException { + validateState(); if (tableView == null) { try { tableView = client.newTableViewBuilder(Schema.JSON(clazz)).topic(topic).createAsync() .get(INIT_TIMEOUT_IN_SECS, TimeUnit.SECONDS); + tableViewLastUpdateTimestamp = System.currentTimeMillis(); tableView.forEachAndListen((k, v) -> tableViewLastUpdateTimestamp = System.currentTimeMillis()); } catch (Exception e) { @@ -133,13 +180,14 @@ public synchronized void startTableView() throws LoadDataStoreException { } } } - @Override public synchronized void startProducer() throws LoadDataStoreException { + validateState(); if (producer == null) { try { producer = client.newProducer(Schema.JSON(clazz)).topic(topic).createAsync() .get(INIT_TIMEOUT_IN_SECS, TimeUnit.SECONDS); + producerLastPublishTimestamp = System.currentTimeMillis(); } catch (Exception e) { producer = null; throw new LoadDataStoreException(e); @@ -149,38 +197,65 @@ public synchronized void startProducer() throws LoadDataStoreException { @Override public synchronized void close() throws IOException { - if (producer != null) { - producer.close(); - producer = null; + if (isShutdown) { + return; } + closeProducer(); closeTableView(); } @Override - public synchronized void init() throws IOException { + public synchronized void shutdown() throws IOException { close(); - start(); + isShutdown = true; } - private void validateProducer() { - if (producer == null) { + private String validateProducer() { + if (isShutdown) { + return SHUTDOWN_ERR_MSG; + } + String restartReason = getRestartReason(producer, producerLastPublishTimestamp); + if (StringUtils.isNotBlank(restartReason)) { try { + closeProducer(); startProducer(); - log.info("Restarted producer on {}", topic); + log.info("Restarted producer on {}, {}", topic, restartReason); } catch (Exception e) { - log.error("Failed to restart producer on {}", topic, e); - throw new RuntimeException(e); + String msg = "Failed to restart producer on " + topic + ", restart reason: " + restartReason; + log.error(msg, e); + return msg; } } + return null; } - private void validateTableView() { + private String validateTableView() { + if (isShutdown) { + return SHUTDOWN_ERR_MSG; + } + String restartReason = getRestartReason(tableView, tableViewLastUpdateTimestamp); + if (StringUtils.isNotBlank(restartReason)) { + try { + closeTableView(); + startTableView(); + log.info("Restarted tableview on {}, {}", topic, restartReason); + } catch (Exception e) { + String msg = "Failed to tableview on " + topic + ", restart reason: " + restartReason; + log.error(msg, e); + return msg; + } + } + return null; + } + + private String getRestartReason(Object obj, long lastUpdateTimestamp) { + String restartReason = null; - if (tableView == null) { - restartReason = "table view is null"; + if (obj == null) { + restartReason = "object is null"; } else { - long inactiveDuration = System.currentTimeMillis() - tableViewLastUpdateTimestamp; + long inactiveDuration = System.currentTimeMillis() - lastUpdateTimestamp; long threshold = TimeUnit.MINUTES.toMillis(conf.getLoadBalancerReportUpdateMaxIntervalMinutes()) * LOAD_DATA_REPORT_UPDATE_MAX_INTERVAL_MULTIPLIER_BEFORE_RESTART; if (inactiveDuration > threshold) { @@ -189,17 +264,6 @@ private void validateTableView() { TimeUnit.MILLISECONDS.toSeconds(threshold)); } } - - if (StringUtils.isNotBlank(restartReason)) { - tableViewLastUpdateTimestamp = 0; - try { - closeTableView(); - startTableView(); - log.info("Restarted tableview on {}, {}", topic, restartReason); - } catch (Exception e) { - log.error("Failed to restart tableview on {}", topic, e); - throw new RuntimeException(e); - } - } + return restartReason; } -} +} \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index b4885ffcb6fcc..cb121b663560a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -978,7 +978,12 @@ public void unloadNamespaceBundlesGracefully(int maxConcurrentUnload, boolean cl pulsar.getNamespaceService().unloadNamespaceBundle(su, timeout, MILLISECONDS, closeWithoutWaitingClientDisconnect).get(timeout, MILLISECONDS); } catch (Exception e) { - log.warn("Failed to unload namespace bundle {}", su, e); + if (e instanceof ExecutionException + && e.getCause() instanceof ServiceUnitNotReadyException) { + log.warn("Failed to unload namespace bundle {}: {}", su, e.getMessage()); + } else { + log.warn("Failed to unload namespace bundle {}", su, e); + } } } }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 5156246bb5efb..8054d781976b0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -349,6 +349,9 @@ public CompletableFuture addOwnedNamespaceBundleAsync(NamespaceBundle name @VisibleForTesting @Nonnull CompletableFuture prepareInitPoliciesCacheAsync(@Nonnull NamespaceName namespace) { requireNonNull(namespace); + if (closed.get()) { + return CompletableFuture.completedFuture(null); + } return pulsarService.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace) .thenCompose(namespacePolicies -> { if (namespacePolicies.isEmpty() || namespacePolicies.get().deleted) { @@ -372,6 +375,9 @@ public CompletableFuture addOwnedNamespaceBundleAsync(NamespaceBundle name }); initFuture.exceptionally(ex -> { try { + if (closed.get()) { + return null; + } log.error("[{}] Failed to create reader on __change_events topic", namespace, ex); cleanCacheAndCloseReader(namespace, false); @@ -779,14 +785,22 @@ public void close() throws Exception { if (closed.compareAndSet(false, true)) { writerCaches.synchronous().invalidateAll(); readerCaches.values().forEach(future -> { - if (future != null && !future.isCompletedExceptionally()) { - future.thenAccept(reader -> { - try { - reader.close(); - } catch (Exception e) { - log.error("Failed to close reader.", e); - } - }); + try { + final var reader = future.getNow(null); + if (reader != null) { + reader.close(); + log.info("Closed the reader for topic policies"); + } else { + // Avoid blocking the thread that the reader is created + future.thenAccept(SystemTopicClient.Reader::closeAsync).whenComplete((__, e) -> { + if (e == null) { + log.info("Closed the reader for topic policies"); + } else { + log.error("Failed to close the reader for topic policies", e); + } + }); + } + } catch (Throwable ignored) { } }); readerCaches.clear(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java index 41413f3e3a913..fa63ce566c603 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java @@ -22,13 +22,15 @@ import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; import org.testng.Assert; import org.testng.annotations.AfterClass; @@ -36,26 +38,33 @@ import org.testng.annotations.Test; @Slf4j +@Test(groups = "broker") public class ExtensibleLoadManagerCloseTest { private static final String clusterName = "test"; - private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(1, 0, () -> 0); private final List brokers = new ArrayList<>(); - private PulsarAdmin admin; + private LocalBookkeeperEnsemble bk; @BeforeClass(alwaysRun = true) public void setup() throws Exception { + bk = new LocalBookkeeperEnsemble(1, 0, () -> 0); bk.start(); - for (int i = 0; i < 3; i++) { + } + + private void setupBrokers(int numBrokers) throws Exception { + brokers.clear(); + for (int i = 0; i < numBrokers; i++) { final var broker = new PulsarService(brokerConfig()); broker.start(); brokers.add(broker); } - admin = brokers.get(0).getAdminClient(); - admin.clusters().createCluster(clusterName, ClusterData.builder().build()); - admin.tenants().createTenant("public", TenantInfo.builder() - .allowedClusters(Collections.singleton(clusterName)).build()); - admin.namespaces().createNamespace("public/default"); + final var admin = brokers.get(0).getAdminClient(); + if (!admin.clusters().getClusters().contains(clusterName)) { + admin.clusters().createCluster(clusterName, ClusterData.builder().build()); + admin.tenants().createTenant("public", TenantInfo.builder() + .allowedClusters(Collections.singleton(clusterName)).build()); + admin.namespaces().createNamespace("public/default"); + } } @@ -85,7 +94,9 @@ private ServiceConfiguration brokerConfig() { @Test public void testCloseAfterLoadingBundles() throws Exception { + setupBrokers(3); final var topic = "test"; + final var admin = brokers.get(0).getAdminClient(); admin.topics().createPartitionedTopic(topic, 20); admin.lookups().lookupPartitionedTopic(topic); final var client = PulsarClient.builder().serviceUrl(brokers.get(0).getBrokerServiceUrl()).build(); @@ -104,4 +115,25 @@ public void testCloseAfterLoadingBundles() throws Exception { Assert.assertTrue(closeTimeMs < 5000L); } } + + @Test + public void testLookup() throws Exception { + setupBrokers(1); + final var topic = "test-lookup"; + final var numPartitions = 16; + final var admin = brokers.get(0).getAdminClient(); + admin.topics().createPartitionedTopic(topic, numPartitions); + + final var futures = new ArrayList>(); + for (int i = 0; i < numPartitions; i++) { + futures.add(admin.lookups().lookupTopicAsync(topic + TopicName.PARTITIONED_TOPIC_SUFFIX + i)); + } + FutureUtil.waitForAll(futures).get(); + + final var start = System.currentTimeMillis(); + brokers.get(0).close(); + final var closeTimeMs = System.currentTimeMillis() - start; + log.info("Broker close time: {}", closeTimeMs); + Assert.assertTrue(closeTimeMs < 5000L); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index e569f0d32d573..af42649436bd4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -1551,6 +1551,9 @@ public void testOverrideInactiveBrokerStateData() assertEquals(Optional.empty(), channel2.getOwnerAsync(freeBundle).get()); assertTrue(channel2.getOwnerAsync(deletedBundle).isCompletedExceptionally()); assertTrue(channel2.getOwnerAsync(splittingBundle).get().isEmpty()); + ServiceUnitStateChannel finalLeaderChannel = leaderChannel; + Awaitility.await().atMost(5, TimeUnit.SECONDS) + .until(() -> getCleanupJobs(finalLeaderChannel).isEmpty()); // clean-up FieldUtils.writeDeclaredField(leaderChannel, "maxCleanupDelayTimeInSecs", 3 * 60, true); @@ -1621,8 +1624,8 @@ public void testOverrideOrphanStateData() .monitorOwnerships(List.of(brokerId1, brokerId2, "broker-3")); ServiceUnitStateChannel finalLeaderChannel = leaderChannel; - Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> getCleanupJobs(finalLeaderChannel).isEmpty()); - + Awaitility.await().atMost(5, TimeUnit.SECONDS) + .until(() -> getCleanupJobs(finalLeaderChannel).isEmpty()); waitUntilNewOwner(channel2, releasingBundle1, brokerId2); waitUntilNewOwner(channel2, releasingBundle2, brokerId2); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java index d25cba2bd1bdd..6d4e0f00c93b9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java @@ -18,8 +18,12 @@ */ package org.apache.pulsar.broker.loadbalance.extensions.store; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertThrows; import static org.testng.AssertJUnit.assertTrue; import com.google.common.collect.Sets; @@ -33,6 +37,7 @@ import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.awaitility.Awaitility; +import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -75,8 +80,6 @@ public void testPushGetAndRemove() throws Exception { @Cleanup LoadDataStore loadDataStore = LoadDataStoreFactory.create(pulsar, topic, MyClass.class); - loadDataStore.startProducer(); - loadDataStore.startTableView(); MyClass myClass1 = new MyClass("1", 1); loadDataStore.pushAsync("key1", myClass1).get(); @@ -109,8 +112,6 @@ public void testForEach() throws Exception { @Cleanup LoadDataStore loadDataStore = LoadDataStoreFactory.create(pulsar, topic, Integer.class); - loadDataStore.startProducer(); - loadDataStore.startTableView(); Map map = new HashMap<>(); for (int i = 0; i < 10; i++) { @@ -134,9 +135,6 @@ public void testTableViewRestart() throws Exception { String topic = TopicDomain.persistent + "://" + NamespaceName.SYSTEM_NAMESPACE + "/" + UUID.randomUUID(); LoadDataStore loadDataStore = LoadDataStoreFactory.create(pulsar, topic, Integer.class); - loadDataStore.startProducer(); - - loadDataStore.startTableView(); loadDataStore.pushAsync("1", 1).get(); Awaitility.await().untilAsserted(() -> assertEquals(loadDataStore.size(), 1)); assertEquals(loadDataStore.get("1").get(), 1); @@ -150,6 +148,31 @@ public void testTableViewRestart() throws Exception { Awaitility.await().untilAsserted(() -> assertEquals(loadDataStore.get("1").get(), 3)); } + @Test + public void testProducerRestart() throws Exception { + String topic = TopicDomain.persistent + "://" + NamespaceName.SYSTEM_NAMESPACE + "/" + UUID.randomUUID(); + var loadDataStore = + (TableViewLoadDataStoreImpl) spy(LoadDataStoreFactory.create(pulsar, topic, Integer.class)); + + // happy case + loadDataStore.pushAsync("1", 1).get(); + Awaitility.await().untilAsserted(() -> assertEquals(loadDataStore.size(), 1)); + assertEquals(loadDataStore.get("1").get(), 1); + verify(loadDataStore, times(1)).startProducer(); + + // loadDataStore will restart producer if null. + FieldUtils.writeField(loadDataStore, "producer", null, true); + loadDataStore.pushAsync("1", 2).get(); + Awaitility.await().untilAsserted(() -> assertEquals(loadDataStore.get("1").get(), 2)); + verify(loadDataStore, times(2)).startProducer(); + + // loadDataStore will restart producer if too slow. + FieldUtils.writeField(loadDataStore, "producerLastPublishTimestamp", 0 , true); + loadDataStore.pushAsync("1", 3).get(); + Awaitility.await().untilAsserted(() -> assertEquals(loadDataStore.get("1").get(), 3)); + verify(loadDataStore, times(3)).startProducer(); + } + @Test public void testProducerStop() throws Exception { String topic = TopicDomain.persistent + "://" + NamespaceName.SYSTEM_NAMESPACE + "/" + UUID.randomUUID(); @@ -165,4 +188,25 @@ public void testProducerStop() throws Exception { loadDataStore.removeAsync("2").get(); } -} + @Test + public void testShutdown() throws Exception { + String topic = TopicDomain.persistent + "://" + NamespaceName.SYSTEM_NAMESPACE + "/" + UUID.randomUUID(); + LoadDataStore loadDataStore = + LoadDataStoreFactory.create(pulsar, topic, Integer.class); + loadDataStore.start(); + loadDataStore.shutdown(); + + Assert.assertTrue(loadDataStore.pushAsync("2", 2).isCompletedExceptionally()); + Assert.assertTrue(loadDataStore.removeAsync("2").isCompletedExceptionally()); + assertTrue(loadDataStore.get("2").isEmpty()); + assertThrows(IllegalStateException.class, loadDataStore::size); + assertThrows(IllegalStateException.class, loadDataStore::entrySet); + assertThrows(IllegalStateException.class, () -> loadDataStore.forEach((k, v) -> {})); + assertThrows(IllegalStateException.class, loadDataStore::init); + assertThrows(IllegalStateException.class, loadDataStore::start); + assertThrows(IllegalStateException.class, loadDataStore::startProducer); + assertThrows(IllegalStateException.class, loadDataStore::startTableView); + assertThrows(IllegalStateException.class, loadDataStore::closeTableView); + } + +} \ No newline at end of file diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java index d5d4174ee10a9..4f52060497864 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java @@ -364,8 +364,8 @@ private void readAllExistingMessages(Reader reader, CompletableFuture f } }).exceptionally(ex -> { if (ex.getCause() instanceof PulsarClientException.AlreadyClosedException) { - log.error("Reader {} was closed while reading existing messages.", - reader.getTopic(), ex); + log.info("Reader {} was closed while reading existing messages.", + reader.getTopic()); } else { log.warn("Reader {} was interrupted while reading existing messages. ", reader.getTopic(), ex); @@ -393,8 +393,7 @@ private void readTailMessages(Reader reader) { readTailMessages(reader); }).exceptionally(ex -> { if (ex.getCause() instanceof PulsarClientException.AlreadyClosedException) { - log.error("Reader {} was closed while reading tail messages.", - reader.getTopic(), ex); + log.info("Reader {} was closed while reading tail messages.", reader.getTopic()); // Fail all refresh request when no more messages can be read. pendingRefreshRequests.keySet().forEach(future -> { pendingRefreshRequests.remove(future);