Skip to content

Commit

Permalink
[fix][broker] Fix unloadNamespaceBundlesGracefully can be stuck with …
Browse files Browse the repository at this point in the history
…extensible load manager (apache#23349)

(cherry picked from commit e91574a)
  • Loading branch information
BewareMyPower authored and heesung-sn committed Oct 22, 2024
1 parent 10ca808 commit 3f5fb9f
Show file tree
Hide file tree
Showing 16 changed files with 403 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,9 @@ public CompletableFuture<Void> closeAsync() {
return closeFuture;
}
LOG.info("Closing PulsarService");
if (topicPoliciesService != null) {
topicPoliciesService.close();
}
if (brokerService != null) {
brokerService.unloadNamespaceBundlesGracefully();
}
Expand Down Expand Up @@ -575,10 +578,6 @@ public CompletableFuture<Void> closeAsync() {
transactionBufferClient.close();
}

if (topicPoliciesService != null) {
topicPoliciesService.close();
topicPoliciesService = null;
}

if (client != null) {
client.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,14 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS

private SplitManager splitManager;

private volatile boolean started = false;
enum State {
INIT,
RUNNING,
// It's removing visibility of the current broker from other brokers. In this state, it cannot play as a leader
// or follower.
DISABLED,
}
private final AtomicReference<State> state = new AtomicReference<>(State.INIT);

private boolean configuredSystemTopics = false;

Expand Down Expand Up @@ -210,7 +217,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
* Get all the bundles that are owned by this broker.
*/
public CompletableFuture<Set<NamespaceBundle>> getOwnedServiceUnitsAsync() {
if (!started) {
if (state.get() == State.INIT) {
log.warn("Failed to get owned service units, load manager is not started.");
return CompletableFuture.completedFuture(Collections.emptySet());
}
Expand Down Expand Up @@ -373,7 +380,7 @@ public static CompletableFuture<Optional<BrokerLookupData>> getAssignedBrokerLoo

@Override
public void start() throws PulsarServerException {
if (this.started) {
if (state.get() != State.INIT) {
return;
}
try {
Expand Down Expand Up @@ -471,7 +478,9 @@ public void start() throws PulsarServerException {

this.splitScheduler.start();
this.initWaiter.complete(true);
this.started = true;
if (!state.compareAndSet(State.INIT, State.RUNNING)) {
failForUnexpectedState("start");
}
log.info("Started load manager.");
} catch (Throwable e) {
failStarting(e);
Expand Down Expand Up @@ -643,21 +652,17 @@ public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId bundle,
filter.filterAsync(availableBrokerCandidates, bundle, context);
futures.add(future);
}
CompletableFuture<Optional<String>> result = new CompletableFuture<>();
FutureUtil.waitForAll(futures).whenComplete((__, ex) -> {
if (ex != null) {
// TODO: We may need to revisit this error case.
log.error("Failed to filter out brokers when select bundle: {}", bundle, ex);
}
return FutureUtil.waitForAll(futures).exceptionally(e -> {
// TODO: We may need to revisit this error case.
log.error("Failed to filter out brokers when select bundle: {}", bundle, e);
return null;
}).thenApply(__ -> {
if (availableBrokerCandidates.isEmpty()) {
result.complete(Optional.empty());
return;
return Optional.empty();
}
Set<String> candidateBrokers = availableBrokerCandidates.keySet();

result.complete(getBrokerSelectionStrategy().select(candidateBrokers, bundle, context));
return getBrokerSelectionStrategy().select(candidateBrokers, bundle, context);
});
return result;
});
}

Expand Down Expand Up @@ -695,6 +700,9 @@ public CompletableFuture<Void> unloadNamespaceBundleAsync(ServiceUnitId bundle,
boolean force,
long timeout,
TimeUnit timeoutUnit) {
if (state.get() == State.INIT) {
return CompletableFuture.completedFuture(null);
}
if (NamespaceService.isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString())) {
log.info("Skip unloading namespace bundle: {}.", bundle);
return CompletableFuture.completedFuture(null);
Expand Down Expand Up @@ -783,28 +791,13 @@ private CompletableFuture<Void> splitAsync(SplitDecision decision,

@Override
public void close() throws PulsarServerException {
if (!this.started) {
if (state.get() == State.INIT) {
return;
}
try {
if (brokerLoadDataReportTask != null) {
brokerLoadDataReportTask.cancel(true);
}

if (topBundlesLoadDataReportTask != null) {
topBundlesLoadDataReportTask.cancel(true);
}

if (monitorTask != null) {
monitorTask.cancel(true);
}

this.brokerLoadDataStore.close();
this.topBundlesLoadDataStore.close();
stopLoadDataReportTasks();
this.unloadScheduler.close();
this.splitScheduler.close();
} catch (IOException ex) {
throw new PulsarServerException(ex);
} finally {
try {
this.brokerRegistry.close();
Expand All @@ -818,14 +811,36 @@ public void close() throws PulsarServerException {
} catch (Exception e) {
throw new PulsarServerException(e);
} finally {
this.started = false;
state.set(State.INIT);
}
}

}
}
}

private void stopLoadDataReportTasks() {
if (brokerLoadDataReportTask != null) {
brokerLoadDataReportTask.cancel(true);
}
if (topBundlesLoadDataReportTask != null) {
topBundlesLoadDataReportTask.cancel(true);
}
if (monitorTask != null) {
monitorTask.cancel(true);
}
try {
brokerLoadDataStore.shutdown();
} catch (IOException e) {
log.warn("Failed to shutdown brokerLoadDataStore", e);
}
try {
topBundlesLoadDataStore.shutdown();
} catch (IOException e) {
log.warn("Failed to shutdown topBundlesLoadDataStore", e);
}
}

public static boolean isInternalTopic(String topic) {
return INTERNAL_TOPICS.contains(topic)
|| topic.startsWith(TOPIC)
Expand All @@ -841,13 +856,16 @@ synchronized void playLeader() {
boolean becameFollower = false;
while (!Thread.currentThread().isInterrupted()) {
try {
if (!initWaiter.get()) {
if (!initWaiter.get() || disabled()) {
return;
}
if (!serviceUnitStateChannel.isChannelOwner()) {
becameFollower = true;
break;
}
if (disabled()) {
return;
}
// Confirm the system topics have been created or create them if they do not exist.
// If the leader has changed, the new leader need to reset
// the local brokerService.topics (by this topic creations).
Expand All @@ -859,6 +877,11 @@ synchronized void playLeader() {
serviceUnitStateChannel.scheduleOwnershipMonitor();
break;
} catch (Throwable e) {
if (disabled()) {
log.warn("The broker:{} failed to set the role but exit because it's disabled",
pulsar.getBrokerId(), e);
return;
}
log.warn("The broker:{} failed to set the role. Retrying {} th ...",
pulsar.getBrokerId(), ++retry, e);
try {
Expand All @@ -870,6 +893,9 @@ synchronized void playLeader() {
}
}
}
if (disabled()) {
return;
}

if (becameFollower) {
log.warn("The broker:{} became follower while initializing leader role.", pulsar.getBrokerId());
Expand All @@ -893,13 +919,16 @@ synchronized void playFollower() {
boolean becameLeader = false;
while (!Thread.currentThread().isInterrupted()) {
try {
if (!initWaiter.get()) {
if (!initWaiter.get() || disabled()) {
return;
}
if (serviceUnitStateChannel.isChannelOwner()) {
becameLeader = true;
break;
}
if (disabled()) {
return;
}
unloadScheduler.close();
serviceUnitStateChannel.cancelOwnershipMonitor();
closeInternalTopics();
Expand All @@ -908,6 +937,11 @@ synchronized void playFollower() {
topBundlesLoadDataStore.startProducer();
break;
} catch (Throwable e) {
if (disabled()) {
log.warn("The broker:{} failed to set the role but exit because it's disabled",
pulsar.getBrokerId(), e);
return;
}
log.warn("The broker:{} failed to set the role. Retrying {} th ...",
pulsar.getBrokerId(), ++retry, e);
try {
Expand All @@ -919,6 +953,9 @@ synchronized void playFollower() {
}
}
}
if (disabled()) {
return;
}

if (becameLeader) {
log.warn("This broker:{} became leader while initializing follower role.", pulsar.getBrokerId());
Expand Down Expand Up @@ -997,9 +1034,20 @@ protected void monitor() {
}

public void disableBroker() throws Exception {
// TopicDoesNotExistException might be thrown and it's not recoverable. Enable this flag to exit playFollower()
// or playLeader() quickly.
if (!state.compareAndSet(State.RUNNING, State.DISABLED)) {
failForUnexpectedState("disableBroker");
}
stopLoadDataReportTasks();
serviceUnitStateChannel.cleanOwnerships();
leaderElectionService.close();
brokerRegistry.unregister();
leaderElectionService.close();
final var availableBrokers = brokerRegistry.getAvailableBrokersAsync()
.get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
if (availableBrokers.isEmpty()) {
close();
}
// Close the internal topics (if owned any) after giving up the possible leader role,
// so that the subsequent lookups could hit the next leader.
closeInternalTopics();
Expand Down Expand Up @@ -1033,4 +1081,16 @@ protected BrokerRegistry createBrokerRegistry(PulsarService pulsar) {
protected ServiceUnitStateChannel createServiceUnitStateChannel(PulsarService pulsar) {
return new ServiceUnitStateChannelImpl(pulsar);
}

private void failForUnexpectedState(String msg) {
throw new IllegalStateException("Failed to " + msg + ", state: " + state.get());
}

boolean running() {
return state.get() == State.RUNNING;
}

private boolean disabled() {
return state.get() == State.DISABLED;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ public void start() throws PulsarServerException {
loadManager.start();
}

public boolean started() {
return loadManager.running() && loadManager.getServiceUnitStateChannel().started();
}

@Override
public void initialize(PulsarService pulsar) {
loadManager.initialize(pulsar);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ public interface ServiceUnitStateChannel extends Closeable {
*/
void start() throws PulsarServerException;

/**
* Whether the channel started.
*/
boolean started();

/**
* Closes the ServiceUnitStateChannel.
* @throws PulsarServerException if it fails to close the channel.
Expand Down
Loading

0 comments on commit 3f5fb9f

Please sign in to comment.