From b1ca0feb584c64ee692653b087a75fbfd132e20f Mon Sep 17 00:00:00 2001 From: Jai ASher Date: Tue, 28 Mar 2017 18:50:59 -0700 Subject: [PATCH 1/5] Corrected C++ Tests --- pulsar-client-cpp/tests/BasicEndToEndTest.cc | 25 ++++++++++++++------ 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc index 670873cebe39d..80ce5de80e0af 100644 --- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc +++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc @@ -25,6 +25,7 @@ #include "boost/date_time/posix_time/posix_time.hpp" #include "CustomRoutingPolicy.h" #include +#include #include "HttpHelper.h" @@ -34,6 +35,7 @@ DECLARE_LOG_OBJECT() using namespace pulsar; +boost::mutex mutex_; static int globalTestBatchMessagesCounter = 0; static int globalCount = 0; static int globalResendMessageCount = 0; @@ -119,13 +121,19 @@ TEST(BasicEndToEndTest, testBatchMessages) } void resendMessage(Result r, const Message& msg, Producer &producer) { + int attemptNumber = boost::lexical_cast(msg.getProperty("attempt#")); + Lock lock(mutex_); if (r != ResultOk) { - int attemptNumber = boost::lexical_cast(msg.getProperty("attempt#")); - if (attemptNumber++ < 3) { + LOG_DEBUG("attempt#" << attemptNumber); + if (attemptNumber < 3) { globalResendMessageCount++; - producer.sendAsync(MessageBuilder().setProperty("attempt#", boost::lexical_cast(attemptNumber)).build(), + lock.unlock(); + producer.sendAsync(MessageBuilder().setProperty("attempt#", boost::lexical_cast(attemptNumber + 1)).build(), boost::bind(resendMessage, _1, _2, producer)); } + } else { + producer.sendAsync(MessageBuilder().setProperty("attempt#", boost::lexical_cast(attemptNumber + 1)).build(), + boost::bind(resendMessage, _1, _2, producer)); } } @@ -378,6 +386,11 @@ TEST(BasicEndToEndTest, testPartitionedProducerConsumer) Producer producer; Result result = client.createProducer(topicName, producer); ASSERT_EQ(ResultOk, result); + + Consumer consumer; + result = client.subscribe(topicName, "subscription-A", consumer); + ASSERT_EQ(ResultOk, result); + for (int i = 0; i < 10; i++ ) { boost::posix_time::ptime t(boost::posix_time::microsec_clock::universal_time()); long nanoSeconds = t.time_of_day().total_nanoseconds(); @@ -388,9 +401,7 @@ TEST(BasicEndToEndTest, testPartitionedProducerConsumer) LOG_INFO("Message Timestamp is " << msg.getPublishTimestamp()); LOG_INFO("Message is " << msg); } - Consumer consumer; - result = client.subscribe(topicName, "subscription-A", consumer); - ASSERT_EQ(ResultOk, result); + ASSERT_EQ(consumer.getSubscriptionName(), "subscription-A"); for (int i = 0; i < 10; i++) { Message m; @@ -815,6 +826,6 @@ TEST(BasicEndToEndTest, testMessageListenerPause) // 3 seconds usleep(3 * 1000 * 1000); - + Lock lock(mutex_); ASSERT_EQ(globalResendMessageCount, 3); } From b0537c17daf973bd6692f7c1a3d9e62b25889da2 Mon Sep 17 00:00:00 2001 From: jai1 Date: Wed, 29 Mar 2017 23:28:06 -0700 Subject: [PATCH 2/5] Update BasicEndToEndTest.cc --- pulsar-client-cpp/tests/BasicEndToEndTest.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc index 80ce5de80e0af..422662bbe1dac 100644 --- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc +++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc @@ -826,6 +826,6 @@ TEST(BasicEndToEndTest, testMessageListenerPause) // 3 seconds usleep(3 * 1000 * 1000); - Lock lock(mutex_); + Lock lock(mutex_); ASSERT_EQ(globalResendMessageCount, 3); } From 35e52d59b953b16ed1d744456650b55bcc35e976 Mon Sep 17 00:00:00 2001 From: Jai ASher Date: Thu, 30 Mar 2017 01:11:43 -0700 Subject: [PATCH 3/5] Intermittent Failures in CPP Tests --- pulsar-client-cpp/tests/BatchMessageTest.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-client-cpp/tests/BatchMessageTest.cc b/pulsar-client-cpp/tests/BatchMessageTest.cc index 8329156a1ab97..4dfa290493239 100644 --- a/pulsar-client-cpp/tests/BatchMessageTest.cc +++ b/pulsar-client-cpp/tests/BatchMessageTest.cc @@ -128,7 +128,8 @@ TEST(BatchMessageTest, testProducerTimeout) { /* End the timer */ end = time(NULL); LOG_INFO("end = "< Date: Thu, 30 Mar 2017 12:46:17 -0700 Subject: [PATCH 4/5] Refactored the test case --- pulsar-client-cpp/tests/BasicEndToEndTest.cc | 30 +++++++++----------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc index 422662bbe1dac..52b3565f8625e 100644 --- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc +++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc @@ -38,7 +38,7 @@ using namespace pulsar; boost::mutex mutex_; static int globalTestBatchMessagesCounter = 0; static int globalCount = 0; -static int globalResendMessageCount = 0; +static long globalResendMessageCount = 0; static std::string lookupUrl = "pulsar://localhost:8885"; static std::string adminUrl = "http://localhost:8765/"; @@ -121,20 +121,16 @@ TEST(BasicEndToEndTest, testBatchMessages) } void resendMessage(Result r, const Message& msg, Producer &producer) { - int attemptNumber = boost::lexical_cast(msg.getProperty("attempt#")); Lock lock(mutex_); if (r != ResultOk) { - LOG_DEBUG("attempt#" << attemptNumber); - if (attemptNumber < 3) { - globalResendMessageCount++; - lock.unlock(); - producer.sendAsync(MessageBuilder().setProperty("attempt#", boost::lexical_cast(attemptNumber + 1)).build(), - boost::bind(resendMessage, _1, _2, producer)); - } - } else { - producer.sendAsync(MessageBuilder().setProperty("attempt#", boost::lexical_cast(attemptNumber + 1)).build(), - boost::bind(resendMessage, _1, _2, producer)); + LOG_DEBUG("globalResendMessageCount" << globalResendMessageCount); + if (globalResendMessageCount++ >= 3) { + return; + } + lock.unlock(); } + producer.sendAsync(MessageBuilder().build(), + boost::bind(resendMessage, _1, _2, producer)); } TEST(BasicEndToEndTest, testProduceConsume) @@ -804,7 +800,7 @@ TEST(BasicEndToEndTest, testMessageListenerPause) client.close(); } - TEST(BasicEndToEndTest, testResendViaListener) + TEST(BasicEndToEndTest, testResendViaSendCallback) { Client client(lookupUrl); std::string topicName = "persistent://my-property/my-cluster/my-namespace/testResendViaListener"; @@ -821,11 +817,13 @@ TEST(BasicEndToEndTest, testMessageListenerPause) Result result = producerFuture.get(producer); ASSERT_EQ(ResultOk, result); - // Send asynchronously - producer.sendAsync(MessageBuilder().setProperty("attempt#", boost::lexical_cast(0)).build(), boost::bind(resendMessage, _1, _2, producer)); + // Send asynchronously for 3 seconds + // Expect timeouts since we have set timeout to 1 ms + // On receiving timeout send the message using the CMS client IO thread via cb function. + producer.sendAsync(MessageBuilder().build(), boost::bind(resendMessage, _1, _2, producer)); // 3 seconds usleep(3 * 1000 * 1000); Lock lock(mutex_); - ASSERT_EQ(globalResendMessageCount, 3); + ASSERT_GE(globalResendMessageCount, 3); } From 4b1ae36e07a75c8ffe000e0960cdce2f4d2e59d2 Mon Sep 17 00:00:00 2001 From: jai1 Date: Thu, 30 Mar 2017 15:05:00 -0700 Subject: [PATCH 5/5] Update BasicEndToEndTest.cc --- pulsar-client-cpp/tests/BasicEndToEndTest.cc | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc index 52b3565f8625e..bc18b82dbf5e8 100644 --- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc +++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc @@ -542,8 +542,12 @@ TEST(BasicEndToEndTest, testSinglePartitionRoutingPolicy) Producer producer; ProducerConfiguration producerConfiguration; producerConfiguration.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition); - Result result = client.createProducer(topicName, producerConfiguration, producer); + + Consumer consumer; + result = client.subscribe(topicName, "subscription-A", consumer); + ASSERT_EQ(ResultOk, result); + ASSERT_EQ(ResultOk, result); for (int i = 0; i < 10; i++ ) { boost::posix_time::ptime t(boost::posix_time::microsec_clock::universal_time()); @@ -553,9 +557,7 @@ TEST(BasicEndToEndTest, testSinglePartitionRoutingPolicy) Message msg = MessageBuilder().setContent(ss.str()).build(); ASSERT_EQ(ResultOk, producer.send(msg)); } - Consumer consumer; - result = client.subscribe(topicName, "subscription-A", consumer); - ASSERT_EQ(ResultOk, result); + for (int i = 0; i < 10; i++) { Message m; consumer.receive(m);