Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Partitioned consumer test failure #323

Merged
merged 6 commits into from
Mar 31, 2017
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 24 additions & 15 deletions pulsar-client-cpp/tests/BasicEndToEndTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "boost/date_time/posix_time/posix_time.hpp"
#include "CustomRoutingPolicy.h"
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>

#include "HttpHelper.h"

Expand All @@ -34,9 +35,10 @@ DECLARE_LOG_OBJECT()

using namespace pulsar;

boost::mutex mutex_;
static int globalTestBatchMessagesCounter = 0;
static int globalCount = 0;
static int globalResendMessageCount = 0;
static long globalResendMessageCount = 0;
static std::string lookupUrl = "pulsar://localhost:8885";
static std::string adminUrl = "http://localhost:8765/";

Expand Down Expand Up @@ -119,14 +121,16 @@ TEST(BasicEndToEndTest, testBatchMessages)
}

void resendMessage(Result r, const Message& msg, Producer &producer) {
Lock lock(mutex_);
if (r != ResultOk) {
int attemptNumber = boost::lexical_cast<int>(msg.getProperty("attempt#"));
if (attemptNumber++ < 3) {
globalResendMessageCount++;
producer.sendAsync(MessageBuilder().setProperty("attempt#", boost::lexical_cast<std::string>(attemptNumber)).build(),
boost::bind(resendMessage, _1, _2, producer));
}
LOG_DEBUG("globalResendMessageCount" << globalResendMessageCount);
if (globalResendMessageCount++ >= 3) {
return;
}
lock.unlock();
}
producer.sendAsync(MessageBuilder().build(),
boost::bind(resendMessage, _1, _2, producer));
}

TEST(BasicEndToEndTest, testProduceConsume)
Expand Down Expand Up @@ -378,6 +382,11 @@ TEST(BasicEndToEndTest, testPartitionedProducerConsumer)
Producer producer;
Result result = client.createProducer(topicName, producer);
ASSERT_EQ(ResultOk, result);

Consumer consumer;
result = client.subscribe(topicName, "subscription-A", consumer);
ASSERT_EQ(ResultOk, result);

for (int i = 0; i < 10; i++ ) {
boost::posix_time::ptime t(boost::posix_time::microsec_clock::universal_time());
long nanoSeconds = t.time_of_day().total_nanoseconds();
Expand All @@ -388,9 +397,7 @@ TEST(BasicEndToEndTest, testPartitionedProducerConsumer)
LOG_INFO("Message Timestamp is " << msg.getPublishTimestamp());
LOG_INFO("Message is " << msg);
}
Consumer consumer;
result = client.subscribe(topicName, "subscription-A", consumer);
ASSERT_EQ(ResultOk, result);

ASSERT_EQ(consumer.getSubscriptionName(), "subscription-A");
for (int i = 0; i < 10; i++) {
Message m;
Expand Down Expand Up @@ -793,7 +800,7 @@ TEST(BasicEndToEndTest, testMessageListenerPause)
client.close();
}

TEST(BasicEndToEndTest, testResendViaListener)
TEST(BasicEndToEndTest, testResendViaSendCallback)
{
Client client(lookupUrl);
std::string topicName = "persistent://my-property/my-cluster/my-namespace/testResendViaListener";
Expand All @@ -810,11 +817,13 @@ TEST(BasicEndToEndTest, testMessageListenerPause)
Result result = producerFuture.get(producer);
ASSERT_EQ(ResultOk, result);

// Send asynchronously
producer.sendAsync(MessageBuilder().setProperty("attempt#", boost::lexical_cast<std::string>(0)).build(), boost::bind(resendMessage, _1, _2, producer));
// Send asynchronously for 3 seconds
// Expect timeouts since we have set timeout to 1 ms
// On receiving timeout send the message using the CMS client IO thread via cb function.
producer.sendAsync(MessageBuilder().build(), boost::bind(resendMessage, _1, _2, producer));

// 3 seconds
usleep(3 * 1000 * 1000);

ASSERT_EQ(globalResendMessageCount, 3);
Lock lock(mutex_);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What was the failure without the lock?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two threads using the variable - hence W/O the lock the behavior is undefined.

ASSERT_GE(globalResendMessageCount, 3);
}
3 changes: 2 additions & 1 deletion pulsar-client-cpp/tests/BatchMessageTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ TEST(BatchMessageTest, testProducerTimeout) {
/* End the timer */
end = time(NULL);
LOG_INFO("end = "<<end);
ASSERT_EQ(timeout/1000.0, (double)(end - start));
// Greater than or equal to since there may be delay in sending messaging
ASSERT_GE((double)(end - start), timeout/1000.0);
}

Message receivedMsg;
Expand Down