Skip to content

Commit

Permalink
[C++] ClientImpl::handleClose shouldn't use statics (apache#7068)
Browse files Browse the repository at this point in the history
ClientImpl::handleClose was using static variables to record the first
error on closing a client. This is just wrong. A static stack variable
in c++ acts like a global. So if errorClosing was ever set to true in
a process, all clients closed in that process after that point would
be errored with first error.

This fixes a failure in BasicEndToEndTest.testDelayedMessages, which
happens sporadically on C++, and always when running the tests in
serial, probably due to a double close in some other test.

Co-authored-by: Ivan Kelly <ikelly@splunk.com>
  • Loading branch information
merlimat and Ivan Kelly authored May 29, 2020
1 parent 0068bd9 commit ff594b8
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 14 deletions.
24 changes: 11 additions & 13 deletions pulsar-client-cpp/lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration&
pool_(clientConfiguration_, ioExecutorProvider_, clientConfiguration_.getAuthPtr(), poolConnections),
producerIdGenerator_(0),
consumerIdGenerator_(0),
requestIdGenerator_(0) {
requestIdGenerator_(0),
closingError(ResultOk) {
if (clientConfiguration_.getLogger()) {
// A logger factory was explicitely configured. Let's just use that
LogUtils::setLoggerFactory(clientConfiguration_.getLogger());
Expand Down Expand Up @@ -510,30 +511,27 @@ void ClientImpl::closeAsync(CloseCallback callback) {
}

void ClientImpl::handleClose(Result result, SharedInt numberOfOpenHandlers, ResultCallback callback) {
static bool errorClosing = false;
static Result failResult = ResultOk;
if (result != ResultOk) {
errorClosing = true;
failResult = result;
Result expected = ResultOk;
if (!closingError.compare_exchange_strong(expected, result)) {
LOG_DEBUG("Tried to updated closingError, but already set to "
<< expected << ". This means multiple errors have occurred while closing the client");
}

if (*numberOfOpenHandlers > 0) {
--(*numberOfOpenHandlers);
}
if (*numberOfOpenHandlers == 0) {
Lock lock(mutex_);
state_ = Closed;
lock.unlock();
if (errorClosing) {
LOG_DEBUG("Problem in closing client, could not close one or more consumers or producers");
if (callback) {
callback(failResult);
}
}

LOG_DEBUG("Shutting down producers and consumers for client");
shutdown();
if (callback) {
callback(ResultOk);
if (closingError != ResultOk) {
LOG_DEBUG("Problem in closing client, could not close one or more consumers or producers");
}
callback(closingError);
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion pulsar-client-cpp/lib/ClientImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
#include <lib/TopicName.h>
#include "ProducerImplBase.h"
#include "ConsumerImplBase.h"

#include <atomic>
#include <vector>

namespace pulsar {
Expand Down Expand Up @@ -148,6 +148,8 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
typedef std::vector<ConsumerImplBaseWeakPtr> ConsumersList;
ConsumersList consumers_;

std::atomic<Result> closingError;

friend class Client;
};
} /* namespace pulsar */
Expand Down

0 comments on commit ff594b8

Please sign in to comment.