[fix][broker] Fix the broker close hanged issue. #15755

Merged
merged 5 commits into from
May 26, 2022


@Technoboy- Technoboy- commented May 24, 2022

Master issues: #15643, #15753

Motivation

Blocked at BrokerService#unloadNamespaceBundlesGracefully:

2022-05-20T03:37:05.4960249Z "main" #1 prio=5 os_prio=0 cpu=32274.29ms elapsed=2566.54s tid=0x00007fd108024380 nid=0x1af8f waiting on condition  [0x00007fd10fcd0000]
2022-05-20T03:37:05.4960659Z    java.lang.Thread.State: WAITING (parking)
2022-05-20T03:37:05.4961114Z 	at jdk.internal.misc.Unsafe.park(java.base@17.0.3/Native Method)
2022-05-20T03:37:05.4961875Z 	- parking to wait for  <0x00000000cdf00010> (a java.util.concurrent.CompletableFuture$Signaller)
2022-05-20T03:37:05.4962343Z 	at java.util.concurrent.locks.LockSupport.park(java.base@17.0.3/LockSupport.java:211)
2022-05-20T03:37:05.4963171Z 	at java.util.concurrent.CompletableFuture$Signaller.block(java.base@17.0.3/CompletableFuture.java:1864)
2022-05-20T03:37:05.4963683Z 	at java.util.concurrent.ForkJoinPool.unmanagedBlock(java.base@17.0.3/ForkJoinPool.java:3463)
2022-05-20T03:37:05.4964169Z 	at java.util.concurrent.ForkJoinPool.managedBlock(java.base@17.0.3/ForkJoinPool.java:3434)
2022-05-20T03:37:05.4964660Z 	at java.util.concurrent.CompletableFuture.waitingGet(java.base@17.0.3/CompletableFuture.java:1898)
2022-05-20T03:37:05.4965158Z 	at java.util.concurrent.CompletableFuture.get(java.base@17.0.3/CompletableFuture.java:2072)
2022-05-20T03:37:05.4965715Z 	at org.apache.pulsar.broker.service.BrokerService.lambda$unloadNamespaceBundlesGracefully$21(BrokerService.java:919)
2022-05-20T03:37:05.4966467Z 	at org.apache.pulsar.broker.service.BrokerService$$Lambda$1164/0x0000000801527c70.accept(Unknown Source)
2022-05-20T03:37:05.4966882Z 	at java.lang.Iterable.forEach(java.base@17.0.3/Iterable.java:75)
2022-05-20T03:37:05.4967408Z 	at org.apache.pulsar.broker.service.BrokerService.unloadNamespaceBundlesGracefully(BrokerService.java:911)
2022-05-20T03:37:05.4968078Z 	at org.apache.pulsar.broker.service.BrokerService.unloadNamespaceBundlesGracefully(BrokerService.java:887)
2022-05-20T03:37:05.4968664Z 	at org.apache.pulsar.broker.service.BrokerService.closeAsync(BrokerService.java:732)
2022-05-20T03:37:05.4969579Z 	at org.apache.pulsar.broker.PulsarService.closeAsync(PulsarService.java:450)
2022-05-20T03:37:05.4970123Z 	at org.apache.pulsar.broker.PulsarService.close(PulsarService.java:372)
2022-05-20T03:37:05.4970720Z 	at 

Blocked at CoordinationServiceImpl#close:

2022-05-20T01:17:56.3359346Z "main" #1 prio=5 os_prio=0 cpu=11209.07ms elapsed=3506.06s tid=0x00007f9484024380 nid=0xaba waiting on condition  [0x00007f9489edd000]
2022-05-20T01:17:56.3361587Z    java.lang.Thread.State: WAITING (parking)
2022-05-20T01:17:56.3363789Z 	at jdk.internal.misc.Unsafe.park(java.base@17.0.3/Native Method)
2022-05-20T01:17:56.3366545Z 	- parking to wait for  <0x00000000cd180010> (a java.util.concurrent.CompletableFuture$Signaller)
2022-05-20T01:17:56.3368917Z 	at java.util.concurrent.locks.LockSupport.park(java.base@17.0.3/LockSupport.java:211)
2022-05-20T01:17:56.3371298Z 	at java.util.concurrent.CompletableFuture$Signaller.block(java.base@17.0.3/CompletableFuture.java:1864)
2022-05-20T01:17:56.3373823Z 	at java.util.concurrent.ForkJoinPool.unmanagedBlock(java.base@17.0.3/ForkJoinPool.java:3463)
2022-05-20T01:17:56.3376212Z 	at java.util.concurrent.ForkJoinPool.managedBlock(java.base@17.0.3/ForkJoinPool.java:3434)
2022-05-20T01:17:56.3378608Z 	at java.util.concurrent.CompletableFuture.waitingGet(java.base@17.0.3/CompletableFuture.java:1898)
2022-05-20T01:17:56.3380999Z 	at java.util.concurrent.CompletableFuture.join(java.base@17.0.3/CompletableFuture.java:2117)
2022-05-20T01:17:56.3383947Z 	at org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl.close(CoordinationServiceImpl.java:72)
2022-05-20T01:17:56.3386574Z 	at org.apache.pulsar.broker.PulsarService.closeAsync(PulsarService.java:526)
2022-05-20T01:17:56.3388569Z 	at org.apache.pulsar.broker.PulsarService.close(PulsarService.java:372)

For BrokerService#unloadNamespaceBundlesGracefully, the request chain is:

brokerService.closeAsync() -> OwnedBundle.handleUnloadRequest -> pulsar.getNamespaceService().getOwnershipCache().removeOwnership(bundle) -> OwnershipCache.removeOwnership -> ResourceLock.release

For CoordinationServiceImpl#close, the request chain is:

CoordinationServiceImpl.close -> LockManager.asyncClose -> ResourceLock.release

Both request chains end in ResourceLock#release, so both hangs are related to it.
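
To illustrate why a release that never completes blocks the close path, here is a minimal sketch. Both stack traces above are parked in an unbounded `CompletableFuture.get()` or `join()`. The class and method names below are hypothetical stand-ins, not Pulsar code, and the bounded wait is only a way to make the hang observable, not the fix applied in this PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class StuckReleaseSketch {
    /** Hypothetical stand-in for a lock release whose callback is never delivered. */
    static CompletableFuture<Void> releaseThatNeverCompletes() {
        return new CompletableFuture<>();
    }

    /** Returns true if the bounded wait timed out, i.e. an unbounded join() would have hung. */
    static boolean waitTimesOut(CompletableFuture<Void> release, long millis) throws Exception {
        try {
            release.get(millis, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        // A future that nobody completes: the wait times out instead of parking forever.
        System.out.println(waitTimesOut(releaseThatNeverCompletes(), 200));
        // A future that is already complete: the wait returns immediately.
        System.out.println(waitTimesOut(CompletableFuture.completedFuture(null), 200));
    }
}
```

With an unbounded `join()` in place of the timed `get`, the first call would park the calling thread indefinitely, which matches the `WAITING (parking)` state in the thread dumps.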

Because CI uses the MockedZooKeeper, I found that if a RuntimeException is thrown while a request is being handled, the callback is never invoked and the response never completes. So I added a catch block to ensure that every request gets a reply. I'm not sure the return code is right, though.
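
The idea of always replying to the callback can be sketched as follows. This is a simplified, self-contained illustration under assumptions: `ResultCallback`, `guardedCreate`, and the `OK` / `SYSTEM_ERROR` constants are hypothetical stand-ins, not the ZooKeeper API or the exact change in this PR, and the error code chosen here is arbitrary.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CallbackGuardSketch {
    // Hypothetical result codes; not the real KeeperException.Code values.
    static final int OK = 0;
    static final int SYSTEM_ERROR = -1;

    /** Hypothetical callback, standing in for an async result callback. */
    interface ResultCallback {
        void processResult(int rc, String path);
    }

    /** Runs op on the executor and replies to the callback even if op throws. */
    static void guardedCreate(ExecutorService executor, String path, Runnable op, ResultCallback cb) {
        executor.execute(() -> {
            try {
                op.run();
            } catch (RuntimeException e) {
                // Without this catch the callback would never fire and the caller would wait forever.
                cb.processResult(SYSTEM_ERROR, path);
                return;
            }
            cb.processResult(OK, path);
        });
    }

    /** Issues one guarded request and waits (with a bound) for its result code. */
    static int createAndWait(boolean failInside) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<Integer> result = new CompletableFuture<>();
            Runnable op = () -> {
                if (failInside) {
                    throw new IllegalStateException("simulated bug inside the mock");
                }
            };
            guardedCreate(executor, "/locks/a", op, (rc, p) -> result.complete(rc));
            return result.get(5, TimeUnit.SECONDS);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(createAndWait(false)); // prints 0
        System.out.println(createAndWait(true));  // prints -1
    }
}
```

Which error code to report in the catch branch is a design choice; the sketch only shows that some reply must always be sent.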

public void create(final String path, final byte[] data, final List<ACL> acl, CreateMode createMode,
                   final StringCallback cb, final Object ctx) {
    executor.execute(() -> {
        lock();
        try {
            if (stopped) {
                cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
                return;
            }
            final Set<Watcher> toNotifyCreate = Sets.newHashSet();
            toNotifyCreate.addAll(watchers.get(path));
            final Set<Watcher> toNotifyParent = Sets.newHashSet();
            final String parent = path.substring(0, path.lastIndexOf("/"));
            if (!parent.isEmpty()) {
                toNotifyParent.addAll(watchers.get(parent));
            }
            final String name;
            if (createMode != null && createMode.isSequential()) {
                name = path + sequentialIdGenerator.getAndIncrement();
            } else {
                name = path;
            }
            Optional<KeeperException.Code> failure = programmedFailure(Op.CREATE, path);
            if (failure.isPresent()) {
                unlockIfLocked();
                cb.processResult(failure.get().intValue(), path, ctx, null);
            } else if (stopped) {
                unlockIfLocked();
                cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
            } else if (tree.containsKey(path)) {
                unlockIfLocked();
                cb.processResult(KeeperException.Code.NODEEXISTS.intValue(), path, ctx, null);
            } else if (!parent.isEmpty() && !tree.containsKey(parent)) {
                unlockIfLocked();
                toNotifyParent.forEach(watcher -> watcher
                        .process(new WatchedEvent(EventType.NodeChildrenChanged, KeeperState.SyncConnected,
                                parent)));
                cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null);
            } else {
                tree.put(name, MockZNode.of(data, 0,
                        createMode != null && createMode.isEphemeral() ? getEphemeralOwner() : -1L));
                watchers.removeAll(name);
                unlockIfLocked();
                cb.processResult(0, path, ctx, name);
                triggerPersistentWatches(path, parent, EventType.NodeCreated);
                toNotifyCreate.forEach(
                        watcher -> watcher.process(
                                new WatchedEvent(EventType.NodeCreated,
                                        KeeperState.SyncConnected,
                                        name)));
                toNotifyParent.forEach(
                        watcher -> watcher.process(
                                new WatchedEvent(EventType.NodeChildrenChanged,
                                        KeeperState.SyncConnected,
                                        parent)));
            }
        } finally {
            unlockIfLocked();
        }
    });
}

public void delete(final String path, int version, final VoidCallback cb, final Object ctx) {
    Runnable r = () -> {
        lock();
        try {
            final Set<Watcher> toNotifyDelete = Sets.newHashSet();
            toNotifyDelete.addAll(watchers.get(path));
            final Set<Watcher> toNotifyParent = Sets.newHashSet();
            final String parent = path.substring(0, path.lastIndexOf("/"));
            if (!parent.isEmpty()) {
                toNotifyParent.addAll(watchers.get(parent));
            }
            watchers.removeAll(path);
            Optional<KeeperException.Code> failure = programmedFailure(Op.DELETE, path);
            if (failure.isPresent()) {
                unlockIfLocked();
                cb.processResult(failure.get().intValue(), path, ctx);
            } else if (stopped) {
                unlockIfLocked();
                cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx);
            } else if (!tree.containsKey(path)) {
                unlockIfLocked();
                cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx);
            } else if (hasChildren(path)) {
                unlockIfLocked();
                cb.processResult(KeeperException.Code.NOTEMPTY.intValue(), path, ctx);
            } else {
                if (version != -1) {
                    int currentVersion = tree.get(path).getVersion();
                    if (version != currentVersion) {
                        unlockIfLocked();
                        cb.processResult(KeeperException.Code.BADVERSION.intValue(), path, ctx);
                        return;
                    }
                }
                tree.remove(path);
                unlockIfLocked();
                cb.processResult(0, path, ctx);
                toNotifyDelete.forEach(watcher -> watcher
                        .process(new WatchedEvent(EventType.NodeDeleted, KeeperState.SyncConnected, path)));
                toNotifyParent.forEach(watcher -> watcher
                        .process(new WatchedEvent(EventType.NodeChildrenChanged, KeeperState.SyncConnected,
                                parent)));
                triggerPersistentWatches(path, parent, EventType.NodeDeleted);
            }
        } finally {
            unlockIfLocked();
        }
    };
    try {
        executor.execute(r);
    } catch (RejectedExecutionException ree) {
        cb.processResult(KeeperException.Code.SESSIONEXPIRED.intValue(), path, ctx);
    }
}

In addition, the current close process has an ordering issue. LoadManager is closed before BrokerService, but closing BrokerService needs to invoke LoadManager. Even though the LoadManager is stateless, this order is a little confusing.

LoadManager loadManager = this.loadManager.get();
if (loadManager != null) {
    loadManager.stop();
}
List<CompletableFuture<Void>> asyncCloseFutures = new ArrayList<>();
if (this.brokerService != null) {
    CompletableFuture<Void> brokerCloseFuture = this.brokerService.closeAsync();
    if (this.transactionMetadataStoreService != null) {
        asyncCloseFutures.add(brokerCloseFuture.whenComplete((__, ___) -> {

public void unloadNamespaceBundlesGracefully(int maxConcurrentUnload, boolean closeWithoutWaitingClientDisconnect) {
    try {
        log.info("Unloading namespace-bundles...");
        // make broker-node unavailable from the cluster
        if (pulsar.getLoadManager() != null && pulsar.getLoadManager().get() != null) {
            try {
                pulsar.getLoadManager().get().disableBroker();
            } catch (PulsarServerException.NotFoundException ne) {
                log.warn("Broker load-manager znode doesn't exist ", ne);
                // still continue and release bundle ownership as broker's registration node doesn't exist.
            }
        }
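
The dependency between the two components can be sketched with stubs. This is a hypothetical illustration of one possible ordering, closing the service that depends on the load manager first and stopping the load manager afterwards. `LoadManagerStub` and `BrokerServiceStub` are invented for the example and are not Pulsar classes, and the sketch does not claim to show the exact ordering adopted by this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class CloseOrderSketch {
    /** Hypothetical load manager that records what is called on it. */
    static class LoadManagerStub {
        private final List<String> events;
        private boolean stopped;

        LoadManagerStub(List<String> events) {
            this.events = events;
        }

        void disableBroker() {
            events.add(stopped ? "disableBroker after stop" : "disableBroker");
        }

        void stop() {
            stopped = true;
            events.add("loadManager.stop");
        }
    }

    /** Hypothetical broker service whose close path calls into the load manager. */
    static class BrokerServiceStub {
        private final LoadManagerStub loadManager;
        private final List<String> events;

        BrokerServiceStub(LoadManagerStub loadManager, List<String> events) {
            this.loadManager = loadManager;
            this.events = events;
        }

        CompletableFuture<Void> closeAsync() {
            loadManager.disableBroker();
            events.add("brokerService.close");
            return CompletableFuture.completedFuture(null);
        }
    }

    /** Closes the dependent component first, then stops the component it relies on. */
    static List<String> closeInDependencyOrder() {
        List<String> events = new ArrayList<>();
        LoadManagerStub loadManager = new LoadManagerStub(events);
        BrokerServiceStub brokerService = new BrokerServiceStub(loadManager, events);
        brokerService.closeAsync().thenRun(loadManager::stop).join();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(closeInDependencyOrder());
    }
}
```

If `loadManager.stop()` ran first, the stub would record "disableBroker after stop", which is the confusing situation described above.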

Documentation

  • no-need-doc
    (Please explain why)

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label May 24, 2022
@Technoboy- Technoboy- closed this May 24, 2022
@Technoboy- Technoboy- reopened this May 24, 2022
@Technoboy- Technoboy- changed the title from "[WIP]Fix potential race condition when close LockManager" to "[fix][broker] Fix the broker close hanged issue." May 25, 2022
@Technoboy- Technoboy- marked this pull request as ready for review May 25, 2022 13:57
@Technoboy- Technoboy- added the type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages label May 25, 2022
@Technoboy- Technoboy- added this to the 2.11.0 milestone May 25, 2022
@Technoboy- Technoboy- requested review from lhotari and merlimat May 25, 2022 14:13

@merlimat merlimat left a comment


👍 Nice work!

lhotari commented Aug 26, 2022

@Technoboy- A CI build got stuck in ModularLoadManagerImpl.disableBroker / ResourceLock.release. Is it related to this PR? Please check the stack trace at #15643 (comment).

nicoloboschi pushed a commit to datastax/pulsar that referenced this pull request Sep 16, 2022
gaozhangmin pushed a commit to gaozhangmin/pulsar that referenced this pull request Apr 6, 2023
Labels
area/broker doc-not-needed Your PR changes do not impact docs type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages
6 participants