From aa4cbf383d675a4c4998e15e8372ad1d89cbe8c2 Mon Sep 17 00:00:00 2001 From: Karen Chen <64801825+karenc-bq@users.noreply.github.com> Date: Mon, 4 Apr 2022 18:04:07 -0700 Subject: [PATCH] [RDS-681] pooled writer connection (#120) * test: implement basic connection pool failover integration test --- integration/failover_integration_test.cc | 64 +++++++++++------------- 1 file changed, 30 insertions(+), 34 deletions(-) diff --git a/integration/failover_integration_test.cc b/integration/failover_integration_test.cc index 9a742b9ac..0fac470e8 100644 --- a/integration/failover_integration_test.cc +++ b/integration/failover_integration_test.cc @@ -248,15 +248,18 @@ void wait_until_writer_instance_changed(Aws::RDS::RDSClient client, Aws::String } } -void failover_cluster(Aws::RDS::RDSClient client, Aws::String cluster_id) { +void failover_cluster(Aws::RDS::RDSClient client, Aws::String cluster_id, Aws::String target_instance_id = "") { wait_until_cluster_has_right_state(client, cluster_id); Aws::RDS::Model::FailoverDBClusterRequest rds_req; rds_req.WithDBClusterIdentifier(cluster_id); + if (!target_instance_id.empty()) { + rds_req.WithTargetDBInstanceIdentifier(target_instance_id); + } auto outcome = client.FailoverDBCluster(rds_req); } -void failover_cluster_and_wait_until_writer_changed(Aws::RDS::RDSClient client, Aws::String cluster_id, Aws::String cluster_writer_id) { - failover_cluster(client, cluster_id); +void failover_cluster_and_wait_until_writer_changed(Aws::RDS::RDSClient client, Aws::String cluster_id, Aws::String cluster_writer_id, Aws::String target_writer_id = "") { + failover_cluster(client, cluster_id, target_writer_id); wait_until_writer_instance_changed(client, cluster_id, cluster_writer_id); } @@ -448,34 +451,33 @@ TEST_F(FailoverIntegrationTest, test_writerFailWithinTransaction_setAutocommitSq EXPECT_EQ(SQL_SUCCESS, SQLDisconnect(dbc)); } -// Obsolete, removed once all integration tests are implemented -TEST_F(FailoverIntegrationTest, EndToEndTest) { - std::string initial_writer = retrieve_writer_endpoint(rds_client, cluster_id, DB_CONN_STR_SUFFIX).second; +/* Pooled connection tests. */ - build_connection_string(conn_in, dsn, user, pwd, initial_writer, MYSQL_PORT, db); - SQLCHAR conn_out[4096], message[SQL_MAX_MESSAGE_LENGTH]; +/* Writer connection failover within the connection pool. */ +TEST_F(FailoverIntegrationTest, test_pooledWriterConnection_BasicFailover) { + std::vector instances = retrieve_topology_via_SDK(rds_client, cluster_id); + const auto initial_writer_id = instances[0]; + const auto nominated_writer_id = instances[1]; + const auto initial_writer_endpoint = initial_writer_id + DB_CONN_STR_SUFFIX; + + // Enable connection pooling + EXPECT_EQ(SQL_SUCCESS, SQLSetEnvAttr(NULL, SQL_ATTR_CONNECTION_POOLING, (SQLPOINTER)SQL_CP_ONE_PER_DRIVER, 0)); + EXPECT_EQ(SQL_SUCCESS, SQLSetEnvAttr(env, SQL_ATTR_CP_MATCH, reinterpret_cast(SQL_CP_STRICT_MATCH), 0)); + + build_connection_string(conn_in, dsn, user, pwd, initial_writer_endpoint, MYSQL_PORT, db); + SQLCHAR conn_out[4096], sqlstate[6], message[SQL_MAX_MESSAGE_LENGTH]; SQLINTEGER native_error; - SQLSMALLINT len; + SQLSMALLINT len, length; SQLRETURN rc = SQLDriverConnect(dbc, nullptr, conn_in, SQL_NTS, conn_out, MAX_NAME_LEN, &len, SQL_DRIVER_NOPROMPT); if ((rc != SQL_SUCCESS) && (rc != SQL_SUCCESS_WITH_INFO)) { FAIL(); } - SQLCHAR buf[255]; - SQLLEN buflen; - SQLHSTMT handle; - const auto query = (SQLCHAR*)"SELECT CONCAT(@@hostname, ':', @@port)"; - - EXPECT_EQ(SQL_SUCCESS, SQLAllocHandle(SQL_HANDLE_STMT, dbc, &handle)); - EXPECT_EQ(SQL_SUCCESS, SQLExecDirect(handle, query, SQL_NTS)); - EXPECT_EQ(SQL_SUCCESS, SQLFetch(handle)); - EXPECT_EQ(SQL_SUCCESS, SQLGetData(handle, 1, SQL_CHAR, buf, sizeof(buf), &buflen)); - EXPECT_EQ(SQL_SUCCESS, SQLFreeHandle(SQL_HANDLE_STMT, handle)); - - failover_cluster(rds_client, cluster_id); - std::this_thread::sleep_for(std::chrono::seconds(90)); + failover_cluster_and_wait_until_writer_changed(rds_client, cluster_id, initial_writer_id, nominated_writer_id); + SQLCHAR buf2[255]; + SQLLEN buflen2; SQLHSTMT handle2; SQLSMALLINT stmt_length; SQLCHAR stmt_sqlstate[6]; @@ -488,17 +490,11 @@ TEST_F(FailoverIntegrationTest, EndToEndTest) { const std::string state = (char*)stmt_sqlstate; const std::string expected = "08S02"; EXPECT_EQ(expected, state); - - SQLCHAR buf3[255]; - SQLLEN buflen3; - SQLHSTMT handle3; - const auto query3 = (SQLCHAR*)"SELECT CONCAT(@@hostname, ':', @@port)"; - - EXPECT_EQ(SQL_SUCCESS, SQLAllocHandle(SQL_HANDLE_STMT, dbc, &handle3)); - EXPECT_EQ(SQL_SUCCESS, SQLExecDirect(handle3, query3, SQL_NTS)); - EXPECT_EQ(SQL_SUCCESS, SQLFetch(handle3)); - EXPECT_EQ(SQL_SUCCESS, SQLGetData(handle3, 1, SQL_CHAR, buf3, sizeof(buf3), &buflen3)); - EXPECT_EQ(SQL_SUCCESS, SQLFreeHandle(SQL_HANDLE_STMT, handle3)); - + + const std::string current_connection_id = query_instance_id(dbc); + const std::string next_cluster_writer_id = get_DB_cluster_writer_instance_id(rds_client, cluster_id); + EXPECT_TRUE(is_DB_instance_writer(rds_client, cluster_id, current_connection_id)); + EXPECT_EQ(next_cluster_writer_id, current_connection_id); + EXPECT_EQ(nominated_writer_id, current_connection_id); EXPECT_EQ(SQL_SUCCESS, SQLDisconnect(dbc)); }