Skip to content

Commit

Permalink
Fix the dead lock when using hasMessageAvailableAsync and readNextAsy…
Browse files Browse the repository at this point in the history
…nc (apache#11183)

The issue will happens after satisfying the following conditions:

1. The messages are added to the incoming queue before reading messages
2. The result future of the readNextAsync been complete before call future.whenComplete by users,
   This won't always appear.

 After that, since we are using the IO thread to call the callback of the hasMessageAvailableAsync,
 so the IO thread will process the message.getValue(). Then might get a deadlock as followings:

 ```
 java.util.concurrent.CompletableFuture.get() CompletableFuture.java:1998
 org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaInfoByVersion(byte[]) AbstractMultiVersionReader.java:115
 org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader.loadReader(BytesSchemaVersion) MultiVersionAvroReader.java:47
 org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(BytesSchemaVersion) AbstractMultiVersionReader.java:52
 org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(Object) AbstractMultiVersionReader.java:49
 com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(Object, CacheLoader) LocalCache.java:3529
 com.google.common.cache.LocalCache$Segment.loadSync(Object, int, LocalCache$LoadingValueReference, CacheLoader) LocalCache.java:2278
 com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(Object, int, CacheLoader) LocalCache.java:2155
 com.google.common.cache.LocalCache$Segment.get(Object, int, CacheLoader) LocalCache.java:2045
 com.google.common.cache.LocalCache.get(Object, CacheLoader) LocalCache.java:3951
 com.google.common.cache.LocalCache.getOrLoad(Object) LocalCache.java:3974
 com.google.common.cache.LocalCache$LocalLoadingCache.get(Object) LocalCache.java:4935
 org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(byte[], byte[]) AbstractMultiVersionReader.java:86
 org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(byte[], byte[]) AbstractStructSchema.java:60
 org.apache.pulsar.client.impl.MessageImpl.getValue() MessageImpl.java:301
 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.refreshTopicPoliciesCache(Message) SystemTopicBasedTopicPoliciesService.java:302
 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.lambda$initPolicesCache$7(SystemTopicClient$Reader, Throwable, CompletableFuture, Message, Throwable) SystemTopicBasedTopicPoliciesService.java:254
 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService$$Lambda$817.accept(Object, Object)
 java.util.concurrent.CompletableFuture.uniWhenComplete(Object, BiConsumer, CompletableFuture$UniWhenComplete) CompletableFuture.java:859
 java.util.concurrent.CompletableFuture.uniWhenCompleteStage(Executor, BiConsumer) CompletableFuture.java:883
 java.util.concurrent.CompletableFuture.whenComplete(BiConsumer) CompletableFuture.java:2251
org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.lambda$initPolicesCache$10(SystemTopicClient$Reader, CompletableFuture, Boolean, Throwable) SystemTopicBasedTopicPoliciesService.java:246
 org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService$$Lambda$725.accept(Object, Object) org.apache.pulsar.client.impl.ClientCnx.handleGetLastMessageIdSuccess(PulsarApi$CommandGetLastMessageIdResponse) ClientCnx.java:468

 ```

 Since we are introduced the internal thread pool for handling the client internal executions.
 So the fix is using the internal thread to process the callback of the hasMessageAvailableAsync
  • Loading branch information
codelipenghui authored Jul 2, 2021
1 parent 1b94225 commit ed42007
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
Expand All @@ -51,10 +54,11 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.apache.pulsar.schema.Schemas;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -510,4 +514,44 @@ public void testSameSubName() throws Exception {

}

@Test(timeOut = 30000)
public void testAvoidUsingIoThreadToGetValueOfMessage() throws Exception {
final String topic = "persistent://my-property/my-ns/testAvoidUsingIoThreadToGetValueOfMessage";

@Cleanup
Producer<Schemas.PersonOne> producer = pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class))
.topic(topic)
.create();

producer.send(new Schemas.PersonOne(1));

@Cleanup
Reader<Schemas.PersonOne> reader = pulsarClient.newReader(Schema.AVRO(Schemas.PersonOne.class))
.topic(topic)
.startMessageId(MessageId.earliest)
.create();

CountDownLatch latch = new CountDownLatch(1);
List<Schemas.PersonOne> received = new ArrayList<>(1);
// Make sure the message is added to the incoming queue
Awaitility.await().untilAsserted(() ->
assertTrue(((ReaderImpl<?>) reader).getConsumer().incomingMessages.size() > 0));
reader.hasMessageAvailableAsync().whenComplete((has, e) -> {
if (e == null && has) {
CompletableFuture<Message<Schemas.PersonOne>> future = reader.readNextAsync();
// Make sure the future completed
Awaitility.await().pollInterval(1, TimeUnit.MILLISECONDS).untilAsserted(future::isDone);
future.whenComplete((msg, ex) -> {
if (ex == null) {
received.add(msg.getValue());
}
latch.countDown();
});
} else {
latch.countDown();
}
});
latch.await();
Assert.assertEquals(received.size(), 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1906,14 +1906,15 @@ public CompletableFuture<Boolean> hasMessageAvailableAsync() {
.compare(markDeletePosition.getEntryId(), lastMessageId.getEntryId())
.result();
if (lastMessageId.getEntryId() < 0) {
booleanFuture.complete(false);
completehasMessageAvailableWithValue(booleanFuture, false);
} else {
booleanFuture.complete(resetIncludeHead ? result <= 0 : result < 0);
completehasMessageAvailableWithValue(booleanFuture,
resetIncludeHead ? result <= 0 : result < 0);
}
} else if (lastMessageId == null || lastMessageId.getEntryId() < 0) {
booleanFuture.complete(false);
completehasMessageAvailableWithValue(booleanFuture, false);
} else {
booleanFuture.complete(resetIncludeHead);
completehasMessageAvailableWithValue(booleanFuture, resetIncludeHead);
}
}).exceptionally(ex -> {
log.error("[{}][{}] Failed getLastMessageId command", topic, subscription, ex);
Expand All @@ -1925,16 +1926,16 @@ public CompletableFuture<Boolean> hasMessageAvailableAsync() {
}

if (hasMoreMessages(lastMessageIdInBroker, startMessageId, resetIncludeHead)) {
booleanFuture.complete(true);
completehasMessageAvailableWithValue(booleanFuture, true);
return booleanFuture;
}

getLastMessageIdAsync().thenAccept(messageId -> {
lastMessageIdInBroker = messageId;
if (hasMoreMessages(lastMessageIdInBroker, startMessageId, resetIncludeHead)) {
booleanFuture.complete(true);
completehasMessageAvailableWithValue(booleanFuture, true);
} else {
booleanFuture.complete(false);
completehasMessageAvailableWithValue(booleanFuture, false);
}
}).exceptionally(e -> {
log.error("[{}][{}] Failed getLastMessageId command", topic, subscription);
Expand All @@ -1945,16 +1946,16 @@ public CompletableFuture<Boolean> hasMessageAvailableAsync() {
} else {
// read before, use lastDequeueMessage for comparison
if (hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessageId, false)) {
booleanFuture.complete(true);
completehasMessageAvailableWithValue(booleanFuture, true);
return booleanFuture;
}

getLastMessageIdAsync().thenAccept(messageId -> {
lastMessageIdInBroker = messageId;
if (hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessageId, false)) {
booleanFuture.complete(true);
completehasMessageAvailableWithValue(booleanFuture, true);
} else {
booleanFuture.complete(false);
completehasMessageAvailableWithValue(booleanFuture, false);
}
}).exceptionally(e -> {
log.error("[{}][{}] Failed getLastMessageId command", topic, subscription);
Expand All @@ -1966,6 +1967,12 @@ public CompletableFuture<Boolean> hasMessageAvailableAsync() {
return booleanFuture;
}

private void completehasMessageAvailableWithValue(CompletableFuture<Boolean> future, boolean value) {
internalPinnedExecutor.execute(() -> {
future.complete(value);
});
}

private boolean hasMoreMessages(MessageId lastMessageIdInBroker, MessageId messageId, boolean inclusive) {
if (inclusive && lastMessageIdInBroker.compareTo(messageId) >= 0 &&
((MessageIdImpl) lastMessageIdInBroker).getEntryId() != -1) {
Expand Down

0 comments on commit ed42007

Please sign in to comment.