pulsar-client-cpp crashes consumer due to thread safety #7851
Comments
First, a correction regarding ConsumerImpl::connectionOpened:
void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
/* ... */
unAckedMessageTrackerPtr_->clear();
batchAcknowledgementTracker_.clear();
/* ... */
uint64_t requestId = client->newRequestId();
SharedBuffer cmd = Commands::newSubscribe(
topic_, subscription_, consumerId_, requestId, getSubType(), consumerName_, subscriptionMode_,
startMessageId_, readCompacted_, config_.getProperties(), config_.getSchema(), getInitialPosition());
// trigger the `handleCreateConsumer()` method after receiving response from broker
cnx->sendRequestWithId(cmd, requestId)
.addListener(
std::bind(&ConsumerImpl::handleCreateConsumer, shared_from_this(), cnx, std::placeholders::_1));
}
The future is completed in ConsumerImpl::handleCreateConsumer:
void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result result) {
static bool firstTime = true;
if (result == ResultOk) {
/* ... */
consumerCreatedPromise_.setValue(shared_from_this()); // getConsumerCreatedFuture() is completed
} else {
/* ... */
}
}
Then, in ClientImpl::handleSubscribe:
void ClientImpl::handleSubscribe(const Result result, const LookupDataResultPtr partitionMetadata,
TopicNamePtr topicName, const std::string& consumerName,
ConsumerConfiguration conf, SubscribeCallback callback) {
if (result == ResultOk) {
/* ... */
// `handleConsumerCreated` is called when getConsumerCreatedFuture() is completed
consumer->getConsumerCreatedFuture().addListener(
std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), std::placeholders::_1,
std::placeholders::_2, callback, consumer));
Lock lock(mutex_);
consumers_.push_back(consumer);
lock.unlock();
consumer->start();
} else {
/* ... */
}
}
void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
SubscribeCallback callback, ConsumerImplBasePtr consumer) {
callback(result, Consumer(consumer)); // Finally, the consumer argument of `Client::subscribe` is set to this consumer
}
Therefore, the consumer could only call
Could ConsumerImpl::connectionOpened also be triggered by a reconnection after the connection breaks?
Fixes #7851
### Motivation
`clear()` methods of `BatchAcknowledgementTracker` and `UnAckedMessageTrackerEnabled` are not thread-safe.
### Modifications
Acquire a mutex in these `clear()` methods.
Fixes apache#7851 (cherry picked from commit 97f4112)
Describe the bug
When running pulsar-client-cpp 2.5.2, we see the following stack trace. Our code instantiates a client and holds it open, then time-slices consumers across a number of threads. Each thread subscribes, consumes a message and immediately acknowledges it, then closes the consumer. The crash has occurred at intervals ranging from a few hours to a month. We finally obtained a core file for the crash shown below:
#0 0x00007f03272b02c7 in raise () from /lib64/libc.so.6
#1 0x00007f03272b19b8 in abort () from /lib64/libc.so.6
#2 0x0000000004075565 in Basics::Backtrace::DoCoreDump(char const*, bool) ()
#3 0x00000000040c95c5 in Basics::GlobalSignalHandlers::logCoreDump() ()
#4 0x00000000040ca6a5 in Basics::sigHandler_withinATryCatch(int, siginfo*, void*) ()
#5 0x00000000040ca77e in Basics::sigHandler(int, siginfo*, void*) ()
#6 <signal handler called>
#7 0x00007f018313414e in pulsar::MessageId::operator< (this=0x7f030ea6c9e8, other=
@0x7f00ebffaf00: {impl_ = std::shared_ptr (count 3, weak 0) 0x7f030f68b538})
at /usr/local/src/apache-pulsar-microfocus/pulsar-client-cpp/lib/MessageId.cc:100
#8 0x00007f01831c63f0 in operator() (this=0x7f030ea6c9c0, __y=@0x7f00ebffaf00: {impl_ = std::shared_ptr (count 3, weak 0) 0x7f030f68b538},
__x=) at /usr/include/c++/4.8.2/bits/stl_function.h:235
#9 std::_Rb_tree<pulsar::MessageId, std::pair<pulsar::MessageId const, boost::dynamic_bitset<unsigned long, std::allocator<unsigned long> > >, std::_Select1st<std::pair<pulsar::MessageId const, boost::dynamic_bitset<unsigned long, std::allocator<unsigned long> > > >, std::less<pulsar::MessageId>, std::allocator<std::pair<pulsar::MessageId const, boost::dynamic_bitset<unsigned long, std::allocator<unsigned long> > > > >::_M_get_insert_hint_unique_pos (
this=this@entry=0x7f030ea6c9c0, __position=, __k=@0x7f00ebffaf00: {impl_ = std::shared_ptr (count 3, weak 0) 0x7f030f68b538})
at /usr/include/c++/4.8.2/bits/stl_tree.h:1422
#10 0x00007f01831c6419 in std::_Rb_tree<pulsar::MessageId, std::pair<pulsar::MessageId const, boost::dynamic_bitset<unsigned long, std::allocator<unsigned long> > >, std::_Select1st<std::pair<pulsar::MessageId const, boost::dynamic_bitset<unsigned long, std::allocator<unsigned long> > > >, std::less<pulsar::MessageId>, std::allocator<std::pair<pulsar::MessageId const, boost::dynamic_bitset<unsigned long, std::allocator<unsigned long> > > > >::_M_insert_unique_<std::pair<pulsar::MessageId, boost::dynamic_bitset<unsigned long, std::allocator<unsigned long> > > >(std::_Rb_tree_const_iterator<std::pair<pulsar::MessageId const, boost::dynamic_bitset<unsigned long, std::allocator<unsigned long> > > >, std::pair<pulsar::MessageId, boost::dynamic_bitset<unsigned long, std::allocator<unsigned long> > >&&) (this=this@entry=0x7f030ea6c9c0, __position=, __position@entry=
{first = {impl = std::shared_ptr (count -660842048, weak 32512) 0xffffffffffffffff}, second = {static bits_per_block = <error reading variable: No global symbol "boost::dynamic_bitset<unsigned long, std::allocator >::bits_per_block".>, static npos = , static ulong_width = <error reading variable: No global symbol "boost::dynamic_bitset<unsigned long, std::allocator >::ulong_width".>, m_bits = std::vector of length 128, capacity -25369645 = {139645905820120, 139645905820096, 139645903947864, 139645903947840, 139645895612888, 139645895612864, 139645903478456, 139645903478432, 139645905415224, 139645905415200, 139645907124328, 139645907124304, 139645897524632, 139645897524608, 139645896941944, 139645896941920, 139645903479768, 139645903479744, 139645897213704, 139645897213680, 139645897213832, 139645897213808, 139645902580088, 139645902580064, 139645902579928, 139645902579904, 139645896941448, 139645896941424, 139645897213320, 139645897213296, 139645901676504, 139645901676480, 139645899269576, 139645899269552, 139645905827016, 139645905826992, 139645898799784, 139645898799760, 139645901615560, 139645901615536, 139645903352184, 139645903352160, 139645905772872, 139645905772848, 139645899479656, 139645899479632, 139645905308152, 139645905308128, 139645897513528, 139645897513504, 139645896668632, 139645896668608, 139645898799128, 139645898799104, 139645905331112, 139645905331088, 139645905331304, 139645905331280, 139645898799256, 139645898799232, 139645899213224, 139645899213200, 139645905418312, 139645905418288, 139645903262888, 139645903262864, 139645896344696, 139645896344672, 139645899177384, 139645899177360, 139645905401368, 139645905401344, 139645907120840, 139645907120816, 139645896458248, 139645896458224, 139645898790536, 139645898790512, 139645897192552, 139645897192528, 139645899378632, 139645899378608, 112513136, 216, 112528512, 433, 139645899296624, 139645900272320, 139645898787520, 139645898012976, 112486928, 61818273202178, 112528416, 273, 139645895573880, 
139645895573880, 0, 0, 45035996273704978, 45035996273704978, 1, 114046456, 0, 0, 0, 139645898784768, 114046456, 139645898012928, 0, 0, 139645897425536, 139642271694848, 0, 139645897425560, 139645897425560, 0, 114046456, 139645897424640, 0, 0, 139645897424848, 139642271694848, 0, 139645897425640, 139645897425640, 0, 336, 68}, m_num_bits = 139645694467488}},
__v=__v@entry=<unknown type in /usr/lib/debug/usr/local/itom-di-pulsarudx/lib/libitom-di-pulsarudx-9.2.1.so.debug, CU 0x59efd9, DIE 0x5aa2ab>)
at /usr/include/c++/4.8.2/bits/stl_tree.h:1478
#11 0x00007f01831c4d19 in insert<std::pair<pulsar::MessageId, boost::dynamic_bitset<> >, void> (
__x=<unknown type in /usr/lib/debug/usr/local/itom-di-pulsarudx/lib/libitom-di-pulsarudx-9.2.1.so.debug, CU 0x59efd9, DIE 0x5aa2ab>,
__position=, this=0x7f030ea6c9c0) at /usr/include/c++/4.8.2/bits/stl_map.h:657
#12 pulsar::BatchAcknowledgementTracker::receivedMessage (this=this@entry=0x7f030ea6c998, message=warning: RTTI symbol not found for class 'std::_Sp_counted_ptr_inplace<pulsar::MessageImpl, std::allocator<pulsar::MessageImpl>, (__gnu_cxx::_Lock_policy)2>'
warning: RTTI symbol not found for class 'std::_Sp_counted_ptr_inplace<pulsar::MessageImpl, std::allocator<pulsar::MessageImpl>, (__gnu_cxx::_Lock_policy)2>'
@0x7f00ebffb350: {impl_ = std::shared_ptr (count 1, weak 0) 0x7f030f6a7148})
at /usr/local/src/apache-pulsar-microfocus/pulsar-client-cpp/lib/BatchAcknowledgementTracker.cc:61
#13 0x00007f0183113a2e in pulsar::ConsumerImpl::receiveIndividualMessagesFromBatch (this=this@entry=0x7f030ea6bcc8, cnx=warning: RTTI symbol not found for class 'std::_Sp_counted_ptr<pulsar::ClientConnection*, (__gnu_cxx::_Lock_policy)2>'
warning: RTTI symbol not found for class 'std::_Sp_counted_ptr<pulsar::ClientConnection*, (__gnu_cxx::_Lock_policy)2>'
std::shared_ptr (count 7, weak 4) 0x7f030c018fd0, batchedMessage=warning: RTTI symbol not found for class 'std::_Sp_counted_ptr_inplace<pulsar::MessageImpl, std::allocator<pulsar::MessageImpl>, (__gnu_cxx::_Lock_policy)2>'
warning: RTTI symbol not found for class 'std::_Sp_counted_ptr_inplace<pulsar::MessageImpl, std::allocator<pulsar::MessageImpl>, (__gnu_cxx::_Lock_policy)2>'
@0x7f00ebffb350: {impl_ = std::shared_ptr (count 1, weak 0) 0x7f030f6a7148},
redeliveryCount=0) at /usr/local/src/apache-pulsar-microfocus/pulsar-client-cpp/lib/ConsumerImpl.cc:369
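Keeping in mind that a consumer close is in progress, focus on frame #12. It shows that we are inserting into a red-black tree (the std::map inside BatchAcknowledgementTracker), which is not inherently thread-safe. That leaves two possible competing threads: one inserting into the tracker in receivedMessage() and another calling clear() on the same tracker.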
To Reproduce
Steps to reproduce the behavior:
The failure rate is roughly 1 in a billion to 1 in a trillion per message, so simply waiting for the failure to occur while the software grinds on processing messages and closing consumers is not a practical way to reproduce it. Instead:
1. Create a unit test that runs 2 threads.
2. A core dump should occur fairly quickly.
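The two-thread test mentioned in the steps above could be sketched roughly as follows. This is not taken from the Pulsar test suite: `ToyTracker` and `runInsertVsClear` are hypothetical names, and a plain `std::map` stands in for the tracker's internal map. One thread inserts while the other clears. With `guarded == false` the two threads race on the tree, which is undefined behavior and is the pattern suspected in the stack trace. With `guarded == true` every access takes the same mutex.

```cpp
#include <cstddef>
#include <cstdint>
#include <map>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical stand-in for the tracker's internal map; not the Pulsar class.
using ToyTracker = std::map<std::uint64_t, std::uint64_t>;

// Runs one thread that inserts keys and one thread that clears the map.
// guarded == false: unsynchronized access (a data race, undefined behavior).
// guarded == true:  both threads lock the same mutex around each operation.
// Returns the number of entries left once both threads have finished.
std::size_t runInsertVsClear(bool guarded, std::uint64_t iterations) {
    ToyTracker tracker;
    std::mutex mutex;

    std::thread inserter([&] {
        for (std::uint64_t i = 0; i < iterations; ++i) {
            std::unique_lock<std::mutex> lock(mutex, std::defer_lock);
            if (guarded) lock.lock();
            tracker.insert(std::make_pair(i, i));
        }
    });

    std::thread clearer([&] {
        for (std::uint64_t i = 0; i < iterations; ++i) {
            std::unique_lock<std::mutex> lock(mutex, std::defer_lock);
            if (guarded) lock.lock();
            tracker.clear();
        }
    });

    inserter.join();
    clearer.join();
    return tracker.size();
}
```

Calling `runInsertVsClear(false, n)` with a large `n` is the reproduction attempt and may crash or corrupt memory, so it belongs in a throwaway test binary. Calling it with `true` is the control run that should always complete.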
Expected behavior
Software runs without crashing.
Additional context
Code inspection of the BatchAcknowledgementTracker and UnAckedMessageTrackerEnabled classes shows the same problem in both: clear() does not acquire a mutex and is therefore not thread-safe.
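As an illustration of the kind of change this points to, here is a minimal sketch of a tracker whose `clear()` takes the same mutex as its other methods. `ToyBatchTracker` is a hypothetical, simplified class written for this example. It is not the actual `BatchAcknowledgementTracker` code, and the member names and the `std::vector<bool>` bitset are assumptions.

```cpp
#include <cstddef>
#include <cstdint>
#include <map>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical, simplified tracker; the real Pulsar classes differ.
class ToyBatchTracker {
   public:
    // Records a batch of batchSize messages under batchId.
    // std::map::insert leaves an already existing entry untouched.
    void receivedMessage(std::uint64_t batchId, std::size_t batchSize) {
        std::lock_guard<std::mutex> lock(mutex_);
        trackerMap_.insert(std::make_pair(batchId, std::vector<bool>(batchSize, true)));
    }

    // clear() locks the same mutex as receivedMessage(), so the map is never
    // emptied while another thread is in the middle of an insert.
    void clear() {
        std::lock_guard<std::mutex> lock(mutex_);
        trackerMap_.clear();
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return trackerMap_.size();
    }

   private:
    mutable std::mutex mutex_;  // guards trackerMap_
    std::map<std::uint64_t, std::vector<bool>> trackerMap_;
};
```

The point of the sketch is that every method touching the map, including `clear()`, goes through one mutex. Locking only the insert path would leave the same race in place.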