Skip to content

Commit

Permalink
Fix flake in C++ negative acknowledgement tests (apache#7099)
Browse files Browse the repository at this point in the history
Negative acknowledgement runs in the background on a consumer and
triggers redelivery of messages. The tests verify that messages do
indeed get redelivered, and which messages they are, for the base
case, batching and partitioned consumer.

There's a fundamental dependency on timing in the base case. If 100ms
pass between consumer creation and receiving the last message in the
first receive loop, redelivery will be triggered and the test's
assertion on the order of messages will fail.

This first case can be fixed by moving the negative ack to run after
all messages have been received. However, this can also then fail for
the batch case.

If the negative ack tracker kicks off during the loop to negatively
ack the messages, then the redelivery will happen twice (and possibly
more times depending on how many times it manages to run).

For this reason, if we want the test to be deterministic, we need to
disable the tracker from kicking off redelivery while we mark the
messages as negatively acked.

Co-authored-by: Ivan Kelly <ikelly@splunk.com>
  • Loading branch information
merlimat and Ivan Kelly authored May 31, 2020
1 parent 973e4cc commit ae324b1
Show file tree
Hide file tree
Showing 11 changed files with 75 additions and 6 deletions.
4 changes: 4 additions & 0 deletions pulsar-client-cpp/lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1184,4 +1184,8 @@ void ConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback
}
}

// Test-only hook: forwards `enabled` to this consumer's negative-acks tracker
// (negativeAcksTracker_) via its setEnabledForTesting() method.
void ConsumerImpl::setNegativeAcknowledgeEnabledForTesting(bool enabled) {
negativeAcksTracker_.setEnabledForTesting(enabled);
}

} /* namespace pulsar */
5 changes: 5 additions & 0 deletions pulsar-client-cpp/lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ class ConsumerImpl : public ConsumerImplBase,
void statsCallback(Result, ResultCallback, proto::CommandAck_AckType);
void notifyPendingReceivedCallback(Result result, Message& message, const ReceiveCallback& callback);
void failPendingReceiveCallback();
virtual void setNegativeAcknowledgeEnabledForTesting(bool enabled);

Optional<MessageId> clearReceiveQueue();

Expand Down Expand Up @@ -197,6 +198,10 @@ class ConsumerImpl : public ConsumerImplBase,
}

friend class PulsarFriend;

// these two are declared friends so they can call setNegativeAcknowledgeEnabledForTesting
friend class MultiTopicsConsumerImpl;
friend class PartitionedConsumerImpl;
};

} /* namespace pulsar */
Expand Down
5 changes: 5 additions & 0 deletions pulsar-client-cpp/lib/ConsumerImplBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ class ConsumerImplBase {
virtual void seekAsync(const MessageId& msgId, ResultCallback callback) = 0;
virtual void seekAsync(uint64_t timestamp, ResultCallback callback) = 0;
virtual void negativeAcknowledge(const MessageId& msgId) = 0;

private:
virtual void setNegativeAcknowledgeEnabledForTesting(bool enabled) = 0;

friend class PulsarFriend;
};
} // namespace pulsar
#endif // PULSAR_CONSUMER_IMPL_BASE_HEADER
7 changes: 7 additions & 0 deletions pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -746,3 +746,10 @@ void MultiTopicsConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback c
// Timestamp-based seek is not implemented for this consumer type: the callback
// is invoked synchronously with ResultOperationNotSupported and `timestamp` is
// ignored.
void MultiTopicsConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
callback(ResultOperationNotSupported);
}

// Test-only hook: propagates `enabled` to every underlying consumer held in
// consumers_, so the whole multi-topics consumer is toggled at once.
// mutex_ is held for the duration of the walk over consumers_.
void MultiTopicsConsumerImpl::setNegativeAcknowledgeEnabledForTesting(bool enabled) {
    Lock lock(mutex_);
    for (auto it = consumers_.begin(); it != consumers_.end(); ++it) {
        // consumers_ entries are key/value pairs; the consumer is the mapped value.
        it->second->setNegativeAcknowledgeEnabledForTesting(enabled);
    }
}
3 changes: 3 additions & 0 deletions pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
void handleOneTopicUnsubscribedAsync(Result result, std::shared_ptr<std::atomic<int>> consumerUnsubed,
int numberPartitions, TopicNamePtr topicNamePtr,
std::string& topicPartitionName, ResultCallback callback);

private:
virtual void setNegativeAcknowledgeEnabledForTesting(bool enabled);
};

} // namespace pulsar
Expand Down
16 changes: 14 additions & 2 deletions pulsar-client-cpp/lib/NegativeAcksTracker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ namespace pulsar {

NegativeAcksTracker::NegativeAcksTracker(ClientImplPtr client, ConsumerImpl &consumer,
const ConsumerConfiguration &conf)
: consumer_(consumer), timerInterval_(0), executor_(client->getIOExecutorProvider()->get()) {
: consumer_(consumer),
timerInterval_(0),
executor_(client->getIOExecutorProvider()->get()),
enabledForTesting_(true) {
static const long MIN_NACK_DELAY_MILLIS = 100;

nackDelay_ =
Expand All @@ -56,7 +59,7 @@ void NegativeAcksTracker::handleTimer(const boost::system::error_code &ec) {
std::lock_guard<std::mutex> lock(mutex_);
timer_ = nullptr;

if (nackedMessages_.empty()) {
if (nackedMessages_.empty() || !enabledForTesting_) {
return;
}

Expand Down Expand Up @@ -103,4 +106,13 @@ void NegativeAcksTracker::close() {
}
}

// Test-only switch for the tracker. The new value is stored under mutex_.
// When the tracker is being enabled and no timer is currently pending
// (timer_ is null), a new timer is scheduled; otherwise nothing else happens.
void NegativeAcksTracker::setEnabledForTesting(bool enabled) {
    std::lock_guard<std::mutex> guard(mutex_);
    enabledForTesting_ = enabled;

    // Nothing to (re)arm when disabling, or when a timer is already outstanding.
    if (!enabledForTesting_ || timer_) {
        return;
    }
    scheduleTimer();
}

} // namespace pulsar
3 changes: 3 additions & 0 deletions pulsar-client-cpp/lib/NegativeAcksTracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class NegativeAcksTracker {

void close();

void setEnabledForTesting(bool enabled);

private:
void scheduleTimer();
void handleTimer(const boost::system::error_code &ec);
Expand All @@ -55,6 +57,7 @@ class NegativeAcksTracker {

ExecutorServicePtr executor_;
DeadlineTimerPtr timer_;
bool enabledForTesting_; // to be able to test deterministically
};

} // namespace pulsar
7 changes: 7 additions & 0 deletions pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -592,4 +592,11 @@ void PartitionedConsumerImpl::handleGetPartitions(Result result,
runPartitionUpdateTask();
}

// Test-only hook: propagates `enabled` to each per-partition consumer in
// consumers_. mutex_ is held while iterating.
void PartitionedConsumerImpl::setNegativeAcknowledgeEnabledForTesting(bool enabled) {
    Lock lock(mutex_);
    for (auto& partitionConsumer : consumers_) {
        partitionConsumer->setNegativeAcknowledgeEnabledForTesting(enabled);
    }
}

} // namespace pulsar
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/PartitionedConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ class PartitionedConsumerImpl : public ConsumerImplBase,
void internalListener(Consumer consumer);
void receiveMessages();
void failPendingReceiveCallback();
virtual void setNegativeAcknowledgeEnabledForTesting(bool enabled);
Promise<Result, ConsumerImplBaseWeakPtr> partitionedConsumerCreatedPromise_;
UnAckedMessageTrackerScopedPtr unAckedMessageTrackerPtr_;
std::queue<ReceiveCallback> pendingReceives_;
Expand Down
26 changes: 22 additions & 4 deletions pulsar-client-cpp/tests/BasicEndToEndTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2934,6 +2934,16 @@ TEST(BasicEndToEndTest, testPartitionedReceiveAsyncFailedConsumer) {
client.shutdown();
}

// Test helper: asserts that a receive on `consumer` yields ResultTimeout,
// i.e. that no message is delivered. If a message does arrive, its id and
// payload are logged before the assertion fails, to ease debugging.
static void expectTimeoutOnRecv(Consumer &consumer) {
    Message unexpected;
    // 100 is the receive timeout (presumably milliseconds -- confirm against Consumer::receive).
    const Result outcome = consumer.receive(unexpected, 100);
    if (outcome != ResultTimeout) {
        LOG_ERROR("Received a msg when not expecting to id(" << unexpected.getMessageId() << ") "
                                                             << unexpected.getDataAsString());
    }
    ASSERT_EQ(ResultTimeout, outcome);
}

void testNegativeAcks(const std::string &topic, bool batchingEnabled) {
Client client(lookupUrl);
Consumer consumer;
Expand All @@ -2955,14 +2965,24 @@ void testNegativeAcks(const std::string &topic, bool batchingEnabled) {

producer.flush();

std::vector<MessageId> toNeg;
for (int i = 0; i < 10; i++) {
Message msg;
consumer.receive(msg);

LOG_INFO("Received message " << msg.getDataAsString());
ASSERT_EQ(msg.getDataAsString(), "test-" + std::to_string(i));
consumer.negativeAcknowledge(msg);
toNeg.push_back(msg.getMessageId());
}
// No more messages expected
expectTimeoutOnRecv(consumer);

PulsarFriend::setNegativeAckEnabled(consumer, false);
// negatively acknowledge all at once
for (auto &&msgId : toNeg) {
consumer.negativeAcknowledge(msgId);
}
PulsarFriend::setNegativeAckEnabled(consumer, true);

for (int i = 0; i < 10; i++) {
Message msg;
Expand All @@ -2975,9 +2995,7 @@ void testNegativeAcks(const std::string &topic, bool batchingEnabled) {
}

// No more messages expected
Message msg;
Result res = consumer.receive(msg, 100);
ASSERT_EQ(ResultTimeout, res);
expectTimeoutOnRecv(consumer);

client.shutdown();
}
Expand Down
4 changes: 4 additions & 0 deletions pulsar-client-cpp/tests/PulsarFriend.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ class PulsarFriend {
return *consumerImpl;
}

// Test-only accessor: reaches into the consumer's implementation object
// (consumer.impl_) and calls setNegativeAcknowledgeEnabledForTesting(enabled)
// on it.
static void setNegativeAckEnabled(Consumer consumer, bool enabled) {
consumer.impl_->setNegativeAcknowledgeEnabledForTesting(enabled);
}

static ClientConnectionWeakPtr getClientConnection(HandlerBase& handler) { return handler.connection_; }

static boost::posix_time::ptime& getFirstBackoffTime(Backoff& backoff) {
Expand Down

0 comments on commit ae324b1

Please sign in to comment.