Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the dead lock when using hasMessageAvailableAsync and readNextAsync #11183

Merged

Conversation

codelipenghui
Copy link
Contributor

@codelipenghui codelipenghui commented Jul 1, 2021

The issue will happens after satisfying the following conditions:

  1. The messages are added to the incoming queue before reading messages
  2. The result future of the readNextAsync been complete before call future.whenComplete by users,
    This won't always appear.

After that, since we are using the IO thread to call the callback of the hasMessageAvailableAsync,
so the IO thread will process the message.getValue(). Then might get a deadlock as followings:

pulsar-client-io-44-1 Frozen for at least 15s <Ignore a false positive>
    jdk.internal.misc.Unsafe.park(boolean, long) Unsafe.java (native)
    java.util.concurrent.locks.LockSupport.park(Object) LockSupport.java:194
    java.util.concurrent.CompletableFuture$Signaller.block() CompletableFuture.java:1796
    java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool$ManagedBlocker) ForkJoinPool.java:3128
    java.util.concurrent.CompletableFuture.waitingGet(boolean) CompletableFuture.java:1823
    java.util.concurrent.CompletableFuture.get() CompletableFuture.java:1998
    org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaInfoByVersion(byte[]) AbstractMultiVersionReader.java:115
    org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader.loadReader(BytesSchemaVersion) MultiVersionAvroReader.java:47
    org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(BytesSchemaVersion) AbstractMultiVersionReader.java:52
    org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(Object) AbstractMultiVersionReader.java:49
    com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(Object, CacheLoader) LocalCache.java:3529
    com.google.common.cache.LocalCache$Segment.loadSync(Object, int, LocalCache$LoadingValueReference, CacheLoader) LocalCache.java:2278
    com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(Object, int, CacheLoader) LocalCache.java:2155
    com.google.common.cache.LocalCache$Segment.get(Object, int, CacheLoader) LocalCache.java:2045
    com.google.common.cache.LocalCache.get(Object, CacheLoader) LocalCache.java:3951
    com.google.common.cache.LocalCache.getOrLoad(Object) LocalCache.java:3974
    com.google.common.cache.LocalCache$LocalLoadingCache.get(Object) LocalCache.java:4935
    org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(byte[], byte[]) AbstractMultiVersionReader.java:86
    org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(byte[], byte[]) AbstractStructSchema.java:60
    org.apache.pulsar.client.impl.MessageImpl.getValue() MessageImpl.java:301
    org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.refreshTopicPoliciesCache(Message) SystemTopicBasedTopicPoliciesService.java:302
    org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.lambda$initPolicesCache$7(SystemTopicClient$Reader, Throwable, CompletableFuture, Message, Throwable) SystemTopicBasedTopicPoliciesService.java:254
    org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService$$Lambda$817.accept(Object, Object)
    java.util.concurrent.CompletableFuture.uniWhenComplete(Object, BiConsumer, CompletableFuture$UniWhenComplete) CompletableFuture.java:859
    java.util.concurrent.CompletableFuture.uniWhenCompleteStage(Executor, BiConsumer) CompletableFuture.java:883
    java.util.concurrent.CompletableFuture.whenComplete(BiConsumer) CompletableFuture.java:2251
    org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.lambda$initPolicesCache$10(SystemTopicClient$Reader, CompletableFuture, Boolean, Throwable) SystemTopicBasedTopicPoliciesService.java:246
    org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService$$Lambda$725.accept(Object, Object)
    java.util.concurrent.CompletableFuture.uniWhenComplete(Object, BiConsumer, CompletableFuture$UniWhenComplete) CompletableFuture.java:859
    java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(int) CompletableFuture.java:837
    java.util.concurrent.CompletableFuture.postComplete() CompletableFuture.java:506
    java.util.concurrent.CompletableFuture.complete(Object) CompletableFuture.java:2073
    org.apache.pulsar.client.impl.ConsumerImpl.lambda$hasMessageAvailableAsync$48(CompletableFuture, MessageId) ConsumerImpl.java:2075
    org.apache.pulsar.client.impl.ConsumerImpl$$Lambda$723.accept(Object)
    java.util.concurrent.CompletableFuture$UniAccept.tryFire(int) CompletableFuture.java:714
    java.util.concurrent.CompletableFuture.postComplete() CompletableFuture.java:506
    java.util.concurrent.CompletableFuture.complete(Object) CompletableFuture.java:2073
    org.apache.pulsar.client.impl.ConsumerImpl.lambda$internalGetLastMessageIdAsync$53(CompletableFuture, PulsarApi$CommandGetLastMessageIdResponse) ConsumerImpl.java:2187
    org.apache.pulsar.client.impl.ConsumerImpl$$Lambda$720.accept(Object)
    java.util.concurrent.CompletableFuture$UniAccept.tryFire(int) CompletableFuture.java:714
    java.util.concurrent.CompletableFuture.postComplete() CompletableFuture.java:506
    java.util.concurrent.CompletableFuture.complete(Object) CompletableFuture.java:2073
    org.apache.pulsar.client.impl.ClientCnx.handleGetLastMessageIdSuccess(PulsarApi$CommandGetLastMessageIdResponse) ClientCnx.java:468
    org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(ChannelHandlerContext, Object) PulsarDecoder.java:342
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(Object) AbstractChannelHandlerContext.java:379
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext, Object) AbstractChannelHandlerContext.java:365
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(Object) AbstractChannelHandlerContext.java:357
    io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ChannelHandlerContext, CodecOutputList, int) ByteToMessageDecoder.java:324
    io.netty.handler.codec.ByteToMessageDecoder.channelRead(ChannelHandlerContext, Object) ByteToMessageDecoder.java:296
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(Object) AbstractChannelHandlerContext.java:379
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext, Object) AbstractChannelHandlerContext.java:365
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(Object) AbstractChannelHandlerContext.java:357
    io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(ChannelHandlerContext, Object) DefaultChannelPipeline.java:1410
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(Object) AbstractChannelHandlerContext.java:379
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext, Object) AbstractChannelHandlerContext.java:365
    io.netty.channel.DefaultChannelPipeline.fireChannelRead(Object) DefaultChannelPipeline.java:919
    io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read() AbstractNioByteChannel.java:166
    io.netty.channel.nio.NioEventLoop.processSelectedKey(SelectionKey, AbstractNioChannel) NioEventLoop.java:719
    io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized() NioEventLoop.java:655
    io.netty.channel.nio.NioEventLoop.processSelectedKeys() NioEventLoop.java:581
    io.netty.channel.nio.NioEventLoop.run() NioEventLoop.java:493
    io.netty.util.concurrent.SingleThreadEventExecutor$4.run() SingleThreadEventExecutor.java:989
    io.netty.util.internal.ThreadExecutorMap$2.run() ThreadExecutorMap.java:74
    io.netty.util.concurrent.FastThreadLocalRunnable.run() FastThreadLocalRunnable.java:30
    java.lang.Thread.run() Thread.java:834

Since we are introduced the internal thread pool for handling the client internal executions.
So the fix is using the internal thread to process the callback of the hasMessageAvailableAsync

@codelipenghui codelipenghui added type/bug The PR fixed a bug or issue reported a bug release/2.7.3 release/2.8.1 doc-not-needed Your PR changes do not impact docs labels Jul 1, 2021
@codelipenghui codelipenghui added this to the 2.9.0 milestone Jul 1, 2021
@codelipenghui codelipenghui self-assigned this Jul 1, 2021
if (e == null && has) {
CompletableFuture<Message<Schemas.PersonOne>> future = reader.readNextAsync();
// Make sure the future completed
Awaitility.await().pollInterval(1, TimeUnit.MILLISECONDS).untilAsserted(future::isDone);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why need to wait for the future?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to simulate the race condition here, the problem happens only the future is already completed, so that will continue using the IO thread to execute the callback.

@315157973
Copy link
Contributor

Great work! This is very difficult to troubleshoot, thanks penghui
When the Future has been completed, use future.whenComplete at this time, and the current thread will be used. A thread switch causes a deadlock. Here is a Demo:

        CompletableFuture<Void> future = new CompletableFuture<>();
        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
                new BasicThreadFactory.Builder().namingPattern("metrics-schedule-pool-%d").build());
        executorService.execute(() -> {
            System.out.println("executorService:" + Thread.currentThread().getName());
            future.complete(null);
        });
        Thread.sleep(5000);

        future.whenComplete(((unused, throwable) -> {
            System.out.println("whenComplete:" + Thread.currentThread().getName());
        }));
        System.out.println("out:" + Thread.currentThread().getName());

image

Copy link
Contributor

@315157973 315157973 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Contributor

@eolivelli eolivelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great catch !

LGTM

The issue will happens after satisfying the following conditions:

1. The messages are added to the incoming queue before reading messages
2. The result future of the readNextAsync been complete before call future.whenComplete by users,
   This won't always appear.

 After that, since we are using the IO thread to call the callback of the hasMessageAvailableAsync,
 so the IO thread will process the message.getValue(). Then might get a deadlock as followings:

 ```
 jdk.internal.misc.Unsafe.park(boolean, long) Unsafe.java (native)
 java.util.concurrent.locks.LockSupport.park(Object) LockSupport.java:194
 java.util.concurrent.CompletableFuture$Signaller.block() CompletableFuture.java:1796
 java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool$ManagedBlocker) ForkJoinPool.java:3128
 java.util.concurrent.CompletableFuture.waitingGet(boolean) CompletableFuture.java:1823
 java.util.concurrent.CompletableFuture.get() CompletableFuture.java:1998
 org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaInfoByVersion(byte[]) AbstractMultiVersionReader.java:115
 org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader.loadReader(BytesSchemaVersion) MultiVersionAvroReader.java:47
 org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(BytesSchemaVersion) AbstractMultiVersionReader.java:52
 org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(Object) AbstractMultiVersionReader.java:49
 com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(Object, CacheLoader) LocalCache.java:3529
 com.google.common.cache.LocalCache$Segment.loadSync(Object, int, LocalCache$LoadingValueReference, CacheLoader) LocalCache.java:2278
 com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(Object, int, CacheLoader) LocalCache.java:2155
 com.google.common.cache.LocalCache$Segment.get(Object, int, CacheLoader) LocalCache.java:2045
 com.google.common.cache.LocalCache.get(Object, CacheLoader) LocalCache.java:3951
 com.google.common.cache.LocalCache.getOrLoad(Object) LocalCache.java:3974
 com.google.common.cache.LocalCache$LocalLoadingCache.get(Object) LocalCache.java:4935
 org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(byte[], byte[]) AbstractMultiVersionReader.java:86
 org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(byte[], byte[]) AbstractStructSchema.java:60
 org.apache.pulsar.client.impl.MessageImpl.getValue() MessageImpl.java:301
 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.refreshTopicPoliciesCache(Message) SystemTopicBasedTopicPoliciesService.java:302
 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.lambda$initPolicesCache$7(SystemTopicClient$Reader, Throwable, CompletableFuture, Message, Throwable) SystemTopicBasedTopicPoliciesService.java:254
 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService$$Lambda$817.accept(Object, Object)
 java.util.concurrent.CompletableFuture.uniWhenComplete(Object, BiConsumer, CompletableFuture$UniWhenComplete) CompletableFuture.java:859
 java.util.concurrent.CompletableFuture.uniWhenCompleteStage(Executor, BiConsumer) CompletableFuture.java:883
 java.util.concurrent.CompletableFuture.whenComplete(BiConsumer) CompletableFuture.java:2251
 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.lambda$initPolicesCache$10(SystemTopicClient$Reader, CompletableFuture, Boolean, Throwable) SystemTopicBasedTopicPoliciesService.java:246
 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService$$Lambda$725.accept(Object, Object)
 java.util.concurrent.CompletableFuture.uniWhenComplete(Object, BiConsumer, CompletableFuture$UniWhenComplete) CompletableFuture.java:859
 java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(int) CompletableFuture.java:837
 java.util.concurrent.CompletableFuture.postComplete() CompletableFuture.java:506
 java.util.concurrent.CompletableFuture.complete(Object) CompletableFuture.java:2073
 org.apache.pulsar.client.impl.ConsumerImpl.lambda$hasMessageAvailableAsync$48(CompletableFuture, MessageId) ConsumerImpl.java:2075
 org.apache.pulsar.client.impl.ConsumerImpl$$Lambda$723.accept(Object)
 java.util.concurrent.CompletableFuture$UniAccept.tryFire(int) CompletableFuture.java:714
 java.util.concurrent.CompletableFuture.postComplete() CompletableFuture.java:506
 java.util.concurrent.CompletableFuture.complete(Object) CompletableFuture.java:2073
 org.apache.pulsar.client.impl.ConsumerImpl.lambda$internalGetLastMessageIdAsync$53(CompletableFuture, PulsarApi$CommandGetLastMessageIdResponse) ConsumerImpl.java:2187
 org.apache.pulsar.client.impl.ConsumerImpl$$Lambda$720.accept(Object)
 java.util.concurrent.CompletableFuture$UniAccept.tryFire(int) CompletableFuture.java:714
 java.util.concurrent.CompletableFuture.postComplete() CompletableFuture.java:506
 java.util.concurrent.CompletableFuture.complete(Object) CompletableFuture.java:2073
 org.apache.pulsar.client.impl.ClientCnx.handleGetLastMessageIdSuccess(PulsarApi$CommandGetLastMessageIdResponse) ClientCnx.java:468
 org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(ChannelHandlerContext, Object) PulsarDecoder.java:342
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(Object) AbstractChannelHandlerContext.java:379
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext, Object) AbstractChannelHandlerContext.java:365
 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(Object) AbstractChannelHandlerContext.java:357
 io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ChannelHandlerContext, CodecOutputList, int) ByteToMessageDecoder.java:324
 io.netty.handler.codec.ByteToMessageDecoder.channelRead(ChannelHandlerContext, Object) ByteToMessageDecoder.java:296
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(Object) AbstractChannelHandlerContext.java:379
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext, Object) AbstractChannelHandlerContext.java:365
 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(Object) AbstractChannelHandlerContext.java:357
 io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(ChannelHandlerContext, Object) DefaultChannelPipeline.java:1410
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(Object) AbstractChannelHandlerContext.java:379
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext, Object) AbstractChannelHandlerContext.java:365
 io.netty.channel.DefaultChannelPipeline.fireChannelRead(Object) DefaultChannelPipeline.java:919
 io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read() AbstractNioByteChannel.java:166
 io.netty.channel.nio.NioEventLoop.processSelectedKey(SelectionKey, AbstractNioChannel) NioEventLoop.java:719
 io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized() NioEventLoop.java:655
 io.netty.channel.nio.NioEventLoop.processSelectedKeys() NioEventLoop.java:581
 io.netty.channel.nio.NioEventLoop.run() NioEventLoop.java:493
 io.netty.util.concurrent.SingleThreadEventExecutor$4.run() SingleThreadEventExecutor.java:989
 io.netty.util.internal.ThreadExecutorMap$2.run() ThreadExecutorMap.java:74
 io.netty.util.concurrent.FastThreadLocalRunnable.run() FastThreadLocalRunnable.java:30
 java.lang.Thread.run() Thread.java:834
 ```

 Since we are introduced the internal thread pool for handling the client internal executions.
 So the fix is using the internal thread to process the callback of the hasMessageAvailableAsync
@codelipenghui codelipenghui force-pushed the penghui/fix-dead-lock-consumer branch from 2ec404c to 6d562f3 Compare July 2, 2021 08:21
@codelipenghui codelipenghui merged commit ed42007 into apache:master Jul 2, 2021
@codelipenghui codelipenghui deleted the penghui/fix-dead-lock-consumer branch July 2, 2021 11:25
@hangc0276
Copy link
Contributor

Great catch! The deadlock it hard to debug.

codelipenghui added a commit that referenced this pull request Jul 2, 2021
…nc (#11183)

The issue will happens after satisfying the following conditions:

1. The messages are added to the incoming queue before reading messages
2. The result future of the readNextAsync been complete before call future.whenComplete by users,
   This won't always appear.

 After that, since we are using the IO thread to call the callback of the hasMessageAvailableAsync,
 so the IO thread will process the message.getValue(). Then might get a deadlock as followings:

 ```
 java.util.concurrent.CompletableFuture.get() CompletableFuture.java:1998
 org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaInfoByVersion(byte[]) AbstractMultiVersionReader.java:115
 org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader.loadReader(BytesSchemaVersion) MultiVersionAvroReader.java:47
 org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(BytesSchemaVersion) AbstractMultiVersionReader.java:52
 org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(Object) AbstractMultiVersionReader.java:49
 com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(Object, CacheLoader) LocalCache.java:3529
 com.google.common.cache.LocalCache$Segment.loadSync(Object, int, LocalCache$LoadingValueReference, CacheLoader) LocalCache.java:2278
 com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(Object, int, CacheLoader) LocalCache.java:2155
 com.google.common.cache.LocalCache$Segment.get(Object, int, CacheLoader) LocalCache.java:2045
 com.google.common.cache.LocalCache.get(Object, CacheLoader) LocalCache.java:3951
 com.google.common.cache.LocalCache.getOrLoad(Object) LocalCache.java:3974
 com.google.common.cache.LocalCache$LocalLoadingCache.get(Object) LocalCache.java:4935
 org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(byte[], byte[]) AbstractMultiVersionReader.java:86
 org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(byte[], byte[]) AbstractStructSchema.java:60
 org.apache.pulsar.client.impl.MessageImpl.getValue() MessageImpl.java:301
 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.refreshTopicPoliciesCache(Message) SystemTopicBasedTopicPoliciesService.java:302
 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.lambda$initPolicesCache$7(SystemTopicClient$Reader, Throwable, CompletableFuture, Message, Throwable) SystemTopicBasedTopicPoliciesService.java:254
 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService$$Lambda$817.accept(Object, Object)
 java.util.concurrent.CompletableFuture.uniWhenComplete(Object, BiConsumer, CompletableFuture$UniWhenComplete) CompletableFuture.java:859
 java.util.concurrent.CompletableFuture.uniWhenCompleteStage(Executor, BiConsumer) CompletableFuture.java:883
 java.util.concurrent.CompletableFuture.whenComplete(BiConsumer) CompletableFuture.java:2251
org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.lambda$initPolicesCache$10(SystemTopicClient$Reader, CompletableFuture, Boolean, Throwable) SystemTopicBasedTopicPoliciesService.java:246
 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService$$Lambda$725.accept(Object, Object) org.apache.pulsar.client.impl.ClientCnx.handleGetLastMessageIdSuccess(PulsarApi$CommandGetLastMessageIdResponse) ClientCnx.java:468

 ```

 Since we are introduced the internal thread pool for handling the client internal executions.
 So the fix is using the internal thread to process the callback of the hasMessageAvailableAsync

(cherry picked from commit ed42007)
@codelipenghui codelipenghui added the cherry-picked/branch-2.7 Archived: 2.7 is end of life label Jul 4, 2021
codelipenghui added a commit that referenced this pull request Jul 7, 2021
…nc (#11183)

The issue will happens after satisfying the following conditions:

1. The messages are added to the incoming queue before reading messages
2. The result future of the readNextAsync been complete before call future.whenComplete by users,
   This won't always appear.

 After that, since we are using the IO thread to call the callback of the hasMessageAvailableAsync,
 so the IO thread will process the message.getValue(). Then might get a deadlock as followings:

 ```
 java.util.concurrent.CompletableFuture.get() CompletableFuture.java:1998
 org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaInfoByVersion(byte[]) AbstractMultiVersionReader.java:115
 org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader.loadReader(BytesSchemaVersion) MultiVersionAvroReader.java:47
 org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(BytesSchemaVersion) AbstractMultiVersionReader.java:52
 org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(Object) AbstractMultiVersionReader.java:49
 com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(Object, CacheLoader) LocalCache.java:3529
 com.google.common.cache.LocalCache$Segment.loadSync(Object, int, LocalCache$LoadingValueReference, CacheLoader) LocalCache.java:2278
 com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(Object, int, CacheLoader) LocalCache.java:2155
 com.google.common.cache.LocalCache$Segment.get(Object, int, CacheLoader) LocalCache.java:2045
 com.google.common.cache.LocalCache.get(Object, CacheLoader) LocalCache.java:3951
 com.google.common.cache.LocalCache.getOrLoad(Object) LocalCache.java:3974
 com.google.common.cache.LocalCache$LocalLoadingCache.get(Object) LocalCache.java:4935
 org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(byte[], byte[]) AbstractMultiVersionReader.java:86
 org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(byte[], byte[]) AbstractStructSchema.java:60
 org.apache.pulsar.client.impl.MessageImpl.getValue() MessageImpl.java:301
 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.refreshTopicPoliciesCache(Message) SystemTopicBasedTopicPoliciesService.java:302
 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.lambda$initPolicesCache$7(SystemTopicClient$Reader, Throwable, CompletableFuture, Message, Throwable) SystemTopicBasedTopicPoliciesService.java:254
 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService$$Lambda$817.accept(Object, Object)
 java.util.concurrent.CompletableFuture.uniWhenComplete(Object, BiConsumer, CompletableFuture$UniWhenComplete) CompletableFuture.java:859
 java.util.concurrent.CompletableFuture.uniWhenCompleteStage(Executor, BiConsumer) CompletableFuture.java:883
 java.util.concurrent.CompletableFuture.whenComplete(BiConsumer) CompletableFuture.java:2251
org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.lambda$initPolicesCache$10(SystemTopicClient$Reader, CompletableFuture, Boolean, Throwable) SystemTopicBasedTopicPoliciesService.java:246
 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService$$Lambda$725.accept(Object, Object) org.apache.pulsar.client.impl.ClientCnx.handleGetLastMessageIdSuccess(PulsarApi$CommandGetLastMessageIdResponse) ClientCnx.java:468

 ```

 Since we are introduced the internal thread pool for handling the client internal executions.
 So the fix is using the internal thread to process the callback of the hasMessageAvailableAsync

(cherry picked from commit ed42007)
@codelipenghui codelipenghui added the cherry-picked/branch-2.8 Archived: 2.8 is end of life label Jul 7, 2021
eolivelli pushed a commit to datastax/pulsar that referenced this pull request Oct 18, 2021
…nc (apache#11183)

The issue will happens after satisfying the following conditions:

1. The messages are added to the incoming queue before reading messages
2. The result future of the readNextAsync been complete before call future.whenComplete by users,
   This won't always appear.

 After that, since we are using the IO thread to call the callback of the hasMessageAvailableAsync,
 so the IO thread will process the message.getValue(). Then might get a deadlock as followings:

 ```
 java.util.concurrent.CompletableFuture.get() CompletableFuture.java:1998
 org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaInfoByVersion(byte[]) AbstractMultiVersionReader.java:115
 org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader.loadReader(BytesSchemaVersion) MultiVersionAvroReader.java:47
 org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(BytesSchemaVersion) AbstractMultiVersionReader.java:52
 org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(Object) AbstractMultiVersionReader.java:49
 com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(Object, CacheLoader) LocalCache.java:3529
 com.google.common.cache.LocalCache$Segment.loadSync(Object, int, LocalCache$LoadingValueReference, CacheLoader) LocalCache.java:2278
 com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(Object, int, CacheLoader) LocalCache.java:2155
 com.google.common.cache.LocalCache$Segment.get(Object, int, CacheLoader) LocalCache.java:2045
 com.google.common.cache.LocalCache.get(Object, CacheLoader) LocalCache.java:3951
 com.google.common.cache.LocalCache.getOrLoad(Object) LocalCache.java:3974
 com.google.common.cache.LocalCache$LocalLoadingCache.get(Object) LocalCache.java:4935
 org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(byte[], byte[]) AbstractMultiVersionReader.java:86
 org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(byte[], byte[]) AbstractStructSchema.java:60
 org.apache.pulsar.client.impl.MessageImpl.getValue() MessageImpl.java:301
 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.refreshTopicPoliciesCache(Message) SystemTopicBasedTopicPoliciesService.java:302
 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.lambda$initPolicesCache$7(SystemTopicClient$Reader, Throwable, CompletableFuture, Message, Throwable) SystemTopicBasedTopicPoliciesService.java:254
 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService$$Lambda$817.accept(Object, Object)
 java.util.concurrent.CompletableFuture.uniWhenComplete(Object, BiConsumer, CompletableFuture$UniWhenComplete) CompletableFuture.java:859
 java.util.concurrent.CompletableFuture.uniWhenCompleteStage(Executor, BiConsumer) CompletableFuture.java:883
 java.util.concurrent.CompletableFuture.whenComplete(BiConsumer) CompletableFuture.java:2251
org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.lambda$initPolicesCache$10(SystemTopicClient$Reader, CompletableFuture, Boolean, Throwable) SystemTopicBasedTopicPoliciesService.java:246
 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService$$Lambda$725.accept(Object, Object) org.apache.pulsar.client.impl.ClientCnx.handleGetLastMessageIdSuccess(PulsarApi$CommandGetLastMessageIdResponse) ClientCnx.java:468

 ```

 Since we are introduced the internal thread pool for handling the client internal executions.
 So the fix is using the internal thread to process the callback of the hasMessageAvailableAsync

(cherry picked from commit ed42007)
jerrypeng pushed a commit to jerrypeng/incubator-pulsar that referenced this pull request Nov 4, 2021
dlg99 pushed a commit to dlg99/pulsar that referenced this pull request Dec 2, 2021
…nc (apache#11183)

The issue will happens after satisfying the following conditions:

1. The messages are added to the incoming queue before reading messages
2. The result future of the readNextAsync been complete before call future.whenComplete by users,
   This won't always appear.

 After that, since we are using the IO thread to call the callback of the hasMessageAvailableAsync,
 so the IO thread will process the message.getValue(). Then might get a deadlock as followings:

 ```
 java.util.concurrent.CompletableFuture.get() CompletableFuture.java:1998
 org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaInfoByVersion(byte[]) AbstractMultiVersionReader.java:115
 org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader.loadReader(BytesSchemaVersion) MultiVersionAvroReader.java:47
 org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(BytesSchemaVersion) AbstractMultiVersionReader.java:52
 org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(Object) AbstractMultiVersionReader.java:49
 com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(Object, CacheLoader) LocalCache.java:3529
 com.google.common.cache.LocalCache$Segment.loadSync(Object, int, LocalCache$LoadingValueReference, CacheLoader) LocalCache.java:2278
 com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(Object, int, CacheLoader) LocalCache.java:2155
 com.google.common.cache.LocalCache$Segment.get(Object, int, CacheLoader) LocalCache.java:2045
 com.google.common.cache.LocalCache.get(Object, CacheLoader) LocalCache.java:3951
 com.google.common.cache.LocalCache.getOrLoad(Object) LocalCache.java:3974
 com.google.common.cache.LocalCache$LocalLoadingCache.get(Object) LocalCache.java:4935
 org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(byte[], byte[]) AbstractMultiVersionReader.java:86
 org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(byte[], byte[]) AbstractStructSchema.java:60
 org.apache.pulsar.client.impl.MessageImpl.getValue() MessageImpl.java:301
 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.refreshTopicPoliciesCache(Message) SystemTopicBasedTopicPoliciesService.java:302
 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.lambda$initPolicesCache$7(SystemTopicClient$Reader, Throwable, CompletableFuture, Message, Throwable) SystemTopicBasedTopicPoliciesService.java:254
 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService$$Lambda$817.accept(Object, Object)
 java.util.concurrent.CompletableFuture.uniWhenComplete(Object, BiConsumer, CompletableFuture$UniWhenComplete) CompletableFuture.java:859
 java.util.concurrent.CompletableFuture.uniWhenCompleteStage(Executor, BiConsumer) CompletableFuture.java:883
 java.util.concurrent.CompletableFuture.whenComplete(BiConsumer) CompletableFuture.java:2251
org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.lambda$initPolicesCache$10(SystemTopicClient$Reader, CompletableFuture, Boolean, Throwable) SystemTopicBasedTopicPoliciesService.java:246
 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService$$Lambda$725.accept(Object, Object) org.apache.pulsar.client.impl.ClientCnx.handleGetLastMessageIdSuccess(PulsarApi$CommandGetLastMessageIdResponse) ClientCnx.java:468

 ```

 Since we are introduced the internal thread pool for handling the client internal executions.
 So the fix is using the internal thread to process the callback of the hasMessageAvailableAsync

(cherry picked from commit ed42007)
nicoloboschi pushed a commit to datastax/pulsar that referenced this pull request Dec 2, 2021
…nc (apache#11183)

The issue will happens after satisfying the following conditions:

1. The messages are added to the incoming queue before reading messages
2. The result future of the readNextAsync been complete before call future.whenComplete by users,
   This won't always appear.

 After that, since we are using the IO thread to call the callback of the hasMessageAvailableAsync,
 so the IO thread will process the message.getValue(). Then might get a deadlock as followings:

 ```
 java.util.concurrent.CompletableFuture.get() CompletableFuture.java:1998
 org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaInfoByVersion(byte[]) AbstractMultiVersionReader.java:115
 org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader.loadReader(BytesSchemaVersion) MultiVersionAvroReader.java:47
 org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(BytesSchemaVersion) AbstractMultiVersionReader.java:52
 org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(Object) AbstractMultiVersionReader.java:49
 com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(Object, CacheLoader) LocalCache.java:3529
 com.google.common.cache.LocalCache$Segment.loadSync(Object, int, LocalCache$LoadingValueReference, CacheLoader) LocalCache.java:2278
 com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(Object, int, CacheLoader) LocalCache.java:2155
 com.google.common.cache.LocalCache$Segment.get(Object, int, CacheLoader) LocalCache.java:2045
 com.google.common.cache.LocalCache.get(Object, CacheLoader) LocalCache.java:3951
 com.google.common.cache.LocalCache.getOrLoad(Object) LocalCache.java:3974
 com.google.common.cache.LocalCache$LocalLoadingCache.get(Object) LocalCache.java:4935
 org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(byte[], byte[]) AbstractMultiVersionReader.java:86
 org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(byte[], byte[]) AbstractStructSchema.java:60
 org.apache.pulsar.client.impl.MessageImpl.getValue() MessageImpl.java:301
 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.refreshTopicPoliciesCache(Message) SystemTopicBasedTopicPoliciesService.java:302
 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.lambda$initPolicesCache$7(SystemTopicClient$Reader, Throwable, CompletableFuture, Message, Throwable) SystemTopicBasedTopicPoliciesService.java:254
 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService$$Lambda$817.accept(Object, Object)
 java.util.concurrent.CompletableFuture.uniWhenComplete(Object, BiConsumer, CompletableFuture$UniWhenComplete) CompletableFuture.java:859
 java.util.concurrent.CompletableFuture.uniWhenCompleteStage(Executor, BiConsumer) CompletableFuture.java:883
 java.util.concurrent.CompletableFuture.whenComplete(BiConsumer) CompletableFuture.java:2251
org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.lambda$initPolicesCache$10(SystemTopicClient$Reader, CompletableFuture, Boolean, Throwable) SystemTopicBasedTopicPoliciesService.java:246
 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService$$Lambda$725.accept(Object, Object) org.apache.pulsar.client.impl.ClientCnx.handleGetLastMessageIdSuccess(PulsarApi$CommandGetLastMessageIdResponse) ClientCnx.java:468

 ```

 Since we are introduced the internal thread pool for handling the client internal executions.
 So the fix is using the internal thread to process the callback of the hasMessageAvailableAsync

(cherry picked from commit ed42007)
bharanic-dev pushed a commit to bharanic-dev/pulsar that referenced this pull request Mar 18, 2022
…nc (apache#11183)

The issue will happens after satisfying the following conditions:

1. The messages are added to the incoming queue before reading messages
2. The result future of the readNextAsync been complete before call future.whenComplete by users,
   This won't always appear.

 After that, since we are using the IO thread to call the callback of the hasMessageAvailableAsync,
 so the IO thread will process the message.getValue(). Then might get a deadlock as followings:

 ```
 java.util.concurrent.CompletableFuture.get() CompletableFuture.java:1998
 org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaInfoByVersion(byte[]) AbstractMultiVersionReader.java:115
 org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader.loadReader(BytesSchemaVersion) MultiVersionAvroReader.java:47
 org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(BytesSchemaVersion) AbstractMultiVersionReader.java:52
 org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(Object) AbstractMultiVersionReader.java:49
 com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(Object, CacheLoader) LocalCache.java:3529
 com.google.common.cache.LocalCache$Segment.loadSync(Object, int, LocalCache$LoadingValueReference, CacheLoader) LocalCache.java:2278
 com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(Object, int, CacheLoader) LocalCache.java:2155
 com.google.common.cache.LocalCache$Segment.get(Object, int, CacheLoader) LocalCache.java:2045
 com.google.common.cache.LocalCache.get(Object, CacheLoader) LocalCache.java:3951
 com.google.common.cache.LocalCache.getOrLoad(Object) LocalCache.java:3974
 com.google.common.cache.LocalCache$LocalLoadingCache.get(Object) LocalCache.java:4935
 org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(byte[], byte[]) AbstractMultiVersionReader.java:86
 org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(byte[], byte[]) AbstractStructSchema.java:60
 org.apache.pulsar.client.impl.MessageImpl.getValue() MessageImpl.java:301
 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.refreshTopicPoliciesCache(Message) SystemTopicBasedTopicPoliciesService.java:302
 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.lambda$initPolicesCache$7(SystemTopicClient$Reader, Throwable, CompletableFuture, Message, Throwable) SystemTopicBasedTopicPoliciesService.java:254
 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService$$Lambda$817.accept(Object, Object)
 java.util.concurrent.CompletableFuture.uniWhenComplete(Object, BiConsumer, CompletableFuture$UniWhenComplete) CompletableFuture.java:859
 java.util.concurrent.CompletableFuture.uniWhenCompleteStage(Executor, BiConsumer) CompletableFuture.java:883
 java.util.concurrent.CompletableFuture.whenComplete(BiConsumer) CompletableFuture.java:2251
org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.lambda$initPolicesCache$10(SystemTopicClient$Reader, CompletableFuture, Boolean, Throwable) SystemTopicBasedTopicPoliciesService.java:246
 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService$$Lambda$725.accept(Object, Object) org.apache.pulsar.client.impl.ClientCnx.handleGetLastMessageIdSuccess(PulsarApi$CommandGetLastMessageIdResponse) ClientCnx.java:468

 ```

 Since we are introduced the internal thread pool for handling the client internal executions.
 So the fix is using the internal thread to process the callback of the hasMessageAvailableAsync
@codelipenghui codelipenghui restored the penghui/fix-dead-lock-consumer branch May 17, 2022 01:20
@codelipenghui codelipenghui deleted the penghui/fix-dead-lock-consumer branch May 17, 2022 01:29
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
cherry-picked/branch-2.7 Archived: 2.7 is end of life cherry-picked/branch-2.8 Archived: 2.8 is end of life doc-not-needed Your PR changes do not impact docs release/2.7.3 release/2.8.1 type/bug The PR fixed a bug or issue reported a bug
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants