From ed42007d5f5df063a61626e49803bbf35c5a3eff Mon Sep 17 00:00:00 2001 From: lipenghui Date: Fri, 2 Jul 2021 19:25:43 +0800 Subject: [PATCH] Fix the dead lock when using hasMessageAvailableAsync and readNextAsync (#11183) The issue happens when both of the following conditions are satisfied: 1. The messages are added to the incoming queue before any message is read. 2. The result future of readNextAsync has already been completed before the user calls future.whenComplete on it. This won't always occur. In that case, because the IO thread is used to invoke the callback of hasMessageAvailableAsync, the IO thread also ends up processing message.getValue(), which might lead to a deadlock as follows: ``` java.util.concurrent.CompletableFuture.get() CompletableFuture.java:1998 org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaInfoByVersion(byte[]) AbstractMultiVersionReader.java:115 org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader.loadReader(BytesSchemaVersion) MultiVersionAvroReader.java:47 org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(BytesSchemaVersion) AbstractMultiVersionReader.java:52 org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(Object) AbstractMultiVersionReader.java:49 com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(Object, CacheLoader) LocalCache.java:3529 com.google.common.cache.LocalCache$Segment.loadSync(Object, int, LocalCache$LoadingValueReference, CacheLoader) LocalCache.java:2278 com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(Object, int, CacheLoader) LocalCache.java:2155 com.google.common.cache.LocalCache$Segment.get(Object, int, CacheLoader) LocalCache.java:2045 com.google.common.cache.LocalCache.get(Object, CacheLoader) LocalCache.java:3951 com.google.common.cache.LocalCache.getOrLoad(Object) LocalCache.java:3974 com.google.common.cache.LocalCache$LocalLoadingCache.get(Object) LocalCache.java:4935 
org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(byte[], byte[]) AbstractMultiVersionReader.java:86 org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(byte[], byte[]) AbstractStructSchema.java:60 org.apache.pulsar.client.impl.MessageImpl.getValue() MessageImpl.java:301 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.refreshTopicPoliciesCache(Message) SystemTopicBasedTopicPoliciesService.java:302 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.lambda$initPolicesCache$7(SystemTopicClient$Reader, Throwable, CompletableFuture, Message, Throwable) SystemTopicBasedTopicPoliciesService.java:254 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService$$Lambda$817.accept(Object, Object) java.util.concurrent.CompletableFuture.uniWhenComplete(Object, BiConsumer, CompletableFuture$UniWhenComplete) CompletableFuture.java:859 java.util.concurrent.CompletableFuture.uniWhenCompleteStage(Executor, BiConsumer) CompletableFuture.java:883 java.util.concurrent.CompletableFuture.whenComplete(BiConsumer) CompletableFuture.java:2251 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.lambda$initPolicesCache$10(SystemTopicClient$Reader, CompletableFuture, Boolean, Throwable) SystemTopicBasedTopicPoliciesService.java:246 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService$$Lambda$725.accept(Object, Object) org.apache.pulsar.client.impl.ClientCnx.handleGetLastMessageIdSuccess(PulsarApi$CommandGetLastMessageIdResponse) ClientCnx.java:468 ``` We have already introduced an internal thread pool for handling the client's internal executions. 
So the fix is using the internal thread to process the callback of the hasMessageAvailableAsync --- .../apache/pulsar/client/impl/ReaderTest.java | 46 ++++++++++++++++++- .../pulsar/client/impl/ConsumerImpl.java | 27 +++++++---- 2 files changed, 62 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java index a04f73b167574..4a2466f53e347 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java @@ -32,8 +32,11 @@ import java.util.List; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; + import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; @@ -51,10 +54,11 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.util.Murmur3_32Hash; +import org.apache.pulsar.schema.Schemas; +import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -510,4 +514,44 @@ public void testSameSubName() throws Exception { } + @Test(timeOut = 30000) + public void testAvoidUsingIoThreadToGetValueOfMessage() throws Exception { + final String topic = "persistent://my-property/my-ns/testAvoidUsingIoThreadToGetValueOfMessage"; + + @Cleanup + Producer producer = 
pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class)) + .topic(topic) + .create(); + + producer.send(new Schemas.PersonOne(1)); + + @Cleanup + Reader reader = pulsarClient.newReader(Schema.AVRO(Schemas.PersonOne.class)) + .topic(topic) + .startMessageId(MessageId.earliest) + .create(); + + CountDownLatch latch = new CountDownLatch(1); + List received = new ArrayList<>(1); + // Make sure the message is added to the incoming queue + Awaitility.await().untilAsserted(() -> + assertTrue(((ReaderImpl) reader).getConsumer().incomingMessages.size() > 0)); + reader.hasMessageAvailableAsync().whenComplete((has, e) -> { + if (e == null && has) { + CompletableFuture> future = reader.readNextAsync(); + // Make sure the future completed + Awaitility.await().pollInterval(1, TimeUnit.MILLISECONDS).untilAsserted(future::isDone); + future.whenComplete((msg, ex) -> { + if (ex == null) { + received.add(msg.getValue()); + } + latch.countDown(); + }); + } else { + latch.countDown(); + } + }); + latch.await(); + Assert.assertEquals(received.size(), 1); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index f70205d5fe111..ad410f39ba310 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1906,14 +1906,15 @@ public CompletableFuture hasMessageAvailableAsync() { .compare(markDeletePosition.getEntryId(), lastMessageId.getEntryId()) .result(); if (lastMessageId.getEntryId() < 0) { - booleanFuture.complete(false); + completehasMessageAvailableWithValue(booleanFuture, false); } else { - booleanFuture.complete(resetIncludeHead ? result <= 0 : result < 0); + completehasMessageAvailableWithValue(booleanFuture, + resetIncludeHead ? 
result <= 0 : result < 0); } } else if (lastMessageId == null || lastMessageId.getEntryId() < 0) { - booleanFuture.complete(false); + completehasMessageAvailableWithValue(booleanFuture, false); } else { - booleanFuture.complete(resetIncludeHead); + completehasMessageAvailableWithValue(booleanFuture, resetIncludeHead); } }).exceptionally(ex -> { log.error("[{}][{}] Failed getLastMessageId command", topic, subscription, ex); @@ -1925,16 +1926,16 @@ public CompletableFuture hasMessageAvailableAsync() { } if (hasMoreMessages(lastMessageIdInBroker, startMessageId, resetIncludeHead)) { - booleanFuture.complete(true); + completehasMessageAvailableWithValue(booleanFuture, true); return booleanFuture; } getLastMessageIdAsync().thenAccept(messageId -> { lastMessageIdInBroker = messageId; if (hasMoreMessages(lastMessageIdInBroker, startMessageId, resetIncludeHead)) { - booleanFuture.complete(true); + completehasMessageAvailableWithValue(booleanFuture, true); } else { - booleanFuture.complete(false); + completehasMessageAvailableWithValue(booleanFuture, false); } }).exceptionally(e -> { log.error("[{}][{}] Failed getLastMessageId command", topic, subscription); @@ -1945,16 +1946,16 @@ public CompletableFuture hasMessageAvailableAsync() { } else { // read before, use lastDequeueMessage for comparison if (hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessageId, false)) { - booleanFuture.complete(true); + completehasMessageAvailableWithValue(booleanFuture, true); return booleanFuture; } getLastMessageIdAsync().thenAccept(messageId -> { lastMessageIdInBroker = messageId; if (hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessageId, false)) { - booleanFuture.complete(true); + completehasMessageAvailableWithValue(booleanFuture, true); } else { - booleanFuture.complete(false); + completehasMessageAvailableWithValue(booleanFuture, false); } }).exceptionally(e -> { log.error("[{}][{}] Failed getLastMessageId command", topic, subscription); @@ -1966,6 +1967,12 @@ public 
CompletableFuture hasMessageAvailableAsync() { return booleanFuture; } + private void completehasMessageAvailableWithValue(CompletableFuture future, boolean value) { + internalPinnedExecutor.execute(() -> { + future.complete(value); + }); + } + private boolean hasMoreMessages(MessageId lastMessageIdInBroker, MessageId messageId, boolean inclusive) { if (inclusive && lastMessageIdInBroker.compareTo(messageId) >= 0 && ((MessageIdImpl) lastMessageIdInBroker).getEntryId() != -1) {