diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc index 670873cebe39d..bc18b82dbf5e8 100644 --- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc +++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc @@ -25,6 +25,7 @@ #include "boost/date_time/posix_time/posix_time.hpp" #include "CustomRoutingPolicy.h" #include +#include #include "HttpHelper.h" @@ -34,9 +35,10 @@ DECLARE_LOG_OBJECT() using namespace pulsar; +boost::mutex mutex_; static int globalTestBatchMessagesCounter = 0; static int globalCount = 0; -static int globalResendMessageCount = 0; +static long globalResendMessageCount = 0; static std::string lookupUrl = "pulsar://localhost:8885"; static std::string adminUrl = "http://localhost:8765/"; @@ -119,14 +121,16 @@ TEST(BasicEndToEndTest, testBatchMessages) } void resendMessage(Result r, const Message& msg, Producer &producer) { + Lock lock(mutex_); if (r != ResultOk) { - int attemptNumber = boost::lexical_cast(msg.getProperty("attempt#")); - if (attemptNumber++ < 3) { - globalResendMessageCount++; - producer.sendAsync(MessageBuilder().setProperty("attempt#", boost::lexical_cast(attemptNumber)).build(), - boost::bind(resendMessage, _1, _2, producer)); - } + LOG_DEBUG("globalResendMessageCount" << globalResendMessageCount); + if (globalResendMessageCount++ >= 3) { + return; + } + lock.unlock(); } + producer.sendAsync(MessageBuilder().build(), + boost::bind(resendMessage, _1, _2, producer)); } TEST(BasicEndToEndTest, testProduceConsume) @@ -378,6 +382,11 @@ TEST(BasicEndToEndTest, testPartitionedProducerConsumer) Producer producer; Result result = client.createProducer(topicName, producer); ASSERT_EQ(ResultOk, result); + + Consumer consumer; + result = client.subscribe(topicName, "subscription-A", consumer); + ASSERT_EQ(ResultOk, result); + for (int i = 0; i < 10; i++ ) { boost::posix_time::ptime t(boost::posix_time::microsec_clock::universal_time()); long nanoSeconds = t.time_of_day().total_nanoseconds(); @@ -388,9 +397,7 @@ TEST(BasicEndToEndTest, testPartitionedProducerConsumer) LOG_INFO("Message Timestamp is " << msg.getPublishTimestamp()); LOG_INFO("Message is " << msg); } - Consumer consumer; - result = client.subscribe(topicName, "subscription-A", consumer); - ASSERT_EQ(ResultOk, result); + ASSERT_EQ(consumer.getSubscriptionName(), "subscription-A"); for (int i = 0; i < 10; i++) { Message m; @@ -535,8 +542,12 @@ TEST(BasicEndToEndTest, testSinglePartitionRoutingPolicy) Producer producer; ProducerConfiguration producerConfiguration; producerConfiguration.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition); - Result result = client.createProducer(topicName, producerConfiguration, producer); + + Consumer consumer; + result = client.subscribe(topicName, "subscription-A", consumer); + ASSERT_EQ(ResultOk, result); + ASSERT_EQ(ResultOk, result); for (int i = 0; i < 10; i++ ) { boost::posix_time::ptime t(boost::posix_time::microsec_clock::universal_time()); @@ -546,9 +557,7 @@ TEST(BasicEndToEndTest, testSinglePartitionRoutingPolicy) Message msg = MessageBuilder().setContent(ss.str()).build(); ASSERT_EQ(ResultOk, producer.send(msg)); } - Consumer consumer; - result = client.subscribe(topicName, "subscription-A", consumer); - ASSERT_EQ(ResultOk, result); + for (int i = 0; i < 10; i++) { Message m; consumer.receive(m); @@ -793,7 +802,7 @@ TEST(BasicEndToEndTest, testMessageListenerPause) client.close(); } - TEST(BasicEndToEndTest, testResendViaListener) + TEST(BasicEndToEndTest, testResendViaSendCallback) { Client client(lookupUrl); std::string topicName = "persistent://my-property/my-cluster/my-namespace/testResendViaListener"; @@ -810,11 +819,13 @@ TEST(BasicEndToEndTest, testMessageListenerPause) Result result = producerFuture.get(producer); ASSERT_EQ(ResultOk, result); - // Send asynchronously - producer.sendAsync(MessageBuilder().setProperty("attempt#", boost::lexical_cast(0)).build(), boost::bind(resendMessage, _1, _2, producer)); + // Send asynchronously for 3 seconds + // Expect timeouts since we have set timeout to 1 ms + // On receiving timeout send the message using the CMS client IO thread via cb function. + producer.sendAsync(MessageBuilder().build(), boost::bind(resendMessage, _1, _2, producer)); // 3 seconds usleep(3 * 1000 * 1000); - - ASSERT_EQ(globalResendMessageCount, 3); + Lock lock(mutex_); + ASSERT_GE(globalResendMessageCount, 3); } diff --git a/pulsar-client-cpp/tests/BatchMessageTest.cc b/pulsar-client-cpp/tests/BatchMessageTest.cc index 8329156a1ab97..4dfa290493239 100644 --- a/pulsar-client-cpp/tests/BatchMessageTest.cc +++ b/pulsar-client-cpp/tests/BatchMessageTest.cc @@ -128,7 +128,8 @@ TEST(BatchMessageTest, testProducerTimeout) { /* End the timer */ end = time(NULL); LOG_INFO("end = "<