[Enhancement][client][schema] Remove synchronous calls in asynchronous callback #23432

Open
1 of 2 tasks
nodece opened this issue Oct 10, 2024 · 0 comments
Labels
type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages

nodece commented Oct 10, 2024

Search before asking

  • I searched in the issues and found nothing similar.

Motivation

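The CI run for #23378 failed, and its thread dump reports a deadlock in the Pulsar client.

The binary lookup uses the Pulsar client's internal executor. The dump below shows one of that executor's threads parked in CompletableFuture.get():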

2024-10-09T14:07:01.8281866Z "broker-client-shared-internal-executor-203-1" #398 [4208] prio=5 os_prio=0 cpu=5.08ms elapsed=3422.76s tid=0x00007efce4013ce0 nid=4208 waiting on condition  [0x00007efca57fd000]
2024-10-09T14:07:01.8282015Z    java.lang.Thread.State: WAITING (parking)
2024-10-09T14:07:01.8282244Z 	at jdk.internal.misc.Unsafe.park(java.base@21.0.4/Native Method)
2024-10-09T14:07:01.8282658Z 	- parking to wait for  <0x000010000c5f6c58> (a java.util.concurrent.CompletableFuture$Signaller)
2024-10-09T14:07:01.8283361Z 	at java.util.concurrent.locks.LockSupport.park(java.base@21.0.4/LockSupport.java:221)
2024-10-09T14:07:01.8283798Z 	at java.util.concurrent.CompletableFuture$Signaller.block(java.base@21.0.4/CompletableFuture.java:1864)
2024-10-09T14:07:01.8284421Z 	at java.util.concurrent.ForkJoinPool.unmanagedBlock(java.base@21.0.4/ForkJoinPool.java:3780)
2024-10-09T14:07:01.8284853Z 	at java.util.concurrent.ForkJoinPool.managedBlock(java.base@21.0.4/ForkJoinPool.java:3725)
2024-10-09T14:07:01.8285257Z 	at java.util.concurrent.CompletableFuture.waitingGet(java.base@21.0.4/CompletableFuture.java:1898)
2024-10-09T14:07:01.8285614Z 	at java.util.concurrent.CompletableFuture.get(java.base@21.0.4/CompletableFuture.java:2072)
2024-10-09T14:07:01.8286326Z 	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaInfoByVersion(AbstractMultiVersionReader.java:118)
2024-10-09T14:07:01.8286927Z 	at org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader.loadReader(MultiVersionAvroReader.java:45)
2024-10-09T14:07:01.8287435Z 	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(AbstractMultiVersionReader.java:51)
2024-10-09T14:07:01.8287942Z 	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(AbstractMultiVersionReader.java:48)
2024-10-09T14:07:01.8288307Z 	at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3571)
2024-10-09T14:07:01.8288582Z 	at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2313)
2024-10-09T14:07:01.8288898Z 	at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2190)
2024-10-09T14:07:01.8289291Z 	- locked <0x000010000c5f4f80> (a com.google.common.cache.LocalCache$StrongAccessEntry)
2024-10-09T14:07:01.8289762Z 	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2080)
2024-10-09T14:07:01.8289989Z 	at com.google.common.cache.LocalCache.get(LocalCache.java:4012)
2024-10-09T14:07:01.8290251Z 	at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4035)
2024-10-09T14:07:01.8290554Z 	at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5013)
2024-10-09T14:07:01.8291133Z 	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaReader(AbstractMultiVersionReader.java:82)
2024-10-09T14:07:01.8291642Z 	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:73)
2024-10-09T14:07:01.8292052Z 	at org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:90)
2024-10-09T14:07:01.8292382Z 	at org.apache.pulsar.client.impl.MessageImpl.decodeBySchema(MessageImpl.java:512)
2024-10-09T14:07:01.8292665Z 	at org.apache.pulsar.client.impl.MessageImpl.decode(MessageImpl.java:493)
2024-10-09T14:07:01.8292959Z 	at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:478)
2024-10-09T14:07:01.8293681Z 	at org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.refreshTopicPoliciesCache(SystemTopicBasedTopicPoliciesService.java:523)
2024-10-09T14:07:01.8294463Z 	at org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.lambda$readMorePoliciesAsync$24(SystemTopicBasedTopicPoliciesService.java:500)
2024-10-09T14:07:01.8294991Z 	at org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService$$Lambda/0x00007efd04b54f00.accept(Unknown Source)
2024-10-09T14:07:01.8295396Z 	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(java.base@21.0.4/CompletableFuture.java:718)
2024-10-09T14:07:01.8295796Z 	at java.util.concurrent.CompletableFuture.postComplete(java.base@21.0.4/CompletableFuture.java:510)
2024-10-09T14:07:01.8296257Z 	at java.util.concurrent.CompletableFuture.complete(java.base@21.0.4/CompletableFuture.java:2179)
2024-10-09T14:07:01.8296677Z 	at org.apache.pulsar.client.impl.ConsumerBase.lambda$completePendingReceive$0(ConsumerBase.java:333)
2024-10-09T14:07:01.8297062Z 	at org.apache.pulsar.client.impl.ConsumerBase$$Lambda/0x00007efd04bb29a8.run(Unknown Source)
2024-10-09T14:07:01.8297599Z 	at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@21.0.4/ThreadPoolExecutor.java:1144)
2024-10-09T14:07:01.8298124Z 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@21.0.4/ThreadPoolExecutor.java:642)
2024-10-09T14:07:01.8298487Z 	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
2024-10-09T14:07:01.8298698Z 	at java.lang.Thread.runWith(java.base@21.0.4/Thread.java:1596)
2024-10-09T14:07:01.8298883Z 	at java.lang.Thread.run(java.base@21.0.4/Thread.java:1583)
2024-10-09T14:07:01.8298896Z 
2024-10-09T14:07:01.8299012Z    Locked ownable synchronizers:
2024-10-09T14:07:01.8299347Z 	- <0x000010000c50c540> (a java.util.concurrent.ThreadPoolExecutor$Worker)

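The stack trace shows a synchronous call inside an asynchronous callback: the callback registered in readMorePoliciesAsync runs on a broker-client-shared-internal-executor thread, calls Message#getValue, and ends up blocked in CompletableFuture.get() inside AbstractMultiVersionReader.getSchemaInfoByVersion.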
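The pattern can be reproduced outside Pulsar. The following is a minimal sketch, not Pulsar code: `internalExecutor` is a hypothetical single-thread stand-in for the client's internal executor, and the "schema lookup" is just a task queued on that same executor. It assumes, as the issue suggests, that completing the lookup depends on the executor the callback is already occupying. A timed `get` is used only so the demo terminates; the dump above shows an untimed `get()`, which would park indefinitely.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BlockingCallbackDemo {

    /**
     * Returns true if the blocking wait inside the callback timed out,
     * i.e. the callback occupied the only thread able to complete the lookup.
     */
    static boolean callbackStarvesExecutor() {
        // Hypothetical stand-in for the client's internal executor (one thread).
        ExecutorService internalExecutor = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> received = new CompletableFuture<>();

            CompletableFuture<Boolean> timedOut = received.thenApply(msg -> {
                // Stand-in for the schema lookup: its work is queued on the same executor.
                CompletableFuture<String> schema =
                        CompletableFuture.supplyAsync(() -> "schema-v1", internalExecutor);
                try {
                    // Synchronous call inside an asynchronous callback.
                    schema.get(500, TimeUnit.MILLISECONDS);
                    return false;
                } catch (TimeoutException e) {
                    // The lookup could not run: this thread is the only worker.
                    return true;
                } catch (Exception e) {
                    throw new IllegalStateException(e);
                }
            });

            // Complete the future on the internal executor so that the
            // callback above runs on that executor's only thread.
            internalExecutor.execute(() -> received.complete("message"));
            return timedOut.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            internalExecutor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("blocking wait timed out: " + callbackStarvesExecutor());
    }
}
```

With more than one thread in the pool the wait may succeed, which would be consistent with a problem that only shows up intermittently in CI.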

Solution

Solution 1:

  1. Add asynchronous counterparts of these methods:
    • org.apache.pulsar.client.api.schema.SchemaReader#read
    • org.apache.pulsar.client.api.Message#getValue
  2. Walk the call stack and remove the synchronous calls made inside asynchronous callbacks.

This solution will change method signatures, which breaks the public interfaces (SchemaReader and Message).
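As a rough illustration of what Solution 1 aims for, here is a sketch under stated assumptions. `AsyncReader` and `readAsync` are hypothetical names invented for this example and are not part of Pulsar's API. The point is only that a callback which composes futures with `thenCompose` returns immediately, so the single executor thread stays free to run the lookup.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AsyncReadSketch {

    /** Hypothetical async counterpart of a schema reader; not a Pulsar interface. */
    interface AsyncReader<T> {
        CompletableFuture<T> readAsync(byte[] payload, byte[] schemaVersion);
    }

    static String decodeOnSingleThread() {
        // Hypothetical stand-in for the client's internal executor (one thread).
        ExecutorService internalExecutor = Executors.newSingleThreadExecutor();
        try {
            // The "schema lookup" is queued on the same single-thread executor,
            // then the payload is decoded once the schema is available.
            AsyncReader<String> reader = (payload, version) ->
                    CompletableFuture
                            .supplyAsync(() -> "schema-v" + version[0], internalExecutor)
                            .thenApply(schema ->
                                    schema + ":" + new String(payload, StandardCharsets.UTF_8));

            CompletableFuture<byte[]> received = new CompletableFuture<>();

            // thenCompose chains the async read instead of blocking on it,
            // so the callback never parks the executor thread.
            CompletableFuture<String> decoded =
                    received.thenCompose(payload -> reader.readAsync(payload, new byte[] {1}));

            internalExecutor.execute(
                    () -> received.complete("hello".getBytes(StandardCharsets.UTF_8)));
            return decoded.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            internalExecutor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(decodeOnSingleThread());
    }
}
```

The cost is the one already noted: every caller of the synchronous methods on the async path has to be rewritten to consume a future.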

Solution 2:

private void readMorePoliciesAsync(SystemTopicClient.Reader<PulsarEvent> reader) {
    if (closed.get()) {
        cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
        return;
    }
    reader.readNextAsync()
            .thenAccept(msg -> {
                // Runs on the thread that completes the future
                // (broker-client-shared-internal-executor in the dump above).
                refreshTopicPoliciesCache(msg);
                notifyListener(msg);
            })
    // ... rest of the method omitted

It looks like refreshTopicPoliciesCache blocks inside this callback, and the callback thread is broker-client-shared-internal-executor.
We can run the callback on pulsarService.getExecutor() instead, which keeps the blocking call off the client's internal executor and avoids the deadlock.
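A minimal sketch of that hand-off, again outside Pulsar: `brokerExecutor` is a hypothetical stand-in for `pulsarService.getExecutor()`, and `internalExecutor` for the client's internal executor. The sketch uses `thenApplyAsync` so it can return a value; with the code above, the analogous change would be `thenAcceptAsync(callback, executor)`. The callback still blocks, but on a thread that the lookup does not need.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class CallbackExecutorSketch {

    static String refreshOnSeparateExecutor() {
        // Hypothetical stand-ins: the client's internal executor and
        // the broker-side executor that will run the callback.
        ExecutorService internalExecutor = Executors.newSingleThreadExecutor();
        ExecutorService brokerExecutor = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> received = new CompletableFuture<>();

            // The *Async variant with an explicit executor runs the callback
            // on brokerExecutor instead of the completing thread.
            CompletableFuture<String> refreshed = received.thenApplyAsync(msg -> {
                CompletableFuture<String> schema =
                        CompletableFuture.supplyAsync(() -> "schema-v1", internalExecutor);
                try {
                    // Still a blocking wait, but the internal executor's
                    // thread is free to complete the lookup.
                    return schema.get(5, TimeUnit.SECONDS) + ":" + msg;
                } catch (Exception e) {
                    throw new IllegalStateException(e);
                }
            }, brokerExecutor);

            internalExecutor.execute(() -> received.complete("policy-event"));
            return refreshed.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            internalExecutor.shutdownNow();
            brokerExecutor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(refreshOnSeparateExecutor());
    }
}
```

This keeps the public interfaces unchanged, at the price of leaving a blocking call in place on the broker's executor.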

Solution 3:

Don't use the Pulsar client's internal executor in org.apache.pulsar.client.impl.BinaryProtoLookupService.

Alternatives

No response

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!
@nodece nodece added the type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages label Oct 10, 2024