Skip to content

Commit

Permalink
[fix][broker] Fix unloadNamespaceBundlesGracefully can be stuck with …
Browse files Browse the repository at this point in the history
…extensible load manager (apache#23349)
  • Loading branch information
BewareMyPower authored Sep 27, 2024
1 parent 5583102 commit e91574a
Show file tree
Hide file tree
Showing 14 changed files with 240 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,9 @@ public CompletableFuture<Void> closeAsync() {
return closeFuture;
}
LOG.info("Closing PulsarService");
if (topicPoliciesService != null) {
topicPoliciesService.close();
}
if (brokerService != null) {
brokerService.unloadNamespaceBundlesGracefully();
}
Expand Down Expand Up @@ -633,10 +636,6 @@ public CompletableFuture<Void> closeAsync() {
transactionBufferClient.close();
}

if (topicPoliciesService != null) {
topicPoliciesService.close();
topicPoliciesService = null;
}

if (client != null) {
client.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,14 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS

private SplitManager splitManager;

volatile boolean started = false;
enum State {
INIT,
RUNNING,
// It's removing visibility of the current broker from other brokers. In this state, it cannot play as a leader
// or follower.
DISABLED,
}
private final AtomicReference<State> state = new AtomicReference<>(State.INIT);

private boolean configuredSystemTopics = false;

Expand Down Expand Up @@ -214,7 +221,7 @@ public CompletableFuture<Set<NamespaceBundle>> getOwnedServiceUnitsAsync() {
}

public Set<NamespaceBundle> getOwnedServiceUnits() {
if (!started) {
if (state.get() == State.INIT) {
log.warn("Failed to get owned service units, load manager is not started.");
return Collections.emptySet();
}
Expand Down Expand Up @@ -344,7 +351,7 @@ public static CompletableFuture<Optional<BrokerLookupData>> getAssignedBrokerLoo

@Override
public void start() throws PulsarServerException {
if (this.started) {
if (state.get() != State.INIT) {
return;
}
try {
Expand Down Expand Up @@ -443,7 +450,9 @@ public void start() throws PulsarServerException {

this.splitScheduler.start();
this.initWaiter.complete(true);
this.started = true;
if (!state.compareAndSet(State.INIT, State.RUNNING)) {
failForUnexpectedState("start");
}
log.info("Started load manager.");
} catch (Throwable e) {
failStarting(e);
Expand Down Expand Up @@ -615,21 +624,17 @@ public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId bundle,
filter.filterAsync(availableBrokerCandidates, bundle, context);
futures.add(future);
}
CompletableFuture<Optional<String>> result = new CompletableFuture<>();
FutureUtil.waitForAll(futures).whenComplete((__, ex) -> {
if (ex != null) {
// TODO: We may need to revisit this error case.
log.error("Failed to filter out brokers when select bundle: {}", bundle, ex);
}
return FutureUtil.waitForAll(futures).exceptionally(e -> {
// TODO: We may need to revisit this error case.
log.error("Failed to filter out brokers when select bundle: {}", bundle, e);
return null;
}).thenApply(__ -> {
if (availableBrokerCandidates.isEmpty()) {
result.complete(Optional.empty());
return;
return Optional.empty();
}
Set<String> candidateBrokers = availableBrokerCandidates.keySet();

result.complete(getBrokerSelectionStrategy().select(candidateBrokers, bundle, context));
return getBrokerSelectionStrategy().select(candidateBrokers, bundle, context);
});
return result;
});
}

Expand Down Expand Up @@ -667,6 +672,9 @@ public CompletableFuture<Void> unloadNamespaceBundleAsync(ServiceUnitId bundle,
boolean force,
long timeout,
TimeUnit timeoutUnit) {
if (state.get() == State.INIT) {
return CompletableFuture.completedFuture(null);
}
if (NamespaceService.isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString())) {
log.info("Skip unloading namespace bundle: {}.", bundle);
return CompletableFuture.completedFuture(null);
Expand Down Expand Up @@ -755,24 +763,11 @@ private CompletableFuture<Void> splitAsync(SplitDecision decision,

@Override
public void close() throws PulsarServerException {
if (!this.started) {
if (state.get() == State.INIT) {
return;
}
try {
if (brokerLoadDataReportTask != null) {
brokerLoadDataReportTask.cancel(true);
}

if (topBundlesLoadDataReportTask != null) {
topBundlesLoadDataReportTask.cancel(true);
}

if (monitorTask != null) {
monitorTask.cancel(true);
}

this.brokerLoadDataStore.shutdown();
this.topBundlesLoadDataStore.shutdown();
stopLoadDataReportTasks();
this.unloadScheduler.close();
this.splitScheduler.close();
this.serviceUnitStateTableViewSyncer.close();
Expand All @@ -791,14 +786,36 @@ public void close() throws PulsarServerException {
} catch (Exception e) {
throw new PulsarServerException(e);
} finally {
this.started = false;
state.set(State.INIT);
}
}

}
}
}

private void stopLoadDataReportTasks() {
if (brokerLoadDataReportTask != null) {
brokerLoadDataReportTask.cancel(true);
}
if (topBundlesLoadDataReportTask != null) {
topBundlesLoadDataReportTask.cancel(true);
}
if (monitorTask != null) {
monitorTask.cancel(true);
}
try {
brokerLoadDataStore.shutdown();
} catch (IOException e) {
log.warn("Failed to shutdown brokerLoadDataStore", e);
}
try {
topBundlesLoadDataStore.shutdown();
} catch (IOException e) {
log.warn("Failed to shutdown topBundlesLoadDataStore", e);
}
}

public static boolean isInternalTopic(String topic) {
return INTERNAL_TOPICS.contains(topic)
|| topic.startsWith(TOPIC)
Expand All @@ -814,13 +831,16 @@ synchronized void playLeader() {
boolean becameFollower = false;
while (!Thread.currentThread().isInterrupted()) {
try {
if (!initWaiter.get()) {
if (!initWaiter.get() || disabled()) {
return;
}
if (!serviceUnitStateChannel.isChannelOwner()) {
becameFollower = true;
break;
}
if (disabled()) {
return;
}
// Confirm the system topics have been created or create them if they do not exist.
// If the leader has changed, the new leader need to reset
// the local brokerService.topics (by this topic creations).
Expand All @@ -835,6 +855,11 @@ synchronized void playLeader() {
}
break;
} catch (Throwable e) {
if (disabled()) {
log.warn("The broker:{} failed to set the role but exit because it's disabled",
pulsar.getBrokerId(), e);
return;
}
log.warn("The broker:{} failed to set the role. Retrying {} th ...",
pulsar.getBrokerId(), ++retry, e);
try {
Expand All @@ -846,6 +871,9 @@ synchronized void playLeader() {
}
}
}
if (disabled()) {
return;
}

if (becameFollower) {
log.warn("The broker:{} became follower while initializing leader role.", pulsar.getBrokerId());
Expand All @@ -869,13 +897,16 @@ synchronized void playFollower() {
boolean becameLeader = false;
while (!Thread.currentThread().isInterrupted()) {
try {
if (!initWaiter.get()) {
if (!initWaiter.get() || disabled()) {
return;
}
if (serviceUnitStateChannel.isChannelOwner()) {
becameLeader = true;
break;
}
if (disabled()) {
return;
}
unloadScheduler.close();
serviceUnitStateChannel.cancelOwnershipMonitor();
closeInternalTopics();
Expand All @@ -885,6 +916,11 @@ synchronized void playFollower() {
serviceUnitStateTableViewSyncer.close();
break;
} catch (Throwable e) {
if (disabled()) {
log.warn("The broker:{} failed to set the role but exit because it's disabled",
pulsar.getBrokerId(), e);
return;
}
log.warn("The broker:{} failed to set the role. Retrying {} th ...",
pulsar.getBrokerId(), ++retry, e);
try {
Expand All @@ -896,6 +932,9 @@ synchronized void playFollower() {
}
}
}
if (disabled()) {
return;
}

if (becameLeader) {
log.warn("This broker:{} became leader while initializing follower role.", pulsar.getBrokerId());
Expand Down Expand Up @@ -982,9 +1021,20 @@ protected void monitor() {
}

public void disableBroker() throws Exception {
// TopicDoesNotExistException might be thrown and it's not recoverable. Enable this flag to exit playFollower()
// or playLeader() quickly.
if (!state.compareAndSet(State.RUNNING, State.DISABLED)) {
failForUnexpectedState("disableBroker");
}
stopLoadDataReportTasks();
serviceUnitStateChannel.cleanOwnerships();
leaderElectionService.close();
brokerRegistry.unregister();
leaderElectionService.close();
final var availableBrokers = brokerRegistry.getAvailableBrokersAsync()
.get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
if (availableBrokers.isEmpty()) {
close();
}
// Close the internal topics (if owned any) after giving up the possible leader role,
// so that the subsequent lookups could hit the next leader.
closeInternalTopics();
Expand Down Expand Up @@ -1018,4 +1068,16 @@ protected BrokerRegistry createBrokerRegistry(PulsarService pulsar) {
protected ServiceUnitStateChannel createServiceUnitStateChannel(PulsarService pulsar) {
return new ServiceUnitStateChannelImpl(pulsar);
}

private void failForUnexpectedState(String msg) {
throw new IllegalStateException("Failed to " + msg + ", state: " + state.get());
}

boolean running() {
return state.get() == State.RUNNING;
}

private boolean disabled() {
return state.get() == State.DISABLED;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void start() throws PulsarServerException {
}

public boolean started() {
return loadManager.started && loadManager.getServiceUnitStateChannel().started();
return loadManager.running() && loadManager.getServiceUnitStateChannel().started();
}

@Override
Expand Down
Loading

0 comments on commit e91574a

Please sign in to comment.