Skip to content

Commit

Permalink
[issue 7851][C++] Make clear() thread-safe (#7862)
Browse files Browse the repository at this point in the history
Fixes #7851

### Motivation

`clear()` methods of `BatchAcknowledgementTracker` and `UnAckedMessageTrackerEnabled` are not thread-safe.

### Modifications

Acquire a mutex in these `clear()` methods.
(cherry picked from commit 97f4112)
  • Loading branch information
BewareMyPower authored and jiazhai committed Oct 16, 2020
1 parent 3e65a6c commit 9d02404
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 1 deletion.
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/BatchAcknowledgementTracker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ BatchAcknowledgementTracker::BatchAcknowledgementTracker(const std::string topic
}

void BatchAcknowledgementTracker::clear() {
Lock lock(mutex_);
trackerMap_.clear();
sendList_.clear();
}
Expand Down
6 changes: 5 additions & 1 deletion pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ void UnAckedMessageTrackerEnabled::timeoutHandler() {
}

void UnAckedMessageTrackerEnabled::timeoutHandlerHelper() {
std::lock_guard<std::mutex> acquire(lock_);
std::unique_lock<std::mutex> acquire(lock_);
LOG_DEBUG("UnAckedMessageTrackerEnabled::timeoutHandlerHelper invoked for consumerPtr_ "
<< consumerReference_.getName().c_str());

Expand All @@ -60,6 +60,9 @@ void UnAckedMessageTrackerEnabled::timeoutHandlerHelper() {
timePartitions.push_back(headPartition);

if (msgIdsToRedeliver.size() > 0) {
// redeliverUnacknowledgedMessages() may call clear() that acquire the lock again, so we should unlock
// here to avoid deadlock
acquire.unlock();
consumerReference_.redeliverUnacknowledgedMessages(msgIdsToRedeliver);
}
}
Expand Down Expand Up @@ -148,6 +151,7 @@ void UnAckedMessageTrackerEnabled::removeTopicMessage(const std::string& topic)
}

void UnAckedMessageTrackerEnabled::clear() {
std::lock_guard<std::mutex> acquire(lock_);
messageIdPartitionMap.clear();
for (auto it = timePartitions.begin(); it != timePartitions.end(); it++) {
it->clear();
Expand Down
2 changes: 2 additions & 0 deletions pulsar-client-cpp/tests/BasicEndToEndTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1748,6 +1748,8 @@ TEST(BasicEndToEndTest, testPartitionTopicUnAckedMessageTimeout) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
timeWaited += 500;
}

client.close();
}

TEST(BasicEndToEndTest, testUnAckedMessageTimeoutListener) {
Expand Down

0 comments on commit 9d02404

Please sign in to comment.