Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[branch-2.10][cherry-pick] Fix the broker close hanged issue. #17689

Merged
merged 1 commit into from
Sep 16, 2022

Conversation

Technoboy-
Copy link
Contributor

Cherry-pick #15755
Master issue #15643, #15753

Motivation

Blocked at BrokerService#unloadNamespaceBundlesGracefully:

2022-05-20T03:37:05.4960249Z "main" #1 prio=5 os_prio=0 cpu=32274.29ms elapsed=2566.54s tid=0x00007fd108024380 nid=0x1af8f waiting on condition  [0x00007fd10fcd0000]
2022-05-20T03:37:05.4960659Z    java.lang.Thread.State: WAITING (parking)
2022-05-20T03:37:05.4961114Z 	at jdk.internal.misc.Unsafe.park(java.base@17.0.3/Native Method)
2022-05-20T03:37:05.4961875Z 	- parking to wait for  <0x00000000cdf00010> (a java.util.concurrent.CompletableFuture$Signaller)
2022-05-20T03:37:05.4962343Z 	at java.util.concurrent.locks.LockSupport.park(java.base@17.0.3/LockSupport.java:211)
2022-05-20T03:37:05.4963171Z 	at java.util.concurrent.CompletableFuture$Signaller.block(java.base@17.0.3/CompletableFuture.java:1864)
2022-05-20T03:37:05.4963683Z 	at java.util.concurrent.ForkJoinPool.unmanagedBlock(java.base@17.0.3/ForkJoinPool.java:3463)
2022-05-20T03:37:05.4964169Z 	at java.util.concurrent.ForkJoinPool.managedBlock(java.base@17.0.3/ForkJoinPool.java:3434)
2022-05-20T03:37:05.4964660Z 	at java.util.concurrent.CompletableFuture.waitingGet(java.base@17.0.3/CompletableFuture.java:1898)
2022-05-20T03:37:05.4965158Z 	at java.util.concurrent.CompletableFuture.get(java.base@17.0.3/CompletableFuture.java:2072)
2022-05-20T03:37:05.4965715Z 	at org.apache.pulsar.broker.service.BrokerService.lambda$unloadNamespaceBundlesGracefully$21(BrokerService.java:919)
2022-05-20T03:37:05.4966467Z 	at org.apache.pulsar.broker.service.BrokerService$$Lambda$1164/0x0000000801527c70.accept(Unknown Source)
2022-05-20T03:37:05.4966882Z 	at java.lang.Iterable.forEach(java.base@17.0.3/Iterable.java:75)
2022-05-20T03:37:05.4967408Z 	at org.apache.pulsar.broker.service.BrokerService.unloadNamespaceBundlesGracefully(BrokerService.java:911)
2022-05-20T03:37:05.4968078Z 	at org.apache.pulsar.broker.service.BrokerService.unloadNamespaceBundlesGracefully(BrokerService.java:887)
2022-05-20T03:37:05.4968664Z 	at org.apache.pulsar.broker.service.BrokerService.closeAsync(BrokerService.java:732)
2022-05-20T03:37:05.4969579Z 	at org.apache.pulsar.broker.PulsarService.closeAsync(PulsarService.java:450)
2022-05-20T03:37:05.4970123Z 	at org.apache.pulsar.broker.PulsarService.close(PulsarService.java:372)
2022-05-20T03:37:05.4970720Z 	at 

Blocked at CoordinationServiceImpl#close

2022-05-20T01:17:56.3359346Z "main" #1 prio=5 os_prio=0 cpu=11209.07ms elapsed=3506.06s tid=0x00007f9484024380 nid=0xaba waiting on condition  [0x00007f9489edd000]
2022-05-20T01:17:56.3361587Z    java.lang.Thread.State: WAITING (parking)
2022-05-20T01:17:56.3363789Z 	at jdk.internal.misc.Unsafe.park(java.base@17.0.3/Native Method)
2022-05-20T01:17:56.3366545Z 	- parking to wait for  <0x00000000cd180010> (a java.util.concurrent.CompletableFuture$Signaller)
2022-05-20T01:17:56.3368917Z 	at java.util.concurrent.locks.LockSupport.park(java.base@17.0.3/LockSupport.java:211)
2022-05-20T01:17:56.3371298Z 	at java.util.concurrent.CompletableFuture$Signaller.block(java.base@17.0.3/CompletableFuture.java:1864)
2022-05-20T01:17:56.3373823Z 	at java.util.concurrent.ForkJoinPool.unmanagedBlock(java.base@17.0.3/ForkJoinPool.java:3463)
2022-05-20T01:17:56.3376212Z 	at java.util.concurrent.ForkJoinPool.managedBlock(java.base@17.0.3/ForkJoinPool.java:3434)
2022-05-20T01:17:56.3378608Z 	at java.util.concurrent.CompletableFuture.waitingGet(java.base@17.0.3/CompletableFuture.java:1898)
2022-05-20T01:17:56.3380999Z 	at java.util.concurrent.CompletableFuture.join(java.base@17.0.3/CompletableFuture.java:2117)
2022-05-20T01:17:56.3383947Z 	at org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl.close(CoordinationServiceImpl.java:72)
2022-05-20T01:17:56.3386574Z 	at org.apache.pulsar.broker.PulsarService.closeAsync(PulsarService.java:526)
2022-05-20T01:17:56.3388569Z 	at org.apache.pulsar.broker.PulsarService.close(PulsarService.java:372)

For BrokerService#unloadNamespaceBundlesGracefully, the request chain :

brokerService.closeAsync() -> OwnedBundle.handleUnloadRequest -> pulsar.getNamespaceService().getOwnershipCache().removeOwnership(bundle) -> OwnershipCache.removeOwnership ->
ResourceLock.release 

For CoordinationServiceImpl#close, the request chain :

CoordinationServiceImpl.close -> LockManager.asyncClose -> ResourceLock.release

We find that it's all related to ResourceLock#release.

As the CI using the MockedZooKeeper, I find that if there are some RuntimeException, the response could never finish. So I add the catch block to ensure that all the requests will reply. But I'm not sure if the return code is right.

public void create(final String path, final byte[] data, final List<ACL> acl, CreateMode createMode,
final StringCallback cb, final Object ctx) {
executor.execute(() -> {
lock();
try {
if (stopped) {
cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
return;
}
final Set<Watcher> toNotifyCreate = Sets.newHashSet();
toNotifyCreate.addAll(watchers.get(path));
final Set<Watcher> toNotifyParent = Sets.newHashSet();
final String parent = path.substring(0, path.lastIndexOf("/"));
if (!parent.isEmpty()) {
toNotifyParent.addAll(watchers.get(parent));
}
final String name;
if (createMode != null && createMode.isSequential()) {
name = path + sequentialIdGenerator.getAndIncrement();
} else {
name = path;
}
Optional<KeeperException.Code> failure = programmedFailure(Op.CREATE, path);
if (failure.isPresent()) {
unlockIfLocked();
cb.processResult(failure.get().intValue(), path, ctx, null);
} else if (stopped) {
unlockIfLocked();
cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
} else if (tree.containsKey(path)) {
unlockIfLocked();
cb.processResult(KeeperException.Code.NODEEXISTS.intValue(), path, ctx, null);
} else if (!parent.isEmpty() && !tree.containsKey(parent)) {
unlockIfLocked();
toNotifyParent.forEach(watcher -> watcher
.process(new WatchedEvent(EventType.NodeChildrenChanged, KeeperState.SyncConnected,
parent)));
cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null);
} else {
tree.put(name, MockZNode.of(data, 0,
createMode != null && createMode.isEphemeral() ? getEphemeralOwner() : -1L));
watchers.removeAll(name);
unlockIfLocked();
cb.processResult(0, path, ctx, name);
triggerPersistentWatches(path, parent, EventType.NodeCreated);
toNotifyCreate.forEach(
watcher -> watcher.process(
new WatchedEvent(EventType.NodeCreated,
KeeperState.SyncConnected,
name)));
toNotifyParent.forEach(
watcher -> watcher.process(
new WatchedEvent(EventType.NodeChildrenChanged,
KeeperState.SyncConnected,
parent)));
}
} finally {
unlockIfLocked();
}
});
}

public void delete(final String path, int version, final VoidCallback cb, final Object ctx) {
Runnable r = () -> {
lock();
try {
final Set<Watcher> toNotifyDelete = Sets.newHashSet();
toNotifyDelete.addAll(watchers.get(path));
final Set<Watcher> toNotifyParent = Sets.newHashSet();
final String parent = path.substring(0, path.lastIndexOf("/"));
if (!parent.isEmpty()) {
toNotifyParent.addAll(watchers.get(parent));
}
watchers.removeAll(path);
Optional<KeeperException.Code> failure = programmedFailure(Op.DELETE, path);
if (failure.isPresent()) {
unlockIfLocked();
cb.processResult(failure.get().intValue(), path, ctx);
} else if (stopped) {
unlockIfLocked();
cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx);
} else if (!tree.containsKey(path)) {
unlockIfLocked();
cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx);
} else if (hasChildren(path)) {
unlockIfLocked();
cb.processResult(KeeperException.Code.NOTEMPTY.intValue(), path, ctx);
} else {
if (version != -1) {
int currentVersion = tree.get(path).getVersion();
if (version != currentVersion) {
unlockIfLocked();
cb.processResult(KeeperException.Code.BADVERSION.intValue(), path, ctx);
return;
}
}
tree.remove(path);
unlockIfLocked();
cb.processResult(0, path, ctx);
toNotifyDelete.forEach(watcher -> watcher
.process(new WatchedEvent(EventType.NodeDeleted, KeeperState.SyncConnected, path)));
toNotifyParent.forEach(watcher -> watcher
.process(new WatchedEvent(EventType.NodeChildrenChanged, KeeperState.SyncConnected,
parent)));
triggerPersistentWatches(path, parent, EventType.NodeDeleted);
}
} finally {
unlockIfLocked();
}
};
try {
executor.execute(r);
} catch (RejectedExecutionException ree) {
cb.processResult(KeeperException.Code.SESSIONEXPIRED.intValue(), path, ctx);
}
}

More, the current close process has some order issues. LoadManager is closed before BrokerService, but BrokerService closes need to invoke LoadManager, even though the LoadManager is stateless, but is a little confused here.

LoadManager loadManager = this.loadManager.get();
if (loadManager != null) {
loadManager.stop();
}
List<CompletableFuture<Void>> asyncCloseFutures = new ArrayList<>();
if (this.brokerService != null) {
CompletableFuture<Void> brokerCloseFuture = this.brokerService.closeAsync();
if (this.transactionMetadataStoreService != null) {
asyncCloseFutures.add(brokerCloseFuture.whenComplete((__, ___) -> {

public void unloadNamespaceBundlesGracefully(int maxConcurrentUnload, boolean closeWithoutWaitingClientDisconnect) {
try {
log.info("Unloading namespace-bundles...");
// make broker-node unavailable from the cluster
if (pulsar.getLoadManager() != null && pulsar.getLoadManager().get() != null) {
try {
pulsar.getLoadManager().get().disableBroker();
} catch (PulsarServerException.NotFoundException ne) {
log.warn("Broker load-manager znode doesn't exist ", ne);
// still continue and release bundle ownership as broker's registration node doesn't exist.
}
}

Documentation

  • no-need-doc
    (Please explain why)

@github-actions
Copy link

@Technoboy- Please provide a correct documentation label for your PR.
Instructions see Pulsar Documentation Label Guide.

@Technoboy- Technoboy- self-assigned this Sep 16, 2022
@Technoboy- Technoboy- added release/2.10.3 doc-not-needed Your PR changes do not impact docs and removed doc-label-missing labels Sep 16, 2022
@github-actions github-actions bot added doc-label-missing and removed doc-not-needed Your PR changes do not impact docs labels Sep 16, 2022
@Technoboy- Technoboy- merged commit 5c661b6 into apache:branch-2.10 Sep 16, 2022
nicoloboschi pushed a commit to datastax/pulsar that referenced this pull request Sep 16, 2022
@Technoboy- Technoboy- deleted the cherry-pick-15755 branch September 14, 2023 11:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants