Skip to content

Commit

Permalink
[RDS-681] pooled writer connection (#120)
Browse files Browse the repository at this point in the history
* test: implement basic connection pool failover integration test
  • Loading branch information
karenc-bq authored and yanw-bq committed Apr 27, 2023
1 parent 894546b commit aa4cbf3
Showing 1 changed file with 30 additions and 34 deletions.
64 changes: 30 additions & 34 deletions integration/failover_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -248,15 +248,18 @@ void wait_until_writer_instance_changed(Aws::RDS::RDSClient client, Aws::String
}
}

void failover_cluster(Aws::RDS::RDSClient client, Aws::String cluster_id) {
void failover_cluster(Aws::RDS::RDSClient client, Aws::String cluster_id, Aws::String target_instance_id = "") {
wait_until_cluster_has_right_state(client, cluster_id);
Aws::RDS::Model::FailoverDBClusterRequest rds_req;
rds_req.WithDBClusterIdentifier(cluster_id);
if (!target_instance_id.empty()) {
rds_req.WithTargetDBInstanceIdentifier(target_instance_id);
}
auto outcome = client.FailoverDBCluster(rds_req);
}

void failover_cluster_and_wait_until_writer_changed(Aws::RDS::RDSClient client, Aws::String cluster_id, Aws::String cluster_writer_id) {
failover_cluster(client, cluster_id);
void failover_cluster_and_wait_until_writer_changed(Aws::RDS::RDSClient client, Aws::String cluster_id, Aws::String cluster_writer_id, Aws::String target_writer_id = "") {
failover_cluster(client, cluster_id, target_writer_id);
wait_until_writer_instance_changed(client, cluster_id, cluster_writer_id);
}

Expand Down Expand Up @@ -448,34 +451,33 @@ TEST_F(FailoverIntegrationTest, test_writerFailWithinTransaction_setAutocommitSq
EXPECT_EQ(SQL_SUCCESS, SQLDisconnect(dbc));
}

// Obsolete, removed once all integration tests are implemented
TEST_F(FailoverIntegrationTest, EndToEndTest) {
std::string initial_writer = retrieve_writer_endpoint(rds_client, cluster_id, DB_CONN_STR_SUFFIX).second;
/* Pooled connection tests. */

build_connection_string(conn_in, dsn, user, pwd, initial_writer, MYSQL_PORT, db);
SQLCHAR conn_out[4096], message[SQL_MAX_MESSAGE_LENGTH];
/* Writer connection failover within the connection pool. */
TEST_F(FailoverIntegrationTest, test_pooledWriterConnection_BasicFailover) {
std::vector<std::string> instances = retrieve_topology_via_SDK(rds_client, cluster_id);
const auto initial_writer_id = instances[0];
const auto nominated_writer_id = instances[1];
const auto initial_writer_endpoint = initial_writer_id + DB_CONN_STR_SUFFIX;

// Enable connection pooling
EXPECT_EQ(SQL_SUCCESS, SQLSetEnvAttr(NULL, SQL_ATTR_CONNECTION_POOLING, (SQLPOINTER)SQL_CP_ONE_PER_DRIVER, 0));
EXPECT_EQ(SQL_SUCCESS, SQLSetEnvAttr(env, SQL_ATTR_CP_MATCH, reinterpret_cast<SQLPOINTER>(SQL_CP_STRICT_MATCH), 0));

build_connection_string(conn_in, dsn, user, pwd, initial_writer_endpoint, MYSQL_PORT, db);
SQLCHAR conn_out[4096], sqlstate[6], message[SQL_MAX_MESSAGE_LENGTH];
SQLINTEGER native_error;
SQLSMALLINT len;
SQLSMALLINT len, length;

SQLRETURN rc = SQLDriverConnect(dbc, nullptr, conn_in, SQL_NTS, conn_out, MAX_NAME_LEN, &len, SQL_DRIVER_NOPROMPT);
if ((rc != SQL_SUCCESS) && (rc != SQL_SUCCESS_WITH_INFO)) {
FAIL();
}

SQLCHAR buf[255];
SQLLEN buflen;
SQLHSTMT handle;
const auto query = (SQLCHAR*)"SELECT CONCAT(@@hostname, ':', @@port)";

EXPECT_EQ(SQL_SUCCESS, SQLAllocHandle(SQL_HANDLE_STMT, dbc, &handle));
EXPECT_EQ(SQL_SUCCESS, SQLExecDirect(handle, query, SQL_NTS));
EXPECT_EQ(SQL_SUCCESS, SQLFetch(handle));
EXPECT_EQ(SQL_SUCCESS, SQLGetData(handle, 1, SQL_CHAR, buf, sizeof(buf), &buflen));
EXPECT_EQ(SQL_SUCCESS, SQLFreeHandle(SQL_HANDLE_STMT, handle));

failover_cluster(rds_client, cluster_id);
std::this_thread::sleep_for(std::chrono::seconds(90));
failover_cluster_and_wait_until_writer_changed(rds_client, cluster_id, initial_writer_id, nominated_writer_id);

SQLCHAR buf2[255];
SQLLEN buflen2;
SQLHSTMT handle2;
SQLSMALLINT stmt_length;
SQLCHAR stmt_sqlstate[6];
Expand All @@ -488,17 +490,11 @@ TEST_F(FailoverIntegrationTest, EndToEndTest) {
const std::string state = (char*)stmt_sqlstate;
const std::string expected = "08S02";
EXPECT_EQ(expected, state);

SQLCHAR buf3[255];
SQLLEN buflen3;
SQLHSTMT handle3;
const auto query3 = (SQLCHAR*)"SELECT CONCAT(@@hostname, ':', @@port)";

EXPECT_EQ(SQL_SUCCESS, SQLAllocHandle(SQL_HANDLE_STMT, dbc, &handle3));
EXPECT_EQ(SQL_SUCCESS, SQLExecDirect(handle3, query3, SQL_NTS));
EXPECT_EQ(SQL_SUCCESS, SQLFetch(handle3));
EXPECT_EQ(SQL_SUCCESS, SQLGetData(handle3, 1, SQL_CHAR, buf3, sizeof(buf3), &buflen3));
EXPECT_EQ(SQL_SUCCESS, SQLFreeHandle(SQL_HANDLE_STMT, handle3));


const std::string current_connection_id = query_instance_id(dbc);
const std::string next_cluster_writer_id = get_DB_cluster_writer_instance_id(rds_client, cluster_id);
EXPECT_TRUE(is_DB_instance_writer(rds_client, cluster_id, current_connection_id));
EXPECT_EQ(next_cluster_writer_id, current_connection_id);
EXPECT_EQ(nominated_writer_id, current_connection_id);
EXPECT_EQ(SQL_SUCCESS, SQLDisconnect(dbc));
}

0 comments on commit aa4cbf3

Please sign in to comment.