Skip to content

Commit

Permalink
Allow BasicEndToEndTest to run multiple times locally (apache#7069)
Browse files Browse the repository at this point in the history
The topics used in a bunch of the cases in BasicEndToEndTest had no
unique identifier, so if you ran it multiple times against the same
standalone cluster, they would fail, as the topics would already have
some state.

This change adds the unique identifer to the topics.

Co-authored-by: Ivan Kelly <ikelly@splunk.com>
  • Loading branch information
merlimat and Ivan Kelly authored May 29, 2020
1 parent 3989c98 commit 9f9478e
Showing 1 changed file with 33 additions and 18 deletions.
51 changes: 33 additions & 18 deletions pulsar-client-cpp/tests/BasicEndToEndTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ static int globalCount = 0;
static long globalResendMessageCount = 0;
static std::string lookupUrl = "pulsar://localhost:6650";
static std::string adminUrl = "http://localhost:8080/";
static int uniqueCounter = 0;

static std::string unique_str() {
long nanos = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();

return std::to_string(uniqueCounter++) + "_" + std::to_string(nanos);
}

static void messageListenerFunction(Consumer consumer, const Message &msg) {
globalCount++;
Expand Down Expand Up @@ -574,12 +583,10 @@ TEST(BasicEndToEndTest, testPartitionedProducerConsumer) {

TEST(BasicEndToEndTest, testPartitionedProducerConsumerSubscriptionName) {
Client client(lookupUrl);
std::string topicName = "testPartitionedProducerConsumerSubscriptionName";
std::string topicName = "testPartitionedProducerConsumerSubscriptionName" + unique_str();

// call admin api to make it partitioned
std::string url =
adminUrl +
"admin/v2/persistent/public/default/testPartitionedProducerConsumerSubscriptionName/partitions";
std::string url = adminUrl + "admin/v2/persistent/public/default/" + topicName + "/partitions";
int res = makePutRequest(url, "3");

LOG_INFO("res = " << res);
Expand Down Expand Up @@ -721,11 +728,10 @@ TEST(BasicEndToEndTest, testConfigurationFile) {

TEST(BasicEndToEndTest, testSinglePartitionRoutingPolicy) {
Client client(lookupUrl);
std::string topicName = "partition-testSinglePartitionRoutingPolicy";
std::string topicName = "partition-testSinglePartitionRoutingPolicy" + unique_str();

// call admin api to make it partitioned
std::string url =
adminUrl + "admin/v2/persistent/public/default/partition-testSinglePartitionRoutingPolicy/partitions";
std::string url = adminUrl + "admin/v2/persistent/public/default/" + topicName + "/partitions";
int res = makePutRequest(url, "5");

LOG_INFO("res = " << res);
Expand Down Expand Up @@ -1611,11 +1617,14 @@ TEST(BasicEndToEndTest, testPartitionTopicUnAckedMessageTimeout) {
Client client(lookupUrl);
long unAckedMessagesTimeoutMs = 10000;

std::string topicName = "persistent://public/default/testPartitionTopicUnAckedMessageTimeout";
std::string uniqueChunk = unique_str();
std::string topicName =
"persistent://public/default/testPartitionTopicUnAckedMessageTimeout" + uniqueChunk;

// call admin api to make it partitioned
std::string url =
adminUrl + "admin/v2/persistent/public/default/testPartitionTopicUnAckedMessageTimeout/partitions";
std::string url = adminUrl +
"admin/v2/persistent/public/default/testPartitionTopicUnAckedMessageTimeout" +
uniqueChunk + "/partitions";
int res = makePutRequest(url, "3");

LOG_INFO("res = " << res);
Expand Down Expand Up @@ -2352,10 +2361,11 @@ static void simpleCallback(Result code, const MessageId &msgId) {

TEST(BasicEndToEndTest, testSyncFlushBatchMessagesPartitionedTopic) {
Client client(lookupUrl);
std::string topicName = "persistent://public/default/partition-testSyncFlushBatchMessages";
std::string uniqueChunk = unique_str();
std::string topicName = "persistent://public/default/partition-testSyncFlushBatchMessages" + uniqueChunk;
// call admin api to make it partitioned
std::string url =
adminUrl + "admin/v2/persistent/public/default/partition-testSyncFlushBatchMessages/partitions";
std::string url = adminUrl + "admin/v2/persistent/public/default/partition-testSyncFlushBatchMessages" +
uniqueChunk + "/partitions";
int res = makePutRequest(url, "5");
const int numberOfPartitions = 5;

Expand Down Expand Up @@ -2565,10 +2575,13 @@ TEST(BasicEndToEndTest, testFlushInProducer) {

TEST(BasicEndToEndTest, testFlushInPartitionedProducer) {
Client client(lookupUrl);
std::string topicName = "persistent://public/default/partition-testFlushInPartitionedProducer";
std::string uniqueChunk = unique_str();
std::string topicName =
"persistent://public/default/partition-testFlushInPartitionedProducer" + uniqueChunk;
// call admin api to make it partitioned
std::string url =
adminUrl + "admin/v2/persistent/public/default/partition-testFlushInPartitionedProducer/partitions";
std::string url = adminUrl +
"admin/v2/persistent/public/default/partition-testFlushInPartitionedProducer" +
uniqueChunk + "/partitions";
int res = makePutRequest(url, "5");
const int numberOfPartitions = 5;

Expand Down Expand Up @@ -3076,7 +3089,7 @@ TEST(BasicEndToEndTest, testRegexTopicsWithInitialPosition) {
TEST(BasicEndToEndTest, testPartitionedTopicWithOnePartition) {
ClientConfiguration config;
Client client(lookupUrl);
std::string topicName = "testPartitionedTopicWithOnePartition";
std::string topicName = "testPartitionedTopicWithOnePartition" + unique_str();
std::string subsName = topicName + "-sub-";

// call admin api to make 1 partition
Expand Down Expand Up @@ -3169,7 +3182,9 @@ TEST(BasicEndToEndTest, testDelayedMessages) {
ASSERT_EQ(ResultOk, result);
ASSERT_EQ("msg-2", msgReceived.getDataAsString());

ASSERT_EQ(ResultOk, client.close());
auto result1 = client.close();
std::cout << "closed with " << result1 << std::endl;
ASSERT_EQ(ResultOk, result1);
}

TEST(BasicEndToEndTest, testCumulativeAcknowledgeNotAllowed) {
Expand Down

0 comments on commit 9f9478e

Please sign in to comment.